Tag Archives: Analytics

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion; we explain those challenges and show how we overcame them.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose (a minimal sketch of this conversion follows the list).
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.
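The following is a minimal sketch of the stream-to-Firehose conversion described in step 2. It is not our production code (which is based on the lambda-streams-to-firehose project); the delivery stream name is a placeholder, and error handling and batch size limits are omitted.

# Sketch: forward new DynamoDB stream images to Kinesis Data Firehose as JSON.
# The delivery stream name is a placeholder; add error handling as appropriate.
import json
import boto3

firehose = boto3.client('firehose')

def lambda_handler(event, context):
    records = []
    for record in event['Records']:
        if record['eventName'] in ('INSERT', 'MODIFY'):
            # NewImage holds the item in DynamoDB's attribute-value format
            new_image = record['dynamodb']['NewImage']
            records.append({'Data': (json.dumps(new_image) + '\n').encode('utf-8')})
    if records:
        firehose.put_record_batch(
            DeliveryStreamName='ddb-to-s3-parquet',  # placeholder name
            Records=records
        )
    return {'statusCode': 200}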

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type>
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we had to convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The GlueTableChanged rule in EventBridge listens for changes to the Data Catalog tables. When we receive an event indicating that a table has changed, we invoke the Lambda function.
  4. The Lambda function uses the AWS SDK to update the Glue Data Catalog table through the glue.update_table() API, replacing occurrences of set<data_type> with array<data_type>.

To set up the rule in EventBridge, we set Event pattern to "Pre-defined pattern by service", chose AWS as the service provider and Glue as the service, and selected "Glue Data Catalog Table State Change" as the event type. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.
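The same rule can also be created programmatically. The following is a hedged sketch using the AWS SDK for Python; the rule name, target ID, and Lambda ARN are placeholders, and you also need to grant EventBridge permission to invoke the function.

# Sketch: create an EventBridge rule for Glue Data Catalog table changes
# and point it at the Lambda function. Names and the ARN are placeholders.
import json
import boto3

events = boto3.client('events')

events.put_rule(
    Name='glue-table-changed',
    EventPattern=json.dumps({
        'source': ['aws.glue'],
        'detail-type': ['Glue Data Catalog Table State Change']
    })
)

events.put_targets(
    Rule='glue-table-changed',
    Targets=[{
        'Id': 'fix-set-types-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:111111111111:function:fix-set-types'  # placeholder ARN
    }]
)
# Remember to grant EventBridge permission to invoke the function (lambda:AddPermission).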

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define subsegments manually
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table(DatabaseName=databaseName, Name=tableName)
    logger.info(response)  
    
    # loop thru all the Columns of the table 
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego

 

 

 

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

Amazon Redshift at re:Invent 2019

Post Syndicated from Corina Radovanovich original https://aws.amazon.com/blogs/big-data/amazon-redshift-at-reinvent-2019/

The annual AWS re:Invent learning conference is an exciting time full of new product and program launches. At the first re:Invent conference in 2012, AWS announced Amazon Redshift. Since then, tens of thousands of customers have started using Amazon Redshift as their cloud data warehouse. In 2019, AWS shared several significant launches and dozens of sessions. This post presents highlights of what happened with Amazon Redshift at re:Invent 2019.

Andy Jassy’s AWS re:Invent 2019 keynote

When Andy Jassy takes the stage to talk about what’s new at AWS, he launches the new Amazon Redshift node type, RA3 with managed storage; the new Federated Query (preview) feature; Export to Data Lake; and the Advanced Query Accelerator (AQUA) (preview) for Amazon Redshift. Watch AWS re:Invent 2019 – Keynote with Andy Jassy on YouTube, or jump ahead to the Amazon Redshift announcements.

Deep dive and best practices for Amazon Redshift

Every year the Amazon Redshift deep dive session rates highly, and people continue to watch and re-watch it after the event. This year was no different. Specialist Solution Architects Harshida Patel and Tony Gibbs take an in-depth look at best practices for data warehousing with Amazon Redshift. It’s a must-see for existing Amazon Redshift users. Watch AWS re:Invent 2019: Deep dive and best practices for Amazon Redshift (ANT418) on YouTube.

What’s new with Amazon Redshift, featuring Yelp and Workday

With over 200 new features and capabilities launched in the last 18 months, there’s a lot to cover in a session about what’s new with Amazon Redshift. Join one of the Product Managers driving RA3 and managed storage, Himanshu Raja, to catch up on the recent performance, concurrency, elasticity, and manageability enhancements behind Amazon Redshift’s record price-to-performance ratio. You also get more insight into the architectural evolution of Amazon Redshift with RA3 and managed storage, and how it uses machine learning to create a self-optimizing data warehouse. In the second half of the session, you hear from Steven Moy, a software engineer at Yelp, about how Amazon Redshift’s latest features have helped Yelp achieve optimization and scale for an organization with an enormous amount of data and sophisticated analytics. Watch AWS re:Invent 2019: What’s new with Amazon Redshift, featuring Yelp (ANT320-R1) on YouTube.

The session repeated with Michalis Petropoulos, Director of Engineering, and Erol Guney of Workday. Watch the full session to get a slightly different take on what’s new, or jump to the customer presentation to hear from Erol Guney, Architect, Data Platform, at Workday about how Amazon Redshift empowers their Data as a Service product team to focus on architecture goals and business logic.

Migrate your data warehouse to the cloud in record time, featuring Nielsen and Fannie Mae

In this session, you learn about important concepts and tips for migrating your legacy on-premises data warehouse to the cloud. You hear from Tejas Desai, VP of Technology at Nielsen, about their migration journey and benefits. Watch AWS re:Invent 2019: Migrate your data warehouse to the cloud, featuring Nielsen (ANT334-R1) on YouTube.

The repeat of this session features Amy Tseng from Fannie Mae. If you don’t want to listen to Tony’s overview again, skip ahead to learn how Fannie Mae embraced a data lake architecture with Amazon Redshift for analytics to save costs, maximize performance, and scale. Amy’s presentation was a crowd favorite, with some of the most positive customer feedback and a wealth of great information about how Fannie Mae managed their migration.

How to scale data analytics with Amazon Redshift, featuring Duolingo and Warner Brothers Interactive Entertainment

Data is growing fast, and so is the value that business users need to gain from business data. When AWS first announced Amazon Redshift in 2012, it could handle up to 640 TB of compressed data. It can now scale to 8 PB of compressed data. Learn more about Amazon Redshift’s unique ability to deliver top performance at the lowest and most predictable cost from Vinay Shukla, Principal Product Manager. This is an especially important session if you want to learn more about the newest Amazon Redshift node type RA3. You also hear from Jonathan Burket of Duolingo about their experience in the preview of RA3 nodes and how Duolingo uses Amazon Redshift. Duolingo is a wildly popular language-learning platform and the most downloaded education app in the world, with over 300 million users. Enabling data-driven decisions with A/B tests and ad hoc analysis has been a driver of their success. Watch AWS re:Invent 2019: How to scale data analytics with Amazon Redshift (ANT335-R2) on YouTube.

The repeat session features Redshift Product Manager Maor Kleider with an in-depth case study from Matt Howell, Executive Director, Analytics, and Kurt Larson, Technical Director, Analytics, at Warner Brothers Interactive Entertainment. Watch the full session for another perspective about how to scale with the latest Amazon Redshift features, with unique insights about analytics across Amazon Redshift and your data lake. You can also jump to the customer presentation. Not only is this session packed with interesting insights about how data analytics drives the success of games like Batman and Mortal Kombat, it also has an action-packed trailer about all the awesome Warner Brothers games.

If you prefer to see a session without the announcements from the keynote and with demos, watch Debu Panda showcase the new Amazon Redshift console and share practical tips about using Amazon Redshift.

Amazon Redshift reimagined: RA3 and AQUA

This embargoed session is the first opportunity to learn more about AQUA for Amazon Redshift and how it improves query performance by up to 10 times. Britt Johnston, Director of Product Management, kicks off with an intro to the next generation of Amazon Redshift, and Senior Principal Engineer Andy Caldwell jumps in to share the origin and vision of the exciting new technology. The enthusiasm Andy feels about sharing AQUA with customers for the first time is palpable. Watch AWS re:Invent 2019: [NEW LAUNCH!] Amazon Redshift reimagined: RA3 and AQUA (ANT230) on YouTube.

State-of-the-art cloud data warehousing, featuring Asurion and Comcast

This session serves as a great introduction to cloud data warehousing at AWS, with insightful presentations from a different customer in each delivery. You can hear from Asurion about how they use data analytics to serve over 300 million people with excellent customer satisfaction scores. You learn about how to use AWS services with Amazon Redshift and why Asurion believes in their data lake-based architecture. Watch AWS re:Invent 2019: State-of-the-art cloud data warehousing, featuring Asurion (ANT213-R1) on YouTube.

In the repeat session, Rajat Garg, Senior Principal Architect at Comcast, talks about moving to Amazon Redshift from a legacy on-premises Oracle Exadata environment. He shares their strategy, approach, and performance improvements.

What’s next and more information

In addition to these sessions at re:Invent, there are also hands-on workshops, intimate builder roundtables, and interactive chalk talks that weren’t recorded.

Keep exploring the following links for more information about new releases:

We hope to see you in Las Vegas for re:Invent 2020, or at one of the hundreds of other AWS virtual and in-person events running around the world. For more information, see AWS Events and Webinars.


About the authors

Corina Radovanovich leads product marketing for cloud data warehousing at AWS.

 

 

 

How Verizon Media Group migrated from on-premises Apache Hadoop and Spark to Amazon EMR

Post Syndicated from Lev Brailovskiy original https://aws.amazon.com/blogs/big-data/how-verizon-media-group-migrated-from-on-premises-apache-hadoop-and-spark-to-amazon-emr/

This is a guest post by Verizon Media Group.

At Verizon Media Group (VMG), one of the major problems we faced was the inability to scale out computing capacity in a required amount of time—hardware acquisitions often took months to complete. Scaling and upgrading hardware to accommodate workload changes was not economically viable, and upgrading redundant management software required significant downtimes and carried a large amount of risk.

At VMG, we depend on technologies such as Apache Hadoop and Apache Spark to run our data processing pipelines. We previously managed our clusters with Cloudera Manager, which was subject to slow release cycles. As a result, we ran older versions of available open-source releases and couldn’t take advantage of the latest bug fixes and performance improvements on Apache projects. These reasons, combined with our already existing investment in AWS, made us explore migrating our distributed computing pipelines to Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark.

This post discusses the issues we encountered and solved while building a pipeline to address our data processing needs.

About us

Verizon Media is, ultimately, an online advertising company. Most online advertising today is done through display ads, also known as banners or video ads. Regardless of format, all internet ads usually fire various kinds of beacons to tracking servers, which are usually highly scalable web server deployments with a sole responsibility to log received beacons to one or multiple event sinks.

Pipeline architecture

In our group, which deals mostly with video advertising, we use NGINX web servers deployed in multiple geographical locations, which log events fired from our video player directly to Apache Kafka for real-time processing and to Amazon S3 for batch processing. A typical data pipeline in our group involves processing such input feeds, applying validation and enrichment routines, aggregating resulting data, and replicating it to further destinations for reporting purposes. The following diagram shows a typical pipeline that we created.

We start getting data on our NGINX beacon servers. The data is stored on local disk in gzip files at 1-minute intervals. Every minute, we move the data from the NGINX servers to a raw data location in S3. When a file lands on S3, a message is sent to Amazon SQS. Apache NiFi listens to the SQS messages and starts working on the files. During this time, NiFi groups smaller files into larger files and stores the outcome in a special path in a temporary location on S3. The path name incorporates an inverse timestamp to make sure we store data in effectively random locations and avoid read bottlenecks.

Every hour, we scale out a Spark cluster on Amazon EMR to process the raw data. This processing includes enriching and validating the data. The processed data is stored in a permanent location on S3 in the Apache ORC columnar format. We also update the AWS Glue Data Catalog to expose this data in Amazon Athena in case we need to investigate it for issues. After raw data processing is finished, we downscale the Spark EMR cluster and start aggregating data based on pre-defined aggregation templates using Presto on Amazon EMR. The aggregated data is stored in ORC format in a special location on S3 for aggregated data.

We also update our Data Catalog with the location of the data so we can query it with Athena. Additionally, we replicate the data from S3 into Vertica for our reporting to expose the data to internal and external customers. In this scenario, we use Athena as the disaster recovery (DR) solution for Vertica. Every time our reporting platform sees that Vertica is in bad health, we automatically fail over to Amazon Athena. This solution proved to be extremely cost-effective for us. We have another use case for Athena in our real-time analytics that we do not discuss in this post.

Migration challenges

Migration to Amazon EMR required us to make some design changes to get the best results. When running big data pipelines on the cloud, operational cost optimization is the name of the game. The two major costs are storage and compute. In traditional on-premises Hadoop warehouses, these are coupled as storage nodes that also serve as computation nodes. This can provide a performance advantage due to data locality. However, the disadvantage of this coupling is that any changes to the storage layer, such as maintenance, can also affect the computational layer. In an environment such as AWS, we can decouple storage and computation by using S3 for storage and Amazon EMR for computation. This provides a major flexibility advantage when dealing with cluster maintenance because all clusters are ephemeral.

To further save costs, we had to figure out how to achieve maximum utilization on our computational layer. This meant that we had to switch our platform to using multiple clusters for different pipelines, where each cluster is automatically scaled depending on the pipeline’s needs.

Switching to S3

Running a Hadoop data warehouse on S3 introduces additional considerations. S3 is not a file system like HDFS and does not provide the same immediate consistency guarantees. You can consider S3 as an eventually consistent object store with a REST API to access it.

The rename

A key difference with S3 is that rename is not an atomic operation. All rename operations on S3 run a copy followed by a delete operation. Executing renames on S3 is undesirable due to running time costs. To use S3 efficiently, you must remove the use of any rename operations. Renames are commonly used in Hadoop warehouses at various commit stages, such as moving a temporary directory to its final destination as an atomic operation. The best approach is to avoid any rename operations and instead write data once.

Output committers

Both Spark and Apache MapReduce jobs have commit stages that commit output files produced by multiple distributed workers to final output directories. Explaining how output committers work is beyond the scope of this post, but the important thing is that the standard default output committers designed for HDFS depend on rename operations, which, as explained previously, carry a performance penalty on storage systems like S3. A simple strategy that worked for us was disabling speculative execution and switching the output committer’s algorithm version. It is also possible to write your own custom committers, which do not depend on renames. For example, as of Amazon EMR 5.19.0, AWS released a custom OutputCommitter for Spark that optimizes writes to S3.
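As a hedged sketch of what those settings can look like (exact property names can vary by Spark and Hadoop version, and the S3 paths are placeholders), this is roughly how a PySpark job can be configured to avoid speculative execution and use the v2 committer algorithm:

# Sketch: Spark settings for S3-friendly writes -- disable speculative execution
# and use the v2 FileOutputCommitter algorithm to reduce rename operations.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('s3-friendly-writes')
    .config('spark.speculation', 'false')
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
    .getOrCreate()
)

df = spark.read.json('s3://example-bucket/raw/')                   # placeholder input path
df.write.mode('overwrite').orc('s3://example-bucket/processed/')   # placeholder output path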

Eventual consistency

One of the major challenges working with S3 is that it is eventually consistent, whereas HDFS is strongly consistent. S3 does offer read-after-write guarantees for PUTS of new objects, but this is not always enough to build consistent distributed pipelines on. One common scenario that comes up a lot in big data processing is one job outputting a list of files to a directory and another job reading from that directory. For the second job to run, it has to list the directory to find all the files it has to read. In S3, there are no directories; we simply list files with the same prefix, which means you might not see all the new files immediately after your first job is finished running.

To address this issue, AWS offers EMRFS, which is a consistency layer added on top of S3 to make it behave like a consistent file system. EMRFS uses Amazon DynamoDB and keeps metadata about every file on S3. In simple terms, with EMRFS enabled, when you list an S3 prefix, the actual S3 response is compared to the metadata in DynamoDB. If there’s a mismatch, the S3 driver polls a little longer and waits for the data to show up on S3.
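As a rough sketch (cluster name, sizes, and roles are placeholders, not our actual configuration), enabling EMRFS consistent view when creating a cluster with the AWS SDK looks something like the following:

# Sketch: create an EMR cluster with EMRFS consistent view enabled.
# Names, instance types, counts, and roles below are placeholders.
import boto3

emr = boto3.client('emr')

emr.run_job_flow(
    Name='example-pipeline-cluster',
    ReleaseLabel='emr-5.29.0',
    Applications=[{'Name': 'Spark'}],
    Configurations=[{
        'Classification': 'emrfs-site',
        'Properties': {'fs.s3.consistent': 'true'}
    }],
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm4.4xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE', 'InstanceType': 'm4.4xlarge', 'InstanceCount': 2}
        ],
        'KeepJobFlowAliveWhenNoSteps': True
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)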

In general, we found that EMRFS was necessary to ensure data consistency. For some of our data pipelines, we use PrestoDB to aggregate data that is stored on S3, where we chose to run PrestoDB without EMRFS support. While this has exposed us to the eventual consistency risk for our upstream jobs, we found that we can work around these issues by monitoring for discrepancies between downstream and upstream data and rerunning the upstream jobs if needed. In our experience, consistency issues happen very rarely, but they are possible. If you choose to run without EMRFS, you should design your system accordingly.

Automatic scaling strategies

An important and yet in some ways trivial challenge was figuring out how to take advantage of Amazon EMR automatic scaling capabilities. To achieve optimal operational costs, we want to make sure no server is sitting idle.

To achieve that, the answer might seem obvious—create a long-running EMR cluster and use readily available automatic scaling features to control a cluster’s size based on a parameter, such as available free memory on the cluster. However, some of our batch pipelines start every hour, run for exactly 20 minutes, and are computationally very intensive. Because processing time is very important, we want to make sure we don’t waste any time. The optimal strategy for us is to preemptively resize the cluster through custom scripts before particular big batch pipelines start.

Additionally, it would be difficult to run multiple data pipelines on a single cluster and attempt to keep it at optimal capacity at any given moment because every pipeline is slightly different. We have instead opted to run all our major pipelines on independent EMR clusters. This has a lot of advantages and only a minor disadvantage. The advantages are that each cluster can be resized at exactly the required time, run the software version required by its pipeline, and be managed without affecting other pipelines. The minor disadvantage is that there’s a small amount of computational waste by running extra name nodes and task nodes.

When developing an automatic scaling strategy, we first tried to create and drop clusters every time we need to run our pipelines. However, we quickly found that bootstrapping a cluster from scratch can take more time than we’d like. We instead keep these clusters always running, and we upsize the cluster by adding task nodes before the pipeline starts and remove the task nodes as soon as the pipeline ends. We found that by simply adding task nodes, we can start running our pipelines much faster. If we run into issues with long-running clusters, we can quickly recycle and create a new one from scratch. We continue to work with AWS on these issues.

Our custom automatic scaling scripts are simple Python scripts, which usually run before a pipeline starts. For example, assume that our pipeline consists of a simple MapReduce job with a single map and reduce phase. Also assume that the map phase is more computationally expensive. We can write a simple script that looks at the amount of data that needs to be processed in the next hour and figures out the number of mappers needed to process this data, in the same way that a Hadoop job does. When we know the number of map tasks, we can decide how many servers we need to run all the mapper tasks in parallel.
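The following is a simplified sketch of that idea; the split size, slots per node, and the cluster and instance group IDs are illustrative assumptions, not our exact production script.

# Sketch: estimate how many task nodes to add before an hourly batch run,
# based on the input size expected in the next hour. Constants are illustrative.
import boto3

SPLIT_SIZE_GB = 0.128        # ~128 MB per mapper, as a Hadoop job would split the input
MAPPER_SLOTS_PER_NODE = 16   # depends on instance type and YARN configuration

def estimate_task_nodes(input_size_gb):
    mappers = max(1, int(input_size_gb / SPLIT_SIZE_GB))
    return -(-mappers // MAPPER_SLOTS_PER_NODE)  # ceiling division

def resize_task_group(cluster_id, instance_group_id, node_count):
    emr = boto3.client('emr')
    emr.modify_instance_groups(
        ClusterId=cluster_id,
        InstanceGroups=[{'InstanceGroupId': instance_group_id,
                         'InstanceCount': node_count}]
    )

# Example: size the task group for 500 GB of input landing in the next hour
# (the cluster and instance group IDs are placeholders).
resize_task_group('j-EXAMPLE', 'ig-EXAMPLE', estimate_task_nodes(500))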

When running Spark real-time pipelines, things are a little trickier because we sometimes have to remove computational resources while the application is running. A simple strategy that worked for us is to create a separate real-time cluster in parallel to the existing one, scale it up to a required size based on amount of data processed during the last hour with some extra capacity, and restart the real-time application on the new cluster.

Operational costs

You can evaluate all AWS costs up front with the EC2 calculator. The main costs when running big data pipelines are storage and computation, with some extra minor costs such as DynamoDB when using EMRFS.

Storage costs

The first cost to consider is storage. Because HDFS has a default replication factor of 3, storing 1 PB of data would require 3 PB of actual storage capacity.

Storing 1 GB on S3 costs approximately $0.023 per month. S3 is already highly redundant, so you don’t need to take the replication factor into account, which immediately reduces storage costs by 67%. You should also consider the other costs for write or read requests, but these usually tend to be small.

Computation costs

The second-largest cost after storage is the cost of computation. To reduce computation costs, you should take advantage of reserved instance pricing as much as possible. An m4.4xlarge instance type with 16 VCPUs on AWS costs $0.301 an hour when it is reserved for 3 years, with all fees up-front. An On-Demand Instance costs $0.8 an hour, which is a 62% difference in price. This is easier to achieve in larger organizations that perform regular capacity planning. An extra hourly fee of $0.24 is added to every Amazon EMR machine for the use of the Amazon EMR platform. It is possible to reduce costs even further by using Amazon EC2 Spot Instances. For more information, see Instance Purchasing Options.
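A quick back-of-the-envelope calculation with the numbers above (illustrative only) shows how the EMR fee narrows the gap between reserved and On-Demand pricing:

# Back-of-the-envelope comparison of the hourly prices quoted above,
# with and without the per-instance Amazon EMR fee.
reserved = 0.301      # m4.4xlarge, 3-year all-upfront, effective hourly rate
on_demand = 0.80      # m4.4xlarge On-Demand hourly rate
emr_fee = 0.24        # Amazon EMR fee per instance-hour

ec2_savings = (on_demand - reserved) / on_demand
print(f"EC2-only saving from reserving: {ec2_savings:.0%}")        # ~62%

total_savings = (on_demand - reserved) / (on_demand + emr_fee)
print(f"Saving on the total hourly cost: {total_savings:.0%}")     # ~48%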

To achieve optimal operational costs, try to make sure that your computation clusters are never sitting idle and try to downscale dynamically based on the amount of work your clusters are doing at any given moment.

Final thoughts

We have been operating our big data pipelines on Amazon EMR for over a year and storing all our data on S3. At times, our real-time processing pipelines have peaked at handling more than 2 million events per second, with a total processing latency from the initial event to updated aggregates of 1 minute. We’ve been enjoying the flexibility around Amazon EMR and its ability to tear down and recreate clusters in a matter of minutes. We are satisfied with the overall stability of the Amazon EMR platform and we will continue working with AWS to improve it.

As we have mentioned before, cost is a major factor to consider, and you could argue that it might be cheaper to run Hadoop in your own data centers. However, this argument hinges on your organization’s ability to do so efficiently; it can carry hidden operational costs and reduced elasticity. We know through first-hand experience that running on-premises is not an undertaking to take lightly; it requires a lot of planning and maintenance. We believe that platforms such as Amazon EMR bring a lot of advantages when designing big data systems.

Disclaimer: The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the authors

Lev Brailovskiy is Director of Engineering leading the Service Engineering Group in the Supply Side Platform (SSP) at Verizon Media. He has over 15 years of experience designing and building software systems. In the past six years, Lev spent time designing, developing, and running large-scale reporting and data processing software both in private data centers and in the public cloud. He can be contacted via LinkedIn.

 

 

Zilvinas Shaltys is Technical Lead for the Video Syndication cloud data warehouse platform at Verizon. Zilvinas has years of experience working with a wide variety of big data technologies deployed at considerable scale. He was responsible for migrating big data pipelines from AOL data centers to Amazon EMR. Zilvinas is currently working on improving the stability and scalability of existing batch and real-time big data systems. He can be contacted via LinkedIn.

 

Maximize data ingestion and reporting performance on Amazon Redshift

Post Syndicated from Vasu Kiran Gorti original https://aws.amazon.com/blogs/big-data/maximize-data-ingestion-and-reporting-performance-on-amazon-redshift/

This is a guest post from ZS. In their own words, “ZS is a professional services firm that works closely with companies to help develop and deliver products and solutions that drive customer value and company results. ZS engagements involve a blend of technology, consulting, analytics, and operations, and are targeted toward improving the commercial experience for clients.”

ZS was involved in setting up and operating a MicroStrategy-based BI application that sources 700 GB of data from Amazon Redshift as a data warehouse in an Amazon-hosted backend architecture. ZS sourced healthcare data from various pharma data vendors out of different systems, such as Amazon S3 buckets and FTP systems, into a data lake. They processed this data using transient Amazon EMR clusters and stored it on Amazon S3 for reporting consumption. The reporting-specific data is moved to Amazon Redshift using COPY commands, and MicroStrategy uses it to refresh front-end dashboards.

ZS has strict, client-set SLAs to meet with the available Amazon Redshift infrastructure. We carried out experiments to identify an approach to handle large data volumes using the available small Amazon Redshift cluster.

This post provides an approach for loading a large volume of data from S3 to Amazon Redshift and applies efficient distribution techniques for enhanced performance of reporting queries on a relatively small Amazon Redshift cluster.

Data processing methodology

ZS infrastructure is hosted on AWS, where they store and process pharma industry data from various vendors using AWS services before reporting the data on a MicroStrategy BI reporting tool. The following diagram shows the overall data flow from flat files to reports shown on MicroStrategy for end-users.

Step 1: Pharma data is sourced from various vendors and different systems, such as FTP locations, individual systems, and Amazon S3 buckets.

Step 2: Cost-effective transient clusters are spun up as needed to provide the compute power to run PySpark code.

Step 3: After processing, data is stored in Amazon S3 buckets for consumption by downstream applications.

Step 4: 700 GB of data is then ingested into Amazon Redshift for MicroStrategy (MSTR) consumption.

Step 5: This data is read from Amazon Redshift, and the insights are displayed to end-users in the form of reports on MicroStrategy.

Dataset under consideration

In this specific scenario, ZS was working with data from the pharma domain. The following table demonstrates the data’s typical structure—it has several doctor-, patient-, and treatment-pertinent IDs, and healthcare metrics.

Table 1
Column Name | EMR datatype | Amazon Redshift datatype
Time ID | integer | int
Geography ID | integer | int
Product ID | integer | int
Market ID | integer | int
Doctor ID | integer | int
Doctor Attribute 1 ID | integer | int
Doctor Attribute 2 ID | integer | int
Doctor Attribute 3 ID | integer | int
Doctor Attribute 4 ID | integer | int
Doctor Rank | integer | int
Metric 1 | double | decimal(18,6)
Metric 2 | double | decimal(18,6)
Metric 3 | double | decimal(18,6)
Metric 4 | double | decimal(18,6)
Metric 5 | double | decimal(18,6)
Metric 6 | double | decimal(18,6)
Metric 7 | double | decimal(18,6)
Metric 8 | double | decimal(18,6)
Metric 9 | double | decimal(18,6)
Metric 10 | double | decimal(18,6)
Metric 11 | double | decimal(18,6)
Metric 12 | double | decimal(18,6)
Metric 13 | double | decimal(18,6)
Metric 14 | double | decimal(18,6)
Metric 15 | double | decimal(18,6)
Metric 16 | double | decimal(18,6)
Metric 17 | double | decimal(18,6)
Metric 18 | double | decimal(18,6)
Metric 19 | double | decimal(18,6)
Metric 20 | double | decimal(18,6)
Metric 21 | double | decimal(18,6)
Metric 22 | double | decimal(18,6)
Metric 23 | double | decimal(18,6)
Data Snapshot Date | timestamp | timestamp
Data Refresh Date | timestamp | timestamp
Data Refresh ID | string | varchar

Each table has approximately 35–40 columns and holds approximately 200–250M rows of data. ZS used 40 such tables; they sourced the data in these tables from various healthcare data vendors and processed them as per reporting needs.

The total dataset is approximately 2 TB in size in CSV format and approximately 700 GB in Parquet format.

Challenges and constraints

The five-step process for data refresh and insight generation outlined previously takes place over the weekends within a stipulated time frame. In the default, unoptimized state, the data load from S3 to Amazon Redshift and the MicroStrategy refresh (Step 4 in the previous diagram) took almost 13–14 hours on a 2-node ds2.8xlarge cluster and was affecting the overall weekend run SLA (1.5 hours).

The following diagram outlines the three constraints that ZS had to solve for to meet client needs:

Weekly time-based SLA – Load within 1 hour and fetch data on MSTR within 1.5 hours

The client IT and Business teams set a strict SLA to load 700 GB of Parquet data (equivalent to 2 TB CSV) onto Amazon Redshift and refresh the reports on the MicroStrategy BI tool. In this scenario, the client team had moved from another vendor to AWS, and the overall client expectation was to reduce costs without a significant performance dip.

Fixed cluster size – Pre-decided 2 node ds2.8xlarge cluster

The client IT teams determined the cluster size and configuration, and took into consideration the cost, data volumes, and load patterns. These were fixed and not adjustable: a 2 node ds2.8xlarge cluster. ZS carried out PoCs to optimize the environment subject to these constraints.

High data volume – Truncate and load 700 GB of data in Parquet format

The data that ZS used was pertinent to the pharma domain. The dataset under consideration in this scenario was 700 GB in Parquet format. In this specific use case, even historic data was updated with every refresh, so new data could not simply be appended. Therefore, we followed a truncate-and-load process.

Iterative optimization

With constraints over time, data volume, and cluster size, ZS performed various experiments to optimize Amazon Redshift data load and read time—two key aspects to gauge performance. ZS created an iterative framework that helps do the following:

  • Decide the file format
  • Define optimal data distribution through distribution and sort keys
  • Identify the techniques to parallelize the data-loading process

The following diagram shows the key steps you can follow to get the best data load and read performance on any Amazon Redshift cluster.

Data load optimization

We identified and optimized four key factors impacting data load performance: file formats, file size at source, concurrency, and column encoding.

File formats

Many projects usually load data in CSV format from S3 to Amazon Redshift. ZS had data available in Parquet format with snappy compression as an output of Spark processes. (Spark processes work best with this combination.)

To identify an efficient format for Amazon Redshift, we compared Parquet with the commonly used CSV and GZIP formats. We loaded a table from S3, with 200M rows of data generated through the Spark process, which equates to 41 GB in CSV, 11 GB in Parquet, and 10 GB in GZIP, and compared load time and CPU utilization. The following diagram shows load time vs. CPU utilization for the same data stored in different file formats.

For the dataset and constraints we were working with, loading the Parquet files required lower CPU utilization and less I/O than CSV and GZIP, and the files occupied a smaller storage footprint on S3 than the bulkier CSV format. Lower CPU utilization allowed more parallel loads in this scenario, thereby reducing the overall runtime required to load Parquet files.

File size at source

The next aspect was to choose the block size in which the Parquet files were broken down and stored on S3. A block size of 128 MB is commonly used for Spark jobs and considered to be optimal for data processing. However, Amazon Redshift works best with larger files.

We loaded 10 GB of Parquet data broken down into smaller equisized files of 250 MB, 750 MB, 1 GB, 1.5 GB, and 3 GB block sizes and noted the performance in each case. The following graph shows the different load times.

There was a gradual improvement in data load time as the block size increased, with the best load time at 1 GB. Beyond the 1 GB mark, performance dipped, and Amazon Redshift took more time to process the larger files.

These numbers are specific to the kind of data we were working with. The recommendations can vary as the form and shape of data changes.

As a best practice, identify the block size and have the number of files as a multiple of the number of slices of the Amazon Redshift cluster. This makes sure that each slice does an equal amount of work and there are no idle slices, thereby increasing efficiency and improving performance. For more information, see Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift.
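The following is a small sketch of that sizing rule; the slice count assumes 16 slices per ds2.8xlarge node, and the target block size reflects what worked for our dataset.

# Sketch: choose a file count that is a multiple of the cluster's slices,
# keeping files close to the block size that worked best for our data (~1 GB).
total_size_gb = 700
slices = 2 * 16               # assumption: 16 slices per ds2.8xlarge node, 2 nodes
target_file_gb = 1

files = max(slices, round(total_size_gb / target_file_gb / slices) * slices)
print(f"{files} files of ~{total_size_gb / files:.2f} GB each")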

Concurrency

The COPY command is relatively low on memory. The more parallel the loads, the better the performance. ZS loaded a table approximately 7.3 GB multiple times with separate concurrency settings. We measured the throughput in terms of the average time taken per GB to move files to Amazon Redshift with 1 to 20 concurrent loads. The following table summarizes the results.

Test | Number of tables loaded in parallel (concurrency) | Total data loaded (GB)
Test 1 | 1 | 7.3
Test 2 | 5 | 36.5
Test 3 | 10 | 73
Test 4 | 15 | 109.5
Test 5 | 20 | 146

The following diagram shows the time taken to load 1 GB of data and the CPU utilization for various concurrency settings.

For the dataset and constraints we were working with, a concurrency of 10 gave the best throughput, with a CPU availability buffer of about 25%. Depending on the nature of the data and volume fluctuations in each release, you can opt for a different buffer.
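A rough sketch of driving parallel COPY commands from Python follows; the connection parameters, table names, S3 prefixes, and IAM role are placeholders, not our actual configuration.

# Sketch: run several COPY commands concurrently against Amazon Redshift.
# Connection details, table names, S3 prefixes, and the IAM role are placeholders.
from concurrent.futures import ThreadPoolExecutor
import psycopg2

TABLES = ['doctor_metrics_01', 'doctor_metrics_02', 'doctor_metrics_03']  # placeholder list

def copy_table(table):
    copy_sql = f"""
        COPY reporting.{table}
        FROM 's3://example-bucket/reporting/{table}/'
        IAM_ROLE 'arn:aws:iam::111111111111:role/RedshiftCopyRole'
        FORMAT AS PARQUET;
    """
    conn = psycopg2.connect(host='example-cluster.xxxxxx.us-east-1.redshift.amazonaws.com',
                            port=5439, dbname='reporting', user='etl_user', password='...')
    try:
        with conn, conn.cursor() as cur:
            cur.execute(copy_sql)
    finally:
        conn.close()

# A concurrency of 10 gave the best throughput for our dataset; tune for yours.
with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(copy_table, TABLES))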

Column encoding

To identify the column encoding and compression on Amazon Redshift that gives the best performance and the lowest storage footprint, ZS compared ZSTD (which the ANALYZE COMPRESSION command recommended), LZO, and no encoding on Amazon Redshift tables for load performance. The following diagram shows the time taken to load the same data volume into tables with no encoding, ZSTD, and LZO applied to the columns.

For the dataset and constraints we were working with, ZSTD encoding on columns offered a high compression ratio (approximately 3x compared to no compression) and gave the best data copy performance and lowest storage footprint on Amazon Redshift for our use case. Results can vary depending on the data types and cardinality of your data.

Note: This solution was implemented prior to the feature release for AZ64 encoding and hence does not consider its impact. One could use the approach described in this blog post considering AZ64 compression encoding among all the compression encodings Amazon Redshift supports.

Data read optimization

ZS also improved the performance of data reads from Amazon Redshift by MicroStrategy by using distribution and sort keys and SQL optimization (minimizing filters on MicroStrategy’s auto-generated SQL queries).

SQL queries

MicroStrategy is a business intelligence tool that reads data from a database by intelligently building its own SQL. We compared the performance of MicroStrategy SQL queries (typical data warehouse queries using SELECT, GROUP BY, and temporary tables) with and without filters and observed that, for our dataset, the queries ran in almost the same time and used the same resources, but the unfiltered query returned four times more data. The following table summarizes the results.

Filters? | Rows fetched | Query runtime | CPU utilization
Y | 500K | 5.1 (mins) | 15%
N | 2M | 5.1 (mins) | 15%

Tweaking the SQL to have minimal or no filters can fetch significantly more rows with only a small increase in processing time, compared to reading the data with more filters. If you need the entire dataset, it is better to fetch the complete data with one query than to run several SQL queries with separate filters in parallel.

Distribution and sort keys

The following table is an analysis of a table load and read performance with and without distribution and sort keys for our use case.

Dist. Style | Dist Key | Sort Keys | Load time (mins) | Query run time (mins) | Query CPU utilization
None | | | 16.7 | 59.5 | 59%
Key | Most suitable distribution key depending on the query that gives even distribution | 6 columns that appear in the where clause, in the order of the group by clause in the SQL | 17.5 | 36.1 | 32%

The table without a distribution key loaded a little faster. However, the differences in data read time and CPU utilization between the two queries are significant. This means that more parallel loads could run efficiently when the keys were set appropriately, because the overall CPU utilization was reduced significantly. The key takeaway is to use a distribution style—auto, even, or manual—to optimize data reads from Amazon Redshift and allow more parallel processing. ZS used a manual distribution style and chose distribution and sort keys based on an extensive understanding of the data and refresh cadence.
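As a hedged illustration only (the table, columns, and keys below are hypothetical, not ZS’s actual schema), a table defined with ZSTD encoding, a distribution key, and sort keys might look like the following; the DDL can be run with any SQL client, for example using the psycopg2 pattern shown earlier.

# Sketch: DDL with a manual distribution style, sort keys, and ZSTD encoding.
# Table, column, and key choices are hypothetical, not the actual ZS schema.
create_sql = """
    CREATE TABLE reporting.doctor_metrics (
        time_id       INT           ENCODE ZSTD,
        geography_id  INT           ENCODE ZSTD,
        product_id    INT           ENCODE ZSTD,
        doctor_id     INT           ENCODE ZSTD,
        metric_1      DECIMAL(18,6) ENCODE ZSTD,
        metric_2      DECIMAL(18,6) ENCODE ZSTD
    )
    DISTSTYLE KEY
    DISTKEY (doctor_id)
    SORTKEY (time_id, geography_id, product_id);
"""

print(create_sql)  # execute with your SQL client of choice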

Next steps to get the best output from your Amazon Redshift instance

As an outcome of the multiple POC results, we identified the most suitable file formats, compression techniques, re-partitioned Parquet file block sizes, and distribution and interleaved sorting logic that offer the best performance for our dataset for reporting on MicroStrategy with Amazon Redshift as the database. This helped us identify the best data load and read combinations to load 700 GB of Parquet data (equivalent to 2 TB of CSV data) within the client-set 2.5 hour SLA using the available fixed 2 node ds2.8xlarge cluster.

The following diagram shows the iterative process you can follow to identify the best data load and read techniques suited for an Amazon Redshift cluster configuration.

The following are a few key takeaways:

  • Parquet and Amazon Redshift worked well together. The data in Parquet format had low CPU utilization and I/O requirements, which allowed more parallel loads.
  • ZSTD encoding worked best for this specific dataset because it compressed the numeric columns well in addition to the text columns.
  • Sort keys and distribution keys on tables can bring down read time by approximately 80% compared to tables with no distribution and sorting logic applied.
  • Filtering data on Amazon Redshift does not work the same way as in typical databases. You can improve data filtering performance with the appropriate sort keys.
  • File size at the source and concurrency are interrelated, and you should choose them accordingly. Larger blocks (up to 1 GB, for this specific dataset) loaded faster onto Amazon Redshift.

 


About the Authors

Vasu Kiran Gorti is a results-oriented professional with technology and functional/domain experience, predominantly in sales and marketing consulting. As an Associate Consultant at ZS Associates, he has experience working with life sciences and healthcare clients in alignment with their business objectives and expectations. He specializes in MicroStrategy, business intelligence, analytics, and reporting. Proactive and innovative, Vasu enjoys taking up stimulating initiatives that bridge the gap between technology and business. He’s a permanent beta—always learning, improvising, and evolving.

 

 

Ajit Pathak is a Technology Consultant at ZS Associates and leads BI and data management projects for Pharmaceutical companies. His areas of interest and expertise include MSTR, Redshift, and AWS suite. He loves designing complex applications and recommending best architectural practices that lead to efficient and concise dashboards. A qualified technology consultant, Ajit has focused on driving informed business decisions through clear data communication. When he is not focusing on areas of data visualizations and applications, Ajit loves to read, play badminton, and participate in debates ranging from politics to sports.

Amazon QuickSight: 2019 in review

Post Syndicated from Jose Kunnackal original https://aws.amazon.com/blogs/big-data/amazon-quicksight-2019-in-review/

2019 has been an exciting year for Amazon QuickSight. We onboarded thousands of customers, expanded our global presence to 10 AWS Regions, and launched over 60 features—more than a feature a week! We are inspired by you—our customers, and all that you do with Amazon QuickSight. We are thankful for the time you spend with us across in-person meetings, conference calls, emails, discussion forums, and AWS summits. As we close this year, here’s a quick summary of the highlights.

re:Invent 2019

The Amazon QuickSight team was at re:Invent alongside customers such as Best Western, Capital One, and Club OS, who spoke about their experiences implementing and using Amazon QuickSight for their analytics needs. We also ran two hands-on workshops using the newly launched APIs.

ANT324: Deploy business analytics at enterprise scale with Amazon QuickSight

This session discussed how enterprises are rolling out Amazon QuickSight Enterprise Edition for all their users and using features such as Active Directory or Federated SSO (SAML/OpenID Connect) authentication, private connectivity to data in AWS, email reports, and embedded dashboards.

Best Western rolled out Amazon QuickSight for business reporting across tens of thousands of users, using it in conjunction with Amazon Redshift and AWS Glue. While their previous legacy reporting system took 18 months and over 7,000 staff hours to upgrade server software versions, they now benefit from Amazon QuickSight’s modern reporting infrastructure and constant feature updates, and do not need to manage any servers to report on this data.

Capital One has made Amazon QuickSight available internally to their BI community of over 25,000 users. Key benefits that the Capital One team sees from QuickSight include the ability to roll out new BI use cases without server setup or capacity planning, integrated machine learning (ML) capabilities, and the absence of a traditional software update cycle. All this is with PrivateLink connectivity to data in AWS (Snowflake, Presto, Amazon Redshift, and Amazon RDS), and pay-per-session pricing for consumption with a max charge of $5 per month, per reader.

ANT217: Embedding analytics into applications with Amazon QuickSight

This session dove into the latest features that allow you to integrate Amazon QuickSight dashboards into your software development lifecycle, including new APIs, theming capabilities, and dashboard embedding. The full session recording is available on YouTube.

As an Independent Software Vendor (ISV), Club OS uses Amazon QuickSight’s embedded capabilities to add analytics to their cloud-based Customer Relationship Management for fitness and wellness businesses. Amazon QuickSight’s serverless architecture lets Club OS focus on meeting actual customer needs without being burdened by operations and infrastructure management. This architecture also provides fast, consistent performance for end-users; the analytics dashboards are rolled out to over 40,000 users across 3,500 health and fitness locations. For more information, see the section of the talk on YouTube by Nicholas Hahn, VP of Product Club OS.

As an enterprise with tens of thousands of internal users, Capital One uses Amazon QuickSight’s embedded capabilities to add analytics to internal applications and portals. With no servers to set up, teams can set up embedded dashboards in their portals in as quickly as a week, whether they want to serve this to just a few users or thousands. Traditional BI approaches to such portals would require server setup and maintenance, and expensive licensing contracts. For more information, see the section of the talk on YouTube by Latha Govada, Senior Manager Analytics at Capital One.

ANT302 – Enhancing your applications with Amazon QuickSight dashboards

This hands-on workshop session guided users through setting up an Amazon QuickSight Enterprise Edition account, connecting to data, creating dashboards, and using APIs to add users, move assets across development and staging versions, and embedding dashboards in a web portal. For more information, see Dashboard Embedding & Operationalizing with Quicksight APIs. Try it out for yourself!
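For readers exploring the embedding APIs used in this workshop, the following is a minimal sketch of generating an embed URL with the AWS SDK for Python; the account ID, dashboard ID, and session settings are placeholders.

# Sketch: generate an embeddable Amazon QuickSight dashboard URL for an
# IAM-authenticated user. The account and dashboard IDs are placeholders.
import boto3

quicksight = boto3.client('quicksight', region_name='us-east-1')

response = quicksight.get_dashboard_embed_url(
    AwsAccountId='111111111111',
    DashboardId='dashboard-id-placeholder',
    IdentityType='IAM',
    SessionLifetimeInMinutes=600,
    UndoRedoDisabled=False,
    ResetDisabled=False
)

# Embed this URL in an iframe or pass it to the QuickSight embedding SDK.
print(response['EmbedUrl'])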

Embedding, theming, and APIs

We followed up on our launch of dashboard embedding for federated AWS identities at re:Invent 2018 (for more information about how the NFL uses embedded Amazon QuickSight dashboards for hundreds of stakeholders of Next Gen Stats, see the session video on YouTube). In 2019, we launched the following features:

Machine learning integrations

We added the following native ML integrations in Amazon QuickSight to help you make the most of your data in AWS and QuickSight:

Visualizations and interactivity

We added the following support for new chart types, conditional formatting, QuickSight Actions, sheets in dashboards, and more:

Calculations and aggregations

We added the following new aggregation options, new functions, and more:

Data connectivity and security

We increased SPICE limits, provided finer control over data sources in QuickSight, enabled data source sharing, and more:

  • SPICE datasets now accommodate up to 100 million rows of data in Enterprise Edition and 25 million rows for Standard Edition. For more information, see Data Source Limits.
  • Fine-grained access control over S3 and Athena allows you to scope down access to these data sources to specific users or groups in Amazon QuickSight using IAM.
  • Cross data source joins allow business analysts to perform joins across supported data sources without relying on data engineering teams to set up complex ETL processes.
  • Shared data sources centralize credential management of data sources. This also allows shared ownership of data connections, allowing collaboration over SQL scripts that define custom SQL datasets.
  • You can connect to Presto data sources in AWS without any public routing of data, similar to Amazon Redshift, Snowflake, Amazon RDS, and others.
  • Amazon QuickSight data sources that connect to Amazon Athena can be tagged with specific Athena workgroups, which allows cost allocation of Athena queries by workgroup.

Mobile

We launched the new QuickSight Mobile app for iOS and Android, which allows you to access your dashboards and explore your data with drill-downs and filters. For more information, see New QuickSight Mobile App on YouTube.

Geographic availability

We added the following new Regions and language support in Amazon QuickSight:

  • Amazon QuickSight is now available in 10 AWS regions: US East (N. Virginia and Ohio), US West (Oregon), EU (Frankfurt, Ireland, and London), and Asia Pacific (Seoul, Singapore, Sydney, and Tokyo).
  • The Amazon QuickSight interface is now available in 10 languages: English, German, Spanish, French, Portuguese, Italian, Japanese, Korean, Simplified Chinese, and Traditional Chinese.

With all of these updates, you can build dashboards like the ones below and integrate them into your applications and portals.

Looking ahead

To see the full list of 2019 launches, see What’s New in Amazon QuickSight, or subscribe to the Amazon QuickSight YouTube channel for the latest training and feature walkthroughs. We have a packed roadmap for 2020, and we continue to focus on giving you insights from all your data and letting you share them with all your users, without you having to worry about operations and servers. Thank you for your support.

We wish you all the very best in the new year (and decade)!


About the authors

Jose Kunnackal John is a principal product manager for Amazon QuickSight.


Sahitya Pandiri is a senior technical program manager with Amazon Web Services. Sahitya has been in product/program management for 6 years now, and has built multiple products in the retail, healthcare, and analytics space.


Working with nested data types using Amazon Redshift Spectrum

Post Syndicated from Juan Yu original https://aws.amazon.com/blogs/big-data/working-with-nested-data-types-using-amazon-redshift-spectrum/

Redshift Spectrum is a feature of Amazon Redshift that allows you to query data stored on Amazon S3 directly and supports nested data types. This post discusses which use cases can benefit from nested data types, how to use Amazon Redshift Spectrum with nested data types to achieve excellent performance and storage efficiency, and some of the limitations of nested data types.

This post uses a data set generated with dummy data. You can view its table schema. If you’d like to try the dataset, deploy a Redshift cluster, execute the DDLs there, and use the example queries from this post or build your own.

Data modeling

In many scenarios, data is generated in a hierarchy. For example, assume a customer bought several items. For analytic purposes, there are various data modeling approaches to save storage or speed up data processing. One popular approach to achieve storage efficiency is the dimensional model.

The following table shows dummy customer data.

username | name | sex | address | mail | birthdate
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10
shepherdlisa | Mark Lee | M | 754 Michelle Gateway Port Johnstad, ME 35695 | [email protected] | 11/10/32
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07
brettmcgee | Travis Wilson | M | 535 Lisa Flat East Andrew, ID 43332 | [email protected] | 3/22/10
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60

The following table contains dummy order data, which is linked to the customer table via a foreign key username.

username | transaction_date | shipping_date | items | price
erin15 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | 10/11/19 | 10/12/19 | 7 | 1697
erin15 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | 10/5/19 | 10/10/19 | 7 | 6346

In the dimensional model, each customer’s information is stored only one time. There is no duplicated data, even though a customer could order multiple items at various times.

The dimensional model is optimal for storage. However, it can be challenging to process data efficiently. To get a full picture of your data, you need to join the two tables together to restore the hierarchy.

For example, to find out how many items customer Mark Lee bought and his total spending in the last three months, the query needs to join the customers and orders table. See the following code:

Select c.username, o.transaction_date, o.shipping_date, sum(items), sum(price) 
from customers c inner join orders o on (c.username = o.username) 
where c.name = 'Mark Lee' 
and transaction_date > DATEADD(month, -3, GETDATE())
group by 1,2,3;

When there are millions of customers who might buy multiple items in each transaction, the join can be very expensive. A fast-growing dataset can be so large that you need to store it in a distributed system. To perform the join, you need to shuffle data through the network, and the cost becomes even more significant.

As storage becomes cheaper and cheaper, people are starting to use a flattened model. In this model, data is pre-joined to gain processing efficiency. The following table shows that the customer and order information is stored in one record and ready to be analyzed.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/16/19 | 10/9/19 | 8 | 4824
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/10/19 | 9 | 4392
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/9/19 | 3 | 1079
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/25/19 | 10/7/19 | 1 | 208
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/2/19 | 10/5/19 | 10 | 3689
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/5/19 | 10/10/19 | 7 | 6346
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/17/19 | 9/21/19 | 1 | 527
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

This model also works well on a distributed system. Because each row contains complete information, you can process it on any node, and don’t need to shuffle data. You can also use the columnar format to store data, which allows the query engine to read only the needed columns instead of the whole row. This technique improves analytics performance and is storage efficient.

Both models have their pros and cons. The dimensional model trades compute power for storage efficiency, and the flattened model trades storage for processing efficiency.

Some new data types are available that achieve the best of both. Instead of putting child records into another table, you can nest them into the parent record and get the full information without performing a join. It effectively denormalizes the data without duplicating the parent record.

The following diagram illustrates this workflow.

You can apply this model to a schemaful hierarchy dataset. Continuing with the customer and order example, although a customer might buy multiple items, each order item contains the same type of information, such as product ID, price, and vendor.

The hierarchy is clear and consistent. You can map data to a nested structured schema, which you can store and access efficiently via SQL language.

The following table is a nested data presentation of the previous example.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
 |  |  |  |  |  | 9/16/19 | 10/9/19 | 8 | 4824
 |  |  |  |  |  | 9/17/19 | 10/10/19 | 9 | 4392
 |  |  |  |  |  | 9/17/19 | 10/9/19 | 3 | 1079
 |  |  |  |  |  | 9/25/19 | 10/7/19 | 1 | 208
 |  |  |  |  |  | 10/2/19 | 10/5/19 | 10 | 3689
 |  |  |  |  |  | 10/5/19 | 10/10/19 | 7 | 6346
 |  |  |  |  |  | 10/6/19 | 10/10/19 | 5 | 1744
 |  |  |  |  |  | 10/7/19 | 10/9/19 | 2 | 15
 |  |  |  |  |  | 10/11/19 | 10/13/19 | 10 | 4794
 |  |  |  |  |  | 10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
 |  |  |  |  |  | 9/17/19 | 9/21/19 | 1 | 527
 |  |  |  |  |  | 10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

The following graph compares the storage usage for the three models (all in parquet format).

The graph shows that nested structure is as storage efficient as the dimensional model.

Using nested data types

Nested data types are structured data types for some common data patterns. Nested data types support structs, arrays, and maps.

A struct is similar to a relational table. It groups object properties together. For example, if a customer profile contains their name, address, email, and birthdate, it appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string>

The data appears as the following code:

    "customer": {
        "username": "kevin35",
        "name": "Nancy Alvarez",
        "sex": "F",
        "address": "05472 Kathleen Turnpike\nNew Ashley, NV 84430",
        "mail": "[email protected]",
        "birthdate": "1961-02-05"
    }

An array stores one-to-many relationships. For example, a customer may have multiple shipping addresses or phone numbers. If a customer has several phone numbers, it appears as the following schema:

Phonenumbers array<string>

The data appears as the following code:

['555-5555', '555-1234']

A map is a collection of key-value pairs. You can consider it as a list of struct<key, value> elements. For example, if a customer has particular reward preferences, it appears as the following schema:

preference map<string,boolean> 

The data appears as the following code:

{one_day_delivery=true, 
 coupon=false, 
 free_shipping=true}

Nested data could have another nested data type as a member. The most common one is an array of structs. For example, an order containing multiple items could appear as the following schema:

orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > 

You can create a complex object by combining them. For example, a customer’s online transaction appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string> , 
  shipping_address array<
                      struct<
                        name:string,
                        street_address:string,
                        city:string,
                        postcode:string>
                      > , 
  creditcard string , 
  transaction_date string , 
  shipping_date string , 
  membership string , 
  preference map<string,boolean> , 
  orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > , 
  platform string , 
  comments string 

Popular query engines such as Hive, Spark, Presto, and Redshift Spectrum support nested data types. The SQL syntax those engines support can be different. To make it straightforward and consistent, all query examples in this post use Amazon Redshift Spectrum. For more information, see Tutorial: Querying Nested Data with Amazon Redshift Spectrum.
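Redshift Spectrum reads nested Parquet data through an external table whose columns use these nested types. A minimal sketch of creating such a table from Python, assuming the external schema demo already exists and using hypothetical connection parameters, column sizes, and S3 location, might look like the following:

import psycopg2

# Illustrative DDL only: column names follow the examples in this post, but the
# sizes, bucket location, and credentials below are assumptions.
DDL = """
CREATE EXTERNAL TABLE demo.customer_order_nested_parq (
  customer struct<username:varchar(64), name:varchar(128), sex:varchar(8),
                  address:varchar(256), mail:varchar(128), birthdate:varchar(16)>,
  transaction_date varchar(16),
  shipping_date varchar(16),
  preference map<varchar(32), boolean>,
  orders array<struct<product_id:varchar(32), price:int, onsale:boolean,
                      tax:int, weight:int, others:int, vendor:varchar(64)>>
)
STORED AS PARQUET
LOCATION 's3://example-bucket/customer_order_nested/';
"""

conn = psycopg2.connect(host='example-cluster.us-east-1.redshift.amazonaws.com',
                        port=5439, dbname='dev', user='awsuser', password='...')
conn.autocommit = True  # CREATE EXTERNAL TABLE cannot run inside a transaction block
with conn.cursor() as cur:
    cur.execute(DDL)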

Use cases for nested data types

Nested data types have many benefits: they simplify your ETL and data modeling, and they help you achieve good performance. The following are some common use cases that can benefit from nested data types.

Parent-child relationship

Nested data types keep the parent-child (summary-details) relationship by storing them collocated. This often matches how you want to analyze the data. For example, to analyze customers’ purchasing habits, you may need to find the following:

  • Customers who purchase often but buy only a few items each time. They likely want an annual membership that covers the shipping cost.
  • Customers who purchase less frequently but buy many items in one transaction. They likely expect a free shipping benefit or discount.

You need supporting information from the orders data, such as how many items, on average, a customer buys per transaction.

To find a list of customers who order online at least once per week, with fewer than four items each time, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, co.customer.address as address,
  (select count(*) from co.orders) as total_items, 
  (select sum(case when onsale = true then 1 else 0 end) from co.orders) as items_onsale
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date >= '2019-09-01' 
group by 1 having tran_cnt >= 4;

With the nested order details, per item information is already grouped by customer per transaction. Children aggregation is straightforward; you can aggregate order details to categorize a customer. If you use a denormalized table, you have to do GROUP BY two times. The query could also take longer. See the following code:

with purchases as (
select cc_username as customer, transaction_date, cc_address as address,
  count(*) as total_items, 
  sum(case when co_onsale = true then 1 else 0 end) as items_onsale
from demo.customer_order_flatten_parq 
group by 1,2,3)
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date > '2019-09-01'
group by 1 having tran_cnt >= 4;

To find customers who order only once per quarter with at least 10 items and high total spending, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, 
  (select count(*) from co.orders) as total_items, 
  (select sum(price) from co.orders) as total_spending
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_spending) from purchases 
where total_items >= 10 and total_spending > 5000 and transaction_date > '2019-07-01' and transaction_date < '2019-09-30'
group by 1 having tran_cnt < 2
order by 3 desc;

Another benefit of using nested data types for parent-child data analysis is resource usage reduction. If there are one million customer transactions, there could be over five times the item orders. For example, to find each day how many goods ship to Michigan, use the following code:

select co.shipping_date, sum(coo.weight)
from demo.customer_order_nested_parq co, co.orders coo
where co.customer.address like '%MI 012__'
group by 1
order by 1;

Assuming that 3% of customers ship orders to Michigan, after filtering the customer data, there could be approximately 3% of matching transactions. You only need to process 150 thousand item orders instead of 5 million. This greatly reduces the data to process and the resources to use when compared to a flattened model.

For the parent-child use case, nested data types provide straightforward aggregation on children; more efficient filtering, grouping, and windowing; and storage savings.

Many-to-many relationship

Customers could buy many items from various vendors, and a vendor could sell a product to many customers. This is a many-to-many relationship.

In a dimensional model, you need three tables: a customers table, an orders table, and a transactions table. To find the top vendors who have the most customers, you need to join the three tables. See the following code:

select vendor, transaction_date, count(distinct cc.username)
from customers cc,
     transactions tt,
     orders oo
where cc.username = tt.username
and oo.transaction_id = tt.transaction_id
and tt.transaction_date >= '2019-01-01' 
group by 1,2
order by 3 desc;

With nested data types, the query is similar to the one using the dimensional model. However, because the orders data is collocated with customer transactions, you can join them on the fly without paying the join cost. See the following code:

select coo.vendor, co.transaction_date, count(distinct co.customer.username)
from demo.customer_order_nested_parq co, 
co.orders coo
where co.transaction_date > '2019-01-01'
group by 1,2
order by 3 desc;

As another example, your vendor, Smith PLC, had a big sale event on October 10, 2019. You want to find out which customers bought your product during this sale and the top customers who spent the most. To do so, use the following code:

select co.customer.username, count(coo.product_id), sum(coo.price)
from demo.customer_order_nested_parq co, co.orders coo
where co.transaction_date = '2019-10-10'
and (select count(*) from co.orders 
     where vendor = 'Smith PLC' and onsale = true) > 0
group by 1
order by 3 desc;

Compared to the dimensional model query, the nested model is two-to-three times faster. This is on a relatively small dataset with only a few million rows. For a larger dataset, the performance improvement is even greater, and with less resource usage.

Sparse and frequently changed data

Assume that you want to reward customers who order from your online store. For each transaction, the customer can choose one or more rewards, such as free shipping, one-day delivery, a discount, or a coupon. Depending on how effective a reward is, you have to frequently modify the reward types, add new ones, or remove ones that aren’t popular.

If you store the data in a flattened model, there are two common options to track this data. The first method is creating a table with one column for each type of reward. You have to think of all possible rewards at the outset and create those columns. This could lead to a wide table and very sparse data. Alternatively, you can modify your table schema when you want to add or remove a reward type. That adds more maintenance work, and you may lose historical data. The following table demonstrates this method (all transaction_id values in the table examples below are fabricated).

transaction_id | free_shipping | one_day_delivery | discount | coupon
pklein35966659391853535 | FALSE | TRUE |  | TRUE
rebeccawiliams228880139768961 |  | FALSE |  | TRUE
brooke39180013629693040 | TRUE | FALSE | TRUE | TRUE
jchapman4283556333561927 | FALSE | TRUE | FALSE | FALSE
mariamartin3515336516983566 | FALSE | FALSE | TRUE | 

The second option is storing one reward per row. This avoids the wide-table issue and the burden of constantly updating the schema. The approach is suitable if you only need to analyze a single reward. If you want to see whether there is any correlation between rewards, such as whether more customers prefer free shipping and one-day delivery over a discount and coupon, this option is more complicated. This model also needs more storage. The following table demonstrates this method.

transaction_id | reward_type | value
pklein35966659391853535 | free_shipping | FALSE
pklein35966659391853535 | one_day_delivery | TRUE
pklein35966659391853535 | coupon | TRUE
rebeccawiliams228880139768961 | one_day_delivery | FALSE
rebeccawiliams228880139768961 | coupon | TRUE
brooke39180013629693040 | free_shipping | TRUE
brooke39180013629693040 | one_day_delivery | FALSE
brooke39180013629693040 | discount | TRUE
brooke39180013629693040 | coupon | TRUE

A compromise is to use a JSON string to store selected rewards together in one column, which avoids schema change. See the following code:

preference varchar(65535)	

The following table shows how the data is stored in JSON string:

transaction_id | preference
pklein35966659391853535 | {"coupon":true, "free_shipping":false, "one_day_delivery":true}
rebeccawiliams228880139768961 | {"coupon":true, "one_day_delivery":false}
brooke39180013629693040 | {"coupon":true, "discount":true, "free_shipping":true, "one_day_delivery":false}
jchapman4283556333561927 | {"coupon":false, "discount":false, "free_shipping":false, "one_day_delivery":true}
mariamartin3515336516983566 | {"discount":true, "free_shipping":false, "one_day_delivery":false}

You can analyze it by using a JSON function to extract the reward data. See the following code:

select correlation, count(username) from (
select username,
(case when 
    (json_extract_path_text(preference,'free_shipping')  = 'true' and  
     json_extract_path_text(preference,'one_day_delivery')  = 'true') 
     then 1
 when 
    (json_extract_path_text(preference,'discount') = 'true' and  
     json_extract_path_text(preference,'coupon')  = 'true') 
     then 2
else 0 
 end) as correlation
from demo.transactions
  ) t
group by 1;

This solution is acceptable, but you could be more storage efficient and more performant by using the nested data type map. See the following code:

preference map<string, boolean>

The following table shows how the data is stored in map:

transaction_id | preference
pklein35966659391853535 | {coupon=true, free_shipping=false, one_day_delivery=true}
rebeccawiliams228880139768961 | {coupon=true, one_day_delivery=false}
brooke39180013629693040 | {coupon=true, discount=true, free_shipping=true, one_day_delivery=false}
jchapman4283556333561927 | {coupon=false, discount=false, free_shipping=false, one_day_delivery=true}
mariamartin3515336516983566 | {discount=true, free_shipping=false, one_day_delivery=false}

You can analyze a single reward or multiple rewards using SQL. For example, to find how many customers prefer free shipping, use the following code:

select count(distinct co.customer.username)
from demo.customer_order_nested_parq co, co.preference cm
where cm.key = 'free_shipping' and cm.value = true;

To find how many customers prefer free shipping and one-day delivery more than a coupon or discount, use the following code:

with customer_rewards as (
select co.customer.username as customer, 
 (select count(*) from co.preference cm 
where cm.key = 'free_shipping' and cm.value = true) as shipping_pref,
 (select count(*) from co.preference cm 
where cm.key = 'one_day_delivery' and cm.value = true) as delivery_pref,
 (select count(*) from co.preference cm 
where cm.key = 'coupon' and cm.value = true) as coupon_pref,
 (select count(*) from co.preference cm 
where cm.key = 'discount' and cm.value = true) as discount_pref
from demo.customer_order_nested_parq co)
select case when shipping_pref > 0 and delivery_pref > 0 then 1
            when coupon_pref > 0 and discount_pref > 0 then 2
            else 0
       end as correlation, count(customer)
from customer_rewards
group by 1;

The map type allows you to add any key-value pair. You can add a new reward type at any time without a schema change, and you can analyze the new reward right away.

The main advantage of the map type is that it supports flexible schema and eliminates the need to update the schema frequently. However, there is not much performance benefit. If performance is your top priority, a flattened table is recommended. You can also flatten the most-often accessed columns, and use map for the less frequently accessed columns.

Limitations of nested data types

Although nested data types are useful in many use cases, they have the following limitations:

  • There is a hard limit on the size of the nested children data for each record.
  • You can only append, and updating data is difficult and slow. You need to rewrite the entire nested object even if you want to modify one child attribute.
  • Processing is split at the parent record level. You may run into problems if the children data is heavily skewed.
  • The query engine may not support all types of analytics on nested data.
  • For engine-specific constraints, see Amazon Redshift Spectrum Nested Data Limitations.

Summary

This post discussed the benefits of nested data types and use cases in which nested data types can help improve storage efficiency, performance, or simplify analysis. There are many more use cases in which nested data types can be an ideal solution. Try it out and share your experiences!


About the Author

 Juan Yu is a Data Warehouse Specialist Solutions Architect at AWS.


Collect and distribute high-resolution crypto market data with ECS, S3, Athena, Lambda, and AWS Data Exchange

Post Syndicated from Jared Katz original https://aws.amazon.com/blogs/big-data/collect-and-distribute-high-resolution-crypto-market-data-with-ecs-s3-athena-lambda-and-aws-data-exchange/

This is a guest post by Floating Point Group. In their own words, “Floating Point Group is on a mission to bring institutional-grade trading services to the world of cryptocurrency.”

The need and demand for financial infrastructure designed specifically for trading digital assets may not be obvious. There’s a rather pervasive narrative that these coins and tokens are effectively natively digital counterparts to traditional assets such as currencies, commodities, equities, and fixed income. This narrative often manifests in the form of pithy one-liners recycled by pundits attempting to communicate the value proposition of various projects in the space (such as, “Bitcoin is just a currency with an algorithmically controlled, tamper-proof monetary policy,” or, “Ether is just a commodity like gasoline that you can use to pay for computational work on a global computer.”). Unsurprisingly, we at FPG often hear the question, “What’s so special about cryptocurrencies that they warrant dedicated financial services? Why do we need solutions for problems that have already been solved?”

The truth is that these assets and the widespread public interest surrounding them are entirely unprecedented. The decentralized ledger technology that serves as an immutable record of network transactions, the clever use of proof-of-work algorithms to economically incentivize rational actors to help uphold the security of the network (the proof-of-work concept dates back at least as far as 1993, but it was not until bitcoin that the technology showed potential for widespread adoption), the irreversible nature of transactions that poses unique legal challenges in cases such as human error or extortion, the precariousness of self-custody (third-party custody solutions don’t exactly have track records that inspire trust), the regulatory uncertainties that come with the difficulty of both classifying these assets as well as arbitrating their exchange which must ultimately be reconciled by entities like the IRS, SEC, and CFTC—it is all very new, and very weird. With 24-hour market volume regularly exceeding $100 billion, we decided to direct our focus towards problems related specifically to trading these assets. Granted, crypto trading has undoubtedly matured since the days of bartering for bitcoin in web forums and witnessing 10% price spreads between international exchanges. But there is still a long path ahead.

One major pain point we are aiming to address for institutional traders involves liquidity (or, more precisely, the lack thereof). Simply put, the buying and selling of cryptocurrencies occurs across many different trading venues (exchanges), and liquidity (the offers to buy or sell a certain quantity of an asset at a certain price) continues to become more fragmented as new exchanges emerge. So say you’re trying to buy 100 bitcoins. You must buy from people who are willing to sell. As you take the best (cheapest) offers, you’re left with increasingly expensive offers. By the time you fill your order (in this example, buy all 100 bitcoins), you may have paid a much higher average price than, say, the price you paid for the first bitcoin of your order. This phenomenon is referred to as slippage. One easy way to minimize slippage is by expanding your search for offers. So rather than looking at the offers on just one exchange, look at the offers across hundreds of exchanges. This process, traditionally referred to as smart order routing (SOR), is one of the core services we provide. Our SOR service allows traders to easily submit orders that our system can match against the best offers available across multiple trading venues by actively monitoring liquidity across dozens of exchanges.

Fanning out large orders in search of the best prices is a rather intuitive and widely applicable concept—roughly 75% of equities are purchased and sold via SOR. But the value of such a service for crypto markets is particularly salient: a perpetual cycle of new exchanges surging in popularity while incumbents falter has resulted in a seemingly incessant fragmentation of liquidity across trading venues—yet traders tend to assume an exchange-agnostic mindset, concerned exclusively with finding the best price for a given quantity of an asset.

Access to both real-time and historical market data is essential to the functionality of our SOR service. The highest resolution data we could hope to obtain for a given market would include every trade and every change applied to the order book, effectively allowing us to recreate the state of a market at any given point in time. The updates provided through the WebSocket streams are not sufficient for reconstructing order books. We also need to periodically fetch snapshots of the order books and store those, which we can do using an exchange’s REST API. We can fetch a snapshot and apply the corresponding updates from the streams to “replay” the order book.

Fortunately, this data is freely available, because many exchanges offer real-time feeds of market data via WebSocket APIs. We found several third-party vendors selling subscriptions to these data sets, typically in the form of CSV dumps delivered at a weekly or monthly cadence. This presented the question of build vs. buy. Given that we felt capable of building a robust and reliable system for ingesting real-time market data in a relatively short amount of time and at a fraction of the cost of purchasing the data from a vendor, we were already leaning in favor of building. Further investigation made buying look like an increasingly unattractive option. Disclaimers that multiple vendors issued about their inability to guarantee data quality and consistency did not inspire confidence. Inspecting sample data sets revealed that some essential fields provided in the original data streams were missing—fields necessary for achieving our goal of recreating the state of a market at an arbitrary point in time. We also recognized that a weekly or monthly delivery schedule would restrict our ability to explore relatively recent market data.

This post provides a high-level overview of how we ingest and store real-time market data and how we use the AWS Data Exchange API to organize and publish our data sets programmatically. Our system’s functionality extends well beyond data ingestion, normalization, and persistence; we run dedicated services for data validation, caching the most recent trade and order book for every market, computing and storing derivative metrics, and other services that help safeguard data accuracy and minimize the latency of our trading systems.

Data ingestion

The WebSocket streams we connect to for data consumption are often the same APIs responsible for providing real-time updates to an exchange’s trading dashboard.

WebSocket connections transmit data as discrete messages. We can inspect the content of individual messages as they stream into the browser. For example, the following screenshot shows a batch of order book updates.

The updates are expressed as arrays of bids and asks that were either added to the book or removed from it. Client-side code processes each update, resulting in a real-time rendering of the market’s order book. In practice, our data ingestion service (Ingester) does not read a single stream, but rather thousands of different streams, covering various data feeds for all markets across multiple exchanges. All the connections required for such broad coverage and the resulting flood of incoming data raise some obvious concerns about data loss. We’ve taken several measures to mitigate such concerns, including a redundant system design that allows us to spin up an arbitrary number of instances of the Ingester service. Like most of our microservices, Ingester is a Dockerized service run on Amazon ECS and deployed via Terraform.

All these instances consume the same data feeds as each other while a downstream mechanism handles deduplication (this is covered in more detail later in this post). We also set up Amazon CloudWatch alerts to notify us when we detect non-contiguous messages, indicating a gap in the incoming data. The alerts don’t directly mitigate data loss, but they do serve the important function of prompting an investigation.

Ingester builds up separate buffers of incoming messages, split out by data-type/exchange/market. Then, after a fixed time interval, each buffer is flushed into Amazon S3 as a gzipped JSON file. The buffer-flush cycle repeats.
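A minimal sketch of this buffer-and-flush cycle, assuming a hypothetical bucket name and key layout, might look like the following:

import gzip
import json
from collections import defaultdict

import boto3

s3 = boto3.client('s3')
BUCKET = 'example-market-data-bucket'  # hypothetical bucket name

# One buffer per (data_type, exchange, market) combination
buffers = defaultdict(list)

def buffer_message(data_type, exchange, market, message):
    buffers[(data_type, exchange, market)].append(message)

def flush_buffers(batch_key_suffix):
    """Flush every buffer to S3 as a gzipped, newline-delimited JSON file.

    batch_key_suffix identifies the batch interval; in practice it is derived
    from the incoming message timestamps (see the deduplication discussion below).
    """
    for (data_type, exchange, market), messages in list(buffers.items()):
        if not messages:
            continue
        key = f"{data_type}/{exchange}/{market}/{batch_key_suffix}.json.gz"
        body = gzip.compress('\n'.join(json.dumps(m) for m in messages).encode('utf-8'))
        s3.put_object(Bucket=BUCKET, Key=key, Body=body)
        buffers[(data_type, exchange, market)] = []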

The following screenshot shows a portion of the file content.

This code snippet is a single, pretty-printed JSON record from the file in the screenshot above.

{
   "event_type":"trade",
   "timestamp":1571980320422,
   "ticker_pair":"BTCUSDT",
   "trade_id":194230159,
   "price":"7405.69000000",
   "quantity":"3.20285300",
   "buyer_order_id":730178987,
   "seller_order_id":730178953,
   "trade_timestamp":1571980320417,
   "buyer_market_maker":false,
   "M":true
}

Ingester handles additional functionality, such as applying pre-defined mappings of venue-specific field names to our internal field names. Data normalization is one of many processes necessary to enable our systems to build a holistic understanding of market dynamics.

As with most distributed system designs, our services are written with horizontal scalability as a first-order priority. We took the same approach in designing our data ingestion service, but it has some features that make it a bit different than the archetypical horizontally scalable microservice. The most common motivations for adjusting the number of instances of a given service are load-balancing and throttling throughput. Either your system is experiencing backpressure and a consumer service scales to alleviate that pressure, or the consumer is over-provisioned and you scale down the number of instances for the sake of parsimony. For our data ingestion service, however, our motivation for running multiple instances is to minimize data loss via redundancy. The CPU usage for each instance is independent of instance count, because each instance does identical work.

For example, rather than helping alleviate backpressure by pulling messages from a single queue, each instance of our data ingestion service connects to the same WebSocket streams and performs the same amount of work. Another somewhat unusual and confounding aspect of horizontally scaling our data ingestion service is related to state: we batch records in memory and flush the records to S3 every minute (based on the incoming message’s timestamp, not the system timestamp, because those would be inconsistent). Redundancy is our primary measure for minimizing data loss, but we also need each instance to write the files to S3 in such a way that we don’t end up with duplicate records. Our first thought was that we’d need a mechanism for coordinating activity across the instances, such as maintaining a cache that would allow us to check if a record had already been persisted. But we realized that we could perform this deduplication without any coordination between instances at all. Most of the message streams we consume publish messages with sequence IDs. We can combine the sequence IDs with the incoming message timestamp to achieve our deduplication mechanism: we can deterministically generate the same exact file names containing the exact same data by writing our service code to check that the message added to the batch has the appropriate sequence ID relative to the previous message in the batch and using the timestamp on the incoming message to determine the exact start and end of each batch (we typically get a UNIX timestamp and check when we’ve rolled over to the next clock minute). This allows us to simply rely on a key collision in S3 for deduplication.
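A sketch of that idea, with assumed field names (a sequence_id and a millisecond timestamp) and a simplified key layout, might look like the following:

from datetime import datetime, timezone

def batch_key_suffix(message_timestamp_ms):
    """Derive the batch identifier from the *message* timestamp (not the system
    clock), so every Ingester instance names the same batch identically."""
    ts = datetime.fromtimestamp(message_timestamp_ms / 1000, tz=timezone.utc)
    return ts.strftime('%Y/%m/%d/%H%M')  # the batch rolls over at each clock minute

def append_if_contiguous(batch, message, last_sequence_id):
    """Append only if the sequence ID directly follows the previous one; a gap
    raises instead of silently continuing, so it can surface through alerting."""
    if last_sequence_id is not None and message['sequence_id'] != last_sequence_id + 1:
        raise ValueError(f"Sequence gap detected after {last_sequence_id}")
    batch.append(message)
    return message['sequence_id']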

AWS suggests a similar solution for a slightly different problem, relating to Amazon Kinesis Data Streams. For more information, see Handling Duplicate Records.

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.

After we store the data, we can perform simple analytics queries on the billions of records we’ve stored in S3 using Amazon Athena, a query service that requires minimal configuration and zero infrastructure overhead. Athena has a concept of partitions (inherited from one of its underlying services, Apache Hive). Partitions are mappings between virtual columns (in our case: pair, year, month, and day) and the S3 directories in which the corresponding data is stored.

S3’s file system is not actually hierarchical. Files are prepended with long key prefixes that are rendered as directories in the AWS console when browsing a bucket’s contents. This has some non-trivial performance consequences when querying or filtering on large data sets.

The following screenshot illustrates a typical directory path.

By pointing Athena directly to a particular subset of data, a well-defined partitioning scheme can drastically reduce query run times and costs. Though the ability to perform ad hoc business analytics queries is primarily a convenience, taking time to choose a sane multi-level partitioning scheme for Athena based on some of our most common access patterns seemed worthwhile. A poorly designed partition structure can result in Athena unnecessarily scanning huge swaths of data and ultimately render the service unusable.
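For example, registering a new day of data as a partition is a single DDL statement that you can submit through boto3; the database, table, and bucket names below are placeholders:

import boto3

athena = boto3.client('athena')

# Hypothetical table, partition values, and locations; adjust for your catalog.
query = """
ALTER TABLE market_data.trades
ADD IF NOT EXISTS PARTITION (pair='BTCUSDT', year='2019', month='10', day='25')
LOCATION 's3://example-market-data-bucket/trades/BTCUSDT/2019/10/25/'
"""

athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'market_data'},
    ResultConfiguration={'OutputLocation': 's3://example-athena-results-bucket/'}
)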

Data publication

Our pipeline for transforming thousands of small gzipped JSON files into clean CSVs and loading them into AWS Data Exchange involves three distinct jobs, each expressed as an AWS Lambda function.

Job 1

Job 1 is initiated shortly after midnight UTC by a cron-scheduled CloudWatch event. As mentioned previously, our data ingestion service’s batching mechanism flushes each batch to S3 at a regular time interval. A timestamp on the incoming message (applied server-side) determines the rollover from one interval to the next, as opposed to the ingestion service’s system timestamp, so in the rare case that a non-trivial amount of time elapses between the consumption of the final message of batch n and the first message of batch n+1, we kick off the first Lambda function 20 minutes after midnight UTC to minimize the likelihood of omitting data pending write.

Job 1 formats values for the date and data source into an Athena query template and outputs the query results as a CSV to a specified prefix path in S3. (Every Athena query produces a .metadata file and a CSV file of the query results, though DDL statements do not output a CSV.) This PUT request to S3 triggers an S3 event notification.

We run a full replica data ingestion system as an additional layer of redundancy. Using the coalesce conditional expression, the Athena query in Job 1 merges data from our primary system with the corresponding data from our replica system, and fills in any gaps while deduplicating redundant records.

We experimented fairly extensively with AWS Glue and PySpark for the ETL-related work performed in Job 1. When we realized that we could merge all the small source files into one, join the primary and replica data sets, and sort the results with a single Athena query, we decided to stick with this seemingly simpler and more elegant approach.

The following code shows one of our Athena query templates.
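A simplified sketch of the general shape of such a template and its submission, with all table, column, and bucket names assumed, might look like the following:

import boto3

athena = boto3.client('athena')

# Illustrative only: the real template, tables, and columns differ.
QUERY_TEMPLATE = """
SELECT coalesce(p.event_type, r.event_type)           AS event_type,
       coalesce(p.trade_id, r.trade_id)               AS trade_id,
       coalesce(p.price, r.price)                     AS price,
       coalesce(p.quantity, r.quantity)               AS quantity,
       coalesce(p.trade_timestamp, r.trade_timestamp) AS trade_timestamp
FROM primary_db.trades_{source} p
FULL OUTER JOIN replica_db.trades_{source} r
  ON p.trade_id = r.trade_id
WHERE coalesce(p.dt, r.dt) = '{date}'
ORDER BY trade_timestamp
"""

def run_job_1(source, date):
    # The query results CSV lands under OutputLocation, which triggers Job 2.
    return athena.start_query_execution(
        QueryString=QUERY_TEMPLATE.format(source=source, date=date),
        QueryExecutionContext={'Database': 'primary_db'},
        ResultConfiguration={'OutputLocation': f's3://example-results-bucket/{source}/{date}/'}
    )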

Job 2

Job 2 is triggered by the S3 event notification from Job 1. Job 2 simply copies the query results CSV file to a different key within the same S3 bucket.

The motivation for this step is twofold. First, we cannot dictate the name of an Athena query results CSV file; it is automatically set to the Athena query ID. Second, when adding an S3 object as an asset to an AWS Data Exchange revision, the asset’s name is automatically set to the S3 object’s key. So to dictate how the CSV file name appears in AWS Data Exchange, we must first rename it, which we accomplish by copying it to a specified S3 key.
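A sketch of Job 2 as a Lambda handler, with an assumed key layout for the renamed file, might look like the following:

import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """Copy the Athena results CSV to a predictable key so the asset name in
    AWS Data Exchange is human-readable. The target key layout is an assumption."""
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    source_key = record['object']['key']        # e.g. 'athena-results/<query-id>.csv'
    if not source_key.endswith('.csv'):         # ignore the accompanying .metadata files
        return
    target_key = 'data-exchange-staging/binance-trades/2019-10-25.csv'  # hypothetical
    s3.copy_object(Bucket=bucket,
                   CopySource={'Bucket': bucket, 'Key': source_key},
                   Key=target_key)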

Job 3

Job 3 handles all work related to AWS Data Exchange and AWS Marketplace Catalog via their respective APIs. We use boto3, AWS’s Python SDK, to interface with these APIs. The AWS Marketplace Catalog API is necessary for adding data set revisions to products that have already been published. For more information, see Tutorial: Adding New Data Set Revisions to a Published Data Product.

Our code explicitly defines mappings with the following structure:

data source / DataSet / Product

The following code shows how we configure relationships between data sources, data sets, and products.

Our data sources are typically represented by a trading venue and data type combination (such as Binance trades or CoinbasePro order books). Each new file for a given data source is delivered as a single asset within a single new revision for a particular data set.

An S3 trigger kicks off the Lambda function. The trigger is scoped to a specified prefix that maps to a single data set. The function alias feature of AWS Lambda allows us to define the unique S3 triggers for each data set while reusing the same underlying Lambda function. Job 3 carries out the following steps (note that steps 1 through 5 refer to the AWS Data Exchange API while steps 6 and 7 refer to the AWS Marketplace Catalog API):

  1. Submits a request to create a new revision for the corresponding data set via CreateRevision.
  2. Adds the file that was responsible for triggering the Lambda function to the newly created revision via CreateJob using the IMPORT_ASSETS_FROM_S3 job type. To submit this job, we need to supply a few values: the S3 bucket and key values for the file are pulled from the Lambda event message, while the RevisionID argument comes from the response to the CreateRevision call in the previous step.
  3. Kicks off the job with StartJob, sourcing the JobID argument from the response to the CreateJob call in the previous step.
  4. Polls the job’s status via GetJob (using the job ID from the response to the StartJob call in the previous step) to check that our file (the asset) was successfully added to the revision.
  5. Finalizes the revision via UpdateRevision.
  6. Requests a description of the marketplace entity using DescribeEntity, passing in the product ID stored in our hardcoded mappings as the EntityID.
  7. Kicks off the entity ChangeSet via StartChangeSet, passing in the entity ID and entity type from the DescribeEntity response in the previous step, the revision ARN parsed from the response to our earlier call to CreateRevision as RevisionArn, and the data set ARN as DataSetArn, which we fetch at the start of the code’s runtime using the AWS Data Exchange API’s GetDataSet.

Here’s a thin wrapper class we wrote to carry out the steps detailed above:

from time import sleep
import logging
import json

import boto3

from config import (
    DATA_EXCHANGE_REGION,
    MARKETPLACE_CATALOG_REGION,
    LambdaS3TriggerMappings
)

logger = logging.getLogger()


class CustomDataExchangeClient:
    def __init__(self):
        self._de_client = boto3.client('dataexchange', region_name=DATA_EXCHANGE_REGION)
        self._mc_client = boto3.client('marketplace-catalog', region_name=MARKETPLACE_CATALOG_REGION)
    
    def _get_s3_data_source(self, bucket, prefix):
        return LambdaS3TriggerMappings[(bucket, prefix)]

    # Job State can be one of: WAITING | IN_PROGRESS | ERROR | COMPLETED | CANCELLED | TIMED_OUT
    def _wait_for_de_job_completion(self, job_id):
        while True:
            get_job_resp = self._de_client.get_job(JobId=job_id)
            if get_job_resp['State'] == 'COMPLETED':
                logger.info(f"Job '{job_id}' succeeded:\n\t{get_job_resp}")
                break
            elif get_job_resp['State'] in ('ERROR', 'CANCELLED'):
                raise Exception(f"Job '{job_id}' failed:\n\t{get_job_resp}")
            else:
                sleep(5)
                logger.info(f"Still waiting on job {job_id}...")
        return get_job_resp

    # ChangeSet Status can be one of: PREPARING | APPLYING | SUCCEEDED | CANCELLED | FAILED
    def _wait_for_mc_change_set_completion(self, change_set_id):
        while True:
            describe_change_set_resp = self._mc_client.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=change_set_id
                )
            if describe_change_set_resp['Status'] == 'SUCCEEDED':
                logger.info(
                    f"ChangeSet '{change_set_id}' succeeded:\n\t{describe_change_set_resp}"
                )
                break
            elif describe_change_set_resp['Status'] in ('FAILED', 'CANCELLED'):
                raise Exception(
                    f"ChangeSet '{change_set_id}' failed:\n\t{describe_change_set_resp}"
                )
            else:
                sleep(1)
                logger.info(f"Still waiting on ChangeSet {change_set_id}...")
        return describe_change_set_resp

    def process_s3_event(self, s3_event):
        source_bucket = s3_event['Records'][0]['s3']['bucket']['name']
        source_key = s3_event['Records'][0]['s3']['object']['key']
        source_prefix = '/'.join(source_key.split('/')[0:-1])
        s3_data_source = self._get_s3_data_source(source_bucket, source_prefix)
        obj_name = source_key.split('/')[-1]
        
        s3_data_source.validate_object_name(obj_name)
        
        for data_set in s3_data_source.lambda_s3_trigger_target_data_sets:
            # Create revision
            create_revision_resp = self._de_client.create_revision(
                DataSetId=data_set.id,
                Comment=obj_name
            )
            logger.debug(create_revision_resp)
            revision_id = create_revision_resp['Id']
            revision_arn = create_revision_resp['Arn']

            # Create job
            create_job_resp = self._de_client.create_job(
                Type='IMPORT_ASSETS_FROM_S3',
                Details={
                    'ImportAssetsFromS3': {
                      'AssetSources': [
                          {
                              'Bucket': source_bucket,
                              'Key': source_key
                          },
                      ],
                      'DataSetId': data_set.id,
                      'RevisionId': revision_id
                    }
                }
            )
            logger.debug(create_job_resp)

            # Start job
            job_id = create_job_resp['Id']
            start_job_resp = self._de_client.start_job(JobId=job_id)
            logger.debug(start_job_resp)

            # Wait for Data Exchange job completion
            get_job_resp = self._wait_for_de_job_completion(job_id)
            logger.debug(get_job_resp)

            # Finalize revision
            update_revision_resp = self._de_client.update_revision(
                DataSetId=data_set.id,
                RevisionId=revision_id,
                Finalized=True
            )
            logger.debug(update_revision_resp)

            # Ensure revision finalization succeeded
            finalized_status = update_revision_resp['Finalized']
            if finalized_status is not True:
                raise Exception(f"Failed to finalize revision:\n{update_revision_resp}")

            # Publish the new revision to each product associated with the data set
            for product in data_set.products:
                # Describe the AWS Marketplace entity corresponding to the Data Exchange product
                describe_entity_resp = self._mc_client.describe_entity(
                    Catalog='AWSMarketplace',
                    EntityId=product.id
                )
                logger.debug(describe_entity_resp)

                entity_type = describe_entity_resp['EntityType']
                entity_id = describe_entity_resp['EntityIdentifier']

                # Isolate the target data set in the DescribeEntity response
                describe_entity_resp_data_sets = json.loads(describe_entity_resp['Details'])['DataSets']
                describe_entity_resp_data_set = list(
                    filter(lambda ds: ds['DataSetArn'] == data_set.arn, describe_entity_resp_data_sets)
                )
                # We should get the data set of interest in describe_entity_resp and only that data set
                assert len(describe_entity_resp_data_set) == 1

                # Start a ChangeSet to add the newly finalized revision to an existing product
                start_change_set_resp = self._mc_client.start_change_set(
                    Catalog='AWSMarketplace',
                    ChangeSet=[
                        {
                            "ChangeType": "AddRevisions",
                            "Entity": {
                                "Identifier": entity_id,
                                "Type": entity_type
                            },
                            "Details": json.dumps({
                                "DataSetArn": data_set.arn,
                                "RevisionArns": [revision_arn]
                            })
                        }
                    ]
                )
                logger.debug(start_change_set_resp)

                # Wait for the ChangeSet workflow to complete
                change_set_id = start_change_set_resp['ChangeSetId']
                describe_change_set_resp = self._wait_for_mc_change_set_completion(change_set_id)
                logger.debug(describe_change_set_resp)

The following screenshot shows the S3 trigger for Job 3.

The following screenshot shows an example of CloudWatch logs for Job 3.

The following screenshot shows a CloudWatch alarm for Job 3.

Finally, we can verify that our revisions were successfully added to their corresponding data sets and products through the AWS console.

AWS Data Exchange allows you to create private offers for your AWS account IDs, providing a convenient means of checking that revisions show up in each product as expected.

Conclusion

This post demonstrated how you can integrate AWS Data Exchange into an existing data pipeline frictionlessly. We’re pleased to have been invited to participate in the AWS Data Exchange private preview, and even more pleased with the service itself, which has proven to be a sophisticated yet natural extension of our system.

I want to offer special thanks to both Kyle Patsen and Rafic Melhem of the AWS Data Exchange team for generously fielding my questions (and patiently enduring my ramblings) for the better part of the past year. I also want to thank Lucas Adams for helping me design the system discussed in this post and, more importantly, for his unwavering vote of confidence.

If you are interested in learning more about FPG, don’t hesitate to contact us.


Under the hood: Scaling your Kinesis data streams

Post Syndicated from Ahmed Gaafar original https://aws.amazon.com/blogs/big-data/under-the-hood-scaling-your-kinesis-data-streams/

Real-time delivery of data and insights enables businesses to pivot quickly in response to changes in demand, user engagement, and infrastructure events, among many others. Amazon Kinesis offers a managed service that lets you focus on building your applications, rather than managing infrastructure. Scalability is provided out-of-the-box, allowing you to ingest and process gigabytes of streaming data per second. Data replication to three Availability Zones offers high availability and durability. Pricing is based on usage and requires no upfront costs, making Kinesis a cost-effective solution.

Amazon Kinesis Data Streams use a provisioned capacity model. Each data stream is composed of one or more shards that act as units of capacity. Shards make it easy for you to design and scale a streaming pipeline by providing a predefined write and read capacity. As workloads grow, an application may read or write to a shard at a rate that exceeds its capacity, creating a hot shard and requiring you to add capacity quickly. Shards also enable you to parallelize the processing of large datasets and compute results quickly.

This post discusses how to scale your data streams and avoid hot shards. The post first shows you how to estimate the number of shards you need in your data stream as you design your streaming pipeline. Then it looks at reasons that lead to hot shards and how to avoid those using Kinesis Data Streams scaling mechanisms and reviews important metrics for monitoring.

Estimating your stream capacity

The following diagram shows a streaming data pipeline connected to a multiplayer video game. Kinesis Data Streams ingest player scores and other stats. You can filter and enrich the data, and write it to DynamoDB tables that populate the game’s various leaderboards.

As you embark on designing your streaming pipeline, it’s important to set up the data stream with enough capacity to handle the records that producers ingest and that consumers read back out. You can write up to 1 MB per second or 1,000 data records per second per shard. Read capacity is up to 2 MB per second per shard or five read transactions per second, and all applications reading from the stream share that read capacity. You can use the enhanced fan-out feature to scale the number of consuming applications and make sure that each has a dedicated 2 MB per second connection.

This post uses the preceding application as an example. It’s estimated that producers create data records at a rate of 20,000 KB per second, and your consumer nodes need to process this same amount of data at the other end of the stream. In addition to handling these rates, it is a good idea to add extra capacity to give the stream headroom for growth.

This headroom also helps your application recover faster in scenarios that could cause a delay or a pause in ingesting or processing data. These scenarios may include:

  • Deploying a new version of a consumer application
  • Transient network issues

As these nodes work to catch up after recovery, they produce or consume records at a higher than standard rate, requiring higher capacity. For this example, you can add 25 percent, or five shards, of headroom, for a total of 25 shards. Shards are cost-efficient, but it is up to you how many you want to add.
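The arithmetic behind these numbers can be sketched as follows (writes are the binding constraint in this example):

import math

# Back-of-the-envelope shard estimate for the example stream; a sketch of the
# arithmetic only, not an official sizing tool.
ingest_mb_per_sec = 20          # producers write ~20,000 KB per second
write_capacity_mb = 1           # 1 MB per second per shard for writes
read_capacity_mb = 2            # 2 MB per second per shard for reads

shards_for_writes = math.ceil(ingest_mb_per_sec / write_capacity_mb)   # 20
shards_for_reads = math.ceil(ingest_mb_per_sec / read_capacity_mb)     # 10

base_shards = max(shards_for_writes, shards_for_reads)                 # 20
headroom = math.ceil(base_shards * 0.25)                               # 5
total_shards = base_shards + headroom                                  # 25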

Scaling scenario

At the time of the game’s release, this capacity is deemed sufficient for the application. Ingestion and processing of the data are both running smoothly, and the game’s leaderboards are populated with current data. It’s now a few weeks after release; the game is steadily gaining popularity and concurrent player numbers are increasing. In a scenario such as this, it’s important to have sufficient monitoring to detect the increased load so you can increase throughput by scaling the stream.

The following diagram provides a simplified view of using CloudWatch metrics to monitor your data streams and trigger scaling operations.

In this example, these scaling issues could manifest in delayed leaderboard update reports. Because shards are the capacity units in a data stream, each shard’s capacity is independent of other shards. If the producers write to a single shard at a rate higher than 1 MB per second or 1,000 records per second, that shard becomes a hot shard and requests exceeding that capacity get throttled, leading to a delay in leaderboard updates. This condition can happen while other shards in the stream are underutilized, so if you are monitoring metrics at the stream level, you may not see any cause for concern, because the stream overall is ingesting data at a rate below its total capacity of 25 MB per second. Amazon Kinesis enables you to seamlessly scale your stream without interrupting your streaming pipeline.

Core concepts that enable scaling

You can write records to a data stream using Put APIs. To write a single record, use PutRecord; to write multiple records, use PutRecords. When executing either one, the request to the Kinesis API has to include the following three components:

  • The stream name.
  • The data record to write to the stream. For this post, this is the scoring result of a particular round in the game.
  • A partition key (for example, the game session); a sample call follows this list.
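For illustration, a single record could be written from the AWS CLI as follows. The stream name, partition key value, and payload are placeholders, not part of the example application.

# Stream name, partition key, and payload are placeholders.
# With AWS CLI v2, add --cli-binary-format raw-in-base64-out so the
# JSON payload is accepted as raw text rather than Base64.
aws kinesis put-record \
    --stream-name game-scores \
    --partition-key "session-4242" \
    --data '{"player":"pk1234","score":980}'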

The following diagram shows multiple producers writing to a Kinesis data stream. The partition key value is used in combination with a hash function to determine the shard a given record will be written to.

The partition key determines to which shard the record is written. The partition key is a Unicode string with a maximum length of 256 bytes. Kinesis runs the partition key value that you provide in the request through an MD5 hash function. The resulting value maps your record to a specific shard within the stream, and Kinesis writes the record to that shard. Partition keys therefore dictate how data is distributed across the stream and how shards are utilized.

Certain use cases require you to partition data based on specific criteria for efficient processing by the consuming applications. As an example, if you use player ID pk1234 as the hash key, all scores related to that player route to shard1. The consuming application can use the fact that data stored in shard1 has an affinity with the player ID and can efficiently calculate the leaderboard. An increase in traffic related to players mapped to shard1 can lead to a hot shard. Kinesis Data Streams allows you to handle such scenarios by splitting or merging shards without disrupting your streaming pipeline.

If your use cases do not require data stored in a shard to have high affinity, you can achieve high overall throughput by using a random partition key to distribute data. Random partition keys help distribute the incoming data records evenly across all the shards in the stream and reduce the likelihood of one or more shards getting hit with a disproportionate number of records. You can use a universally unique identifier (UUID) as a partition key to achieve this uniform distribution of records across shards. This strategy can increase the latency of record processing if the consumer application has to aggregate data from multiple shards.
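As a minimal sketch of the random-key approach, the same put-record call can generate a fresh UUID per record; the availability of uuidgen and the stream name are assumptions here.

# Each record gets a random partition key, spreading writes evenly across shards.
aws kinesis put-record \
    --stream-name game-scores \
    --partition-key "$(uuidgen)" \
    --data '{"player":"pk1234","score":980}'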

Kinesis Data Streams scaling mechanisms

The stream remains fully functional during these scaling actions. Producers can continue to write to the stream, and consumers can continue to read from it, while the scaling process is in progress.

Upon receiving the scaling request, Kinesis sets the stream status to Updating. You can use the DescribeStream API to check the stream status. When the operation is complete, the stream status shows as Active.

The SplitShard action splits one active shard into two shards, increasing the read and write capacity of the stream. This can be helpful if there is an expected increase in the number of records ingested into the stream or so more Kinesis Data Streams applications can simultaneously read data from the stream for real-time processing.

SplitShard facilitates that process. The hash key space of the parent shard also splits. The two new shards accept new data records and the parent shard stops accepting new records. Existing records in the parent shard are retained for the duration of the stream retention period (the default is 24 hours, configurable up to 7 days). You must specify the new-starting-hash-key value when issuing this command. This value determines the point of the split within the parent shard’s hash key space. In most cases, you want to do an even split. However, you might need to do an uneven split if you have unbalanced shards that you want to rebalance, for example. The following diagram shows the SplitShard process in action.
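For example, an even split of a shard whose hash key range starts at 0 could look like the following AWS CLI call. The stream name and shard ID are placeholders, and the new starting hash key is simply the midpoint of the parent shard’s hash key range.

aws kinesis split-shard \
    --stream-name game-scores \
    --shard-to-split shardId-000000000000 \
    --new-starting-hash-key 85070591730234615865843651857942052864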

Many streaming workloads have variable data flow rates that fluctuate over time, sometimes following daily, weekly, or seasonal patterns. As you monitor your data flow rates, you may see underutilized shards that, if merged, still have a data flow rate below the shard limits, yet reduce the cost of the stream.

The MergeShards action merges two adjacent shards, producing one shard. Two shards are considered adjacent if the hash key spaces for the two form a contiguous set with no gaps. When the two shards merge, their hash key spaces also merge. The new shard starts accepting new data records. The two parent shards stop accepting new records and retain existing records up to the stream’s configured retention period. The following diagram shows the MergeShards process in action.
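A sketch of the corresponding CLI call, again with a placeholder stream name and shard identifiers, could be:

# The two shards must be adjacent in the hash key space.
aws kinesis merge-shards \
    --stream-name game-scores \
    --shard-to-merge shardId-000000000000 \
    --adjacent-shard-to-merge shardId-000000000001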

The UpdateShardCount action is useful when you need to scale your stream, up or down, to a specific number of shards, and you provide that number as a parameter in the API call. Scaling in increments of 25% of the current capacity (25%, 50%, 75%, 100%) helps the operation complete faster, but is not required. The command executes a series of SplitShard and MergeShards actions as needed to reach the explicit number of shards you specified. This command splits the shard hash key space evenly, creating shards of equal size; there is no option to choose a different value.
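For instance, resharding the example stream to 30 shards with uniform scaling might look like the following; the stream name and target count are illustrative only.

aws kinesis update-shard-count \
    --stream-name game-scores \
    --target-shard-count 30 \
    --scaling-type UNIFORM_SCALING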

Additionally, the Kinesis Scaling Utility available on GitHub provides autoscaling for Kinesis Data Streams by monitoring a stream’s Amazon CloudWatch metrics and scaling it up or down accordingly. It can scale a stream by an explicit shard count or as a percentage of the total fleet. There is no requirement for you to manage the allocation of the key space to shards when using this API; it happens automatically.

Balancing shards

After completing a scaling operation, check the distribution of the hash key space in your stream. In most use cases, the hash key space should be evenly distributed across the shards in the stream. Errors in calculating or inputting a shard’s hash key space starting value can lead to creating new shards with an unusually large or small hash key space. Unusually large shards receive a disproportionate number of read and write requests, leading to throttling, while the unusually small shards remain underutilized.

The output of the ListShards API lists the starting and ending hash key value for each shard in the stream. You can use these values to identify unbalanced shards, and perform the necessary splits or merges to balance them. The Kinesis Scaling Utility can also generate a report of shard key space sizes that can help you achieve the same result. See the following code:

{
    "Shards": [
        {
            "ShardId": "shardId-000000000000", 
            "HashKeyRange": {
                "EndingHashKey": "170141183460469231731687303715884105727", 
                "StartingHashKey": "0"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817078608863948980125442188478720910276534730754"
            }
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "HashKeyRange": {
                "EndingHashKey": "340282366920938463463374607431768211455", 
                "StartingHashKey": "170141183460469231731687303715884105728"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817100909609147510748583724196993558638040711186"
            }
        }
    ]
}
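Output like the preceding can be retrieved from the AWS CLI; the stream name below is a placeholder.

aws kinesis list-shards --stream-name game-scores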

Monitoring your streams to preempt hot shards

As the hot shard scenario has demonstrated, monitoring your data streams at only the stream level does not prepare you for issues at the shard level. Kinesis offers a multitude of stream-level and shard-level metrics. At the shard level, IncomingBytes and IncomingRecords show you the ingestion rate into the shard. WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded indicate throttled Put and Get requests, respectively. At the stream level, keep an eye on PutRecord.Success, whose average value reflects the percentage of PutRecord success over time. Paired with proper thresholds for alerting, these metrics should help you take scaling actions proactively in response to flow changes in and out of your streams, and reduce the possibility of developing hot shards.
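As a hedged sketch of wiring this up, the first command below turns on shard-level (enhanced) monitoring for the metrics mentioned above, and the second creates a stream-level CloudWatch alarm on throttled writes. The stream name, alarm name, and SNS topic ARN are placeholders for your environment.

# Publish shard-level metrics (billed separately) for the stream.
aws kinesis enable-enhanced-monitoring \
    --stream-name game-scores \
    --shard-level-metrics IncomingBytes IncomingRecords \
        WriteProvisionedThroughputExceeded ReadProvisionedThroughputExceeded

# Alarm when any write throttling is recorded over a 5-minute period.
aws cloudwatch put-metric-alarm \
    --alarm-name game-scores-write-throttling \
    --namespace AWS/Kinesis \
    --metric-name WriteProvisionedThroughputExceeded \
    --dimensions Name=StreamName,Value=game-scores \
    --statistic Sum \
    --period 300 \
    --evaluation-periods 1 \
    --threshold 0 \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions arn:aws:sns:us-east-1:123456789012:ops-alerts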

The following image shows a snapshot of a CloudWatch dashboard with several metrics of a Kinesis data stream.

Conclusion

This post discussed how to simplify the scaling and monitoring of your Kinesis Data Streams. It’s important to spend some time considering the expected data flow rate for your stream to find the appropriate capacity. Choosing a good partition key strategy helps you take full advantage of the capacity you provision and avoid hot shards. Monitoring your stream metrics and setting alarm thresholds helps you gain the visibility you need to make better scaling decisions.

For more information, see What Is Amazon Kinesis Data Streams? For more information about Kinesis API actions, see Actions.

 


About the Author

 Ahmed Gaafar is a senior technical account manager at AWS.

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-2/

Part 1 of this multi-post series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1, discussed common customer use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

This post shows you how to get started with a step-by-step walkthrough of a few ETL and ELT design patterns of Amazon Redshift using AWS sample datasets.

Prerequisites

Before getting started, make sure that you meet the following prerequisites:

  1. This post uses two publicly available AWS sample datasets from the US-West-2 (Oregon) Region. Use the US-West-2 (Oregon) Region for your test run to reduce cross-region network latency and cost due to the data movement.
  2. You have an AWS account in the same Region.
  3. You have the AdministratorAccess policy granted to your AWS account (for production, you should restrict this further).
  4. You have an existing Amazon S3 bucket named eltblogpost in your data lake to store unloaded data from Amazon Redshift. Because bucket names are unique across AWS accounts, replace eltblogpost with your unique bucket name as applicable in the sample code provided.
  5. You have AWS CLI installed and configured to use with your AWS account.
  6. You have an IAM policy named redshift-elt-test-s3-policy with the following read and write permissions for the Amazon S3 bucket named eltblogpost:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::eltblogpost",
                    "arn:aws:s3:::eltblogpost/*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  7. You have an IAM policy named redshift-elt-test-sampledata-s3-read-policy with read only permissions for the Amazon S3 bucket named awssampledbuswest2, hosting the sample data used for this walkthrough.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": [
                    "arn:aws:s3:::awssampledbuswest2",
                    "arn:aws:s3:::awssampledbuswest2/*"
                ]
            }
        ]
    }

  8. You have an IAM role named redshift-elt-test-role that has a trust relationship with redshift.amazonaws.com and glue.amazonaws.com and the following IAM policies (for production, you should restrict this further as needed):
    • redshift-elt-test-s3-policy
    • redshift-elt-test-sampledata-s3-read-policy
    • AWSGlueServiceRole
    • AWSGlueConsoleFullAccess
  9. Make a note of the ARN for redshift-elt-test-role IAM role.
  10. You have an existing Amazon Redshift cluster with the following parameters:
    • Cluster name as rseltblogpost.
    • Database name as rselttest.
    • Four dc2.large nodes.
    • An associated IAM role named redshift-elt-test-role.
    • A publicly available endpoint.
    • A cluster parameter group named eltblogpost-parameter-group, which you use to change the Concurrency Scaling settings.
    • Cluster workload management set to manual.
  11. You have SQL Workbench/J (or another tool of your choice) and can connect successfully to the cluster.
  12. You have an EC2 instance in the same Region with PostgreSQL client CLI (psql) and can connect successfully to the cluster.
  13. You have an AWS Glue catalog database named eltblogpost as the metadata catalog for Amazon Athena and Redshift Spectrum queries.

Loading data to Amazon Redshift local storage

This post uses the Star Schema Benchmark (SSB) dataset. It is provided publicly in an S3 bucket, which any authenticated AWS user with access to Amazon S3 can access.

To load data to Amazon Redshift local storage, complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J.
  2. Execute the CREATE TABLE statements from the Github repo from your SQL Workbench/J to create tables from the SSB dataset. The following diagram shows the list of tables.
  3. Execute the COPY statements from the Github repo. This step loads data into the tables you created using the sample data available in s3://awssampledbuswest2/ssbgz/. Remember to replace your IAM role ARN noted previously.
  4. To verify that each table loaded correctly, run the following commands:
    select count(*) from LINEORDER; 
    select count(*) from PART;
    select count(*) from CUSTOMER;
    select count(*) from SUPPLIER;
    select count(*) from DWDATE;

    The following results table shows the number of rows for each table in the SSB dataset:

    Table Name    Record Count
    LINEORDER     600,037,902
    PART            1,400,000
    CUSTOMER        3,000,000
    SUPPLIER        1,000,000
    DWDATE              2,556

In addition to the record counts, you can also check for a few sample records from each table.

Performing ELT and ETL using Amazon Redshift and unload to S3

The following are the high-level steps for this walkthrough:

  1. You are looking to pre-aggregate some measures commonly requested by your end-users on the point of sales (POS) data you loaded into Amazon Redshift local storage.
  2. You want to then unload the aggregated data from Amazon Redshift to your data lake (S3) in an open, analytics-optimized, and compressed Parquet file format. You also want to choose an optimized partitioning scheme for the unloaded data in your data lake to improve end-user query performance and ultimately lower cost.
  3. You want to query the unloaded data from your data lake with Redshift Spectrum. You also want to share the data with other AWS services, such as Athena with its pay-per-use, serverless ad hoc and on-demand query model; AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and integrating it with your other datasets (such as ERP, finance, or third-party data) stored in your data lake; and Amazon SageMaker for machine learning.

Complete the following steps:

  1. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – This query summarizes the revenue by manufacturer, category, and brand per month per year per supplier region.
    • ELT Query 2 – This query summarizes the revenue by brand per month per year by supplier region and city.
    • ELT Query 3 – This query drills down in time by customer city, supplier city, month, and year.
  2. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J. To use Redshift Spectrum for querying the unloaded data, you need the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue. You are now ready to create an AWS Glue crawler.
  3. From AWS CLI, run the following code (replace <Your AWS Account>):
    aws glue create-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                }
            ]
        }
    }

    You can also schedule the crawler to run periodically based on your use case. For example, you can schedule the crawler every 35 minutes to keep the AWS Glue catalog tables up to date with the data being unloaded every 30 minutes. However, this post does not configure any scheduling.

  4. After you create the AWS Glue crawler, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  5. When the AWS Glue crawler run is complete, go to the AWS Glue console to see the following three AWS Glue catalog tables under the database eltblogpost:
    • monthly_revenue_by_region_manufacturer_category_brand
    • monthly_revenue_by_region_city_brand
    • yearly_revenue_by_city
  6. Now that you have an external data catalog in AWS Glue named eltblogpost, create an external schema named spectrum_eltblogpost in the persistent cluster rseltblogpost using the following SQL from your SQL Workbench/J (replace <Your AWS Account>):
    create external schema spectrum_eltblogpost 
    from data catalog 
    database 'eltblogpost' 
    iam_role 'arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role'
    create external database if not exists;

    Using Spectrum, you can now query the three AWS Glue catalog tables you set up earlier.

  7. Go to SQL Workbench/J and run the following sample queries:
    • Top 10 brands by category and manufacturer contributing to revenue for the region AFRICA for the year of 1992 and month of March:
      SELECT brand, category, manufacturer, revenue 
      from "spectrum_eltblogpost"."monthly_revenue_by_region_manufacturer_category_brand"
      where year = '1992'
      and month = 'March' 
      and supplier_region = 'AFRICA'
      order by revenue desc
      limit 10;
      
      brand | category | manufacturer | revenue
      ----------+----------+--------------+-----------
      MFGR#1313 | MFGR#13 | MFGR#1 | 5170356068
      MFGR#5325 | MFGR#53 | MFGR#5 | 5106463527
      MFGR#3428 | MFGR#34 | MFGR#3 | 5055551376
      MFGR#2425 | MFGR#24 | MFGR#2 | 5046250790
      MFGR#4126 | MFGR#41 | MFGR#4 | 5037843130
      MFGR#219 | MFGR#21 | MFGR#2 | 5018018040
      MFGR#159 | MFGR#15 | MFGR#1 | 5009626205
      MFGR#5112 | MFGR#51 | MFGR#5 | 4994133558
      MFGR#5534 | MFGR#55 | MFGR#5 | 4984369900
      MFGR#5332 | MFGR#53 | MFGR#5 | 4980619214

    • Monthly revenue for the region AMERICA for the year 1992 across all brands:
      SELECT month, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."monthly_revenue_by_region_city_brand"
      where year = '1992'
      and supplier_region = 'AMERICA'
      group by month;
      
      month | revenue
      ----------+--------------
      April | 4347703599195
      January | 4482598782080
      September | 4332911671240
      December | 4489411782480
      May | 4479764212732
      August | 4485519151803
      October | 4493509053843
      June | 4339267242387
      March | 4477659286311
      February | 4197523905580
      November | 4337368695526
      July | 4492092583189

    • Yearly revenue for supplier city ETHIOPIA 4 for the years 1992–1995 and month of December:
      SELECT year, supplier_city, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."yearly_revenue_by_city"
      where supplier_city in ('ETHIOPIA 4')
      and year between '1992' and '1995'
      and month = 'December'
      group by year, supplier_city
      order by year, supplier_city;
      
      year | supplier_city | revenue
      -----+---------------+------------
      1992 | ETHIOPIA 4 | 91006583025
      1993 | ETHIOPIA 4 | 90617597590
      1994 | ETHIOPIA 4 | 92015649529
      1995 | ETHIOPIA 4 | 89732644163

When the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Accelerating ELT and ETL using Redshift Spectrum and unload to S3

Assume that you need to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption.

The following are the high-level steps for this walkthrough:

  1. This is a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured data. You want to use the power of Redshift Spectrum to perform the required SQL transformations on the data stored in S3 and unload the transformed results back to S3.
  2. You want to query the unloaded data from your data lake using Redshift Spectrum if you have an existing cluster, Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets in your data lake, and Amazon SageMaker for machine learning.

Because Redshift Spectrum allows you to query the data directly from your data lake without needing to load into Amazon Redshift local storage, you can spin up a short-lived cluster to perform ELT at a massive scale using Redshift Spectrum and terminate the cluster when the work is complete. You can automate spinning up and terminating the short-lived cluster using AWS CloudFormation. That way, you only pay for the few minutes or hours of use. A short-lived cluster can also avoid overloading the current persistent cluster serving interactive queries from live users. For this post, use your existing cluster rseltblogpost.

This post uses a publicly available sample dataset named tickit provided by AWS, which any authenticated AWS user with access to S3 can access:

  • Sales – s3://awssampledbuswest2/tickit/spectrum/sales/
  • Event – s3://awssampledbuswest2/tickit/allevents_pipe.txt
  • Date – s3://awssampledbuswest2/tickit/date2008_pipe.txt
  • Users – s3://awssampledbuswest2/tickit/allusers_pipe.txt

For performance reasons, it is a Redshift Spectrum best practice to load the dimension tables into the local storage of your short-lived cluster and use an external table for the fact table Sales.

Complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J. To use Redshift Spectrum for querying data from the data lake (S3), you need the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue.
    • An AWS Glue catalog database named eltblogpost that you already created.
    • An external schema in your Redshift cluster named spectrum_eltblogpost that you already created.
  2. Execute the SQL available on the Github repo to create an external table named sales in the same external schema named spectrum_eltblogpost. As shown in the previous section, you can also use an AWS Glue crawler to create the external table.
  3. Execute the SQLs available on the Github repo to create the dimension tables to load the data into Amazon Redshift local storage for Redshift Spectrum performance best practices.
  4. Execute the COPY statements available on the Github repo to load the dimension tables using the sample data available in s3://awssampledbuswest2/tickit/. Replace the IAM role ARN with the IAM role ARN you noted earlier, which is associated with your cluster.
  5. To verify that each table has the correct record count, execute the following commands:
    select count(*) from date;
    select count(*) from users;
    select count(*) from event;
    select count(*) from spectrum_eltblogpost.sales;

    The following results table shows the number of rows for each table in the tickit dataset:

    Table Name                    Record Count
    DATE                                   365
    USERS                               49,990
    EVENT                                8,798
    spectrum_eltblogpost.sales         172,456

    In addition to the record counts, you can also check for a few sample records from each table.

  6. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – Total quantity sold on a given calendar date.
    • ELT Query 2 – Total quantity sold to each buyer.
    • ELT Query 3 – Events in the 99.9th percentile in terms of all-time gross sales.
  7. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J.
  8. To use Redshift Spectrum for querying the unloaded data, you can either create a new AWS Glue crawler or modify the previous crawler named eltblogpost_redshift_spectrum_etl_elt_glue_crawler. Update the existing crawler using the following code from AWS CLI (replace <Your AWS Account>):
    aws glue update-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_buyer_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_price_by_eventname"
                }
            ]
        }
    }

  9. After you create the crawler successfully, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  10. When the crawler run is complete, go to the AWS Glue console. The following additional catalog tables are in the catalog database eltblogpost:
    • total_quantity_sold_by_date
    • total_quantity_sold_by_buyer_by_date
    • total_price_by_eventname
  11. Using Spectrum, you can now query the three preceding catalog tables. Go to SQL Workbench/J and run the following sample queries:
      • Top 10 days for quantities sold in February and March 2008:
        SELECT caldate, total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_date"
        where caldate between '2008-02-01' and '2008-03-30'
        order by total_quantity desc
        limit 10;
        
        caldate | total_quantity
        -----------+---------------
        2008-02-20 | 1170
        2008-02-25 | 1146
        2008-02-19 | 1145
        2008-02-24 | 1141
        2008-03-26 | 1138
        2008-03-22 | 1136
        2008-03-17 | 1129
        2008-03-08 | 1129
        2008-02-16 | 1127
        2008-03-23 | 1121

      • Top 10 buyers for quantities sold in February and March 2008:
        SELECT firstname,lastname,total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_buyer_by_date"
        where caldate between '2008-02-01' and '2008-03-31'
        order by total_quantity desc
        limit 10;
        
        firstname | lastname | total_quantity
        ----------+------------+---------------
        Laurel | Clay | 9
        Carolyn | Valentine | 8
        Amelia | Osborne | 8
        Kai | Gill | 8
        Gannon | Summers | 8
        Ignacia | Nichols | 8
        Ahmed | Mcclain | 8
        Amanda | Mccullough | 8
        Blair | Medina | 8
        Hadley | Bennett | 8

      • Top 10 event names for total price:
        SELECT eventname, total_price
        FROM "spectrum_eltblogpost"."total_price_by_eventname"
        order by total_price desc
        limit 10;
        
        eventname | total_price
        ---------------------+------------
        Adriana Lecouvreur | 51846.00
        Janet Jackson | 51049.00
        Phantom of the Opera | 50301.00
        The Little Mermaid | 49956.00
        Citizen Cope | 49823.00
        Sevendust | 48020.00
        Electra | 47883.00
        Mary Poppins | 46780.00
        Live | 46661.00

After the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Amazon Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Scaling ELT and unload running in parallel using Concurrency Scaling

Assume that you have a mixed workload under concurrency when the UNLOAD queries and the ELT jobs run in parallel in your cluster with Concurrency Scaling turned on. When Concurrency Scaling is enabled, Amazon Redshift automatically adds additional cluster capacity when you need to process an increase in concurrent read queries including UNLOAD queries. By default, Concurrency Scaling mode is turned off for your cluster. In this post, you enable the Concurrency Scaling mode for your cluster.
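If you prefer the AWS CLI over the console for the parameter change in the next step, a command along these lines can set the maximum number of Concurrency Scaling clusters. The parameter group name comes from the prerequisites; treat the exact shorthand and values as an assumption to verify for your environment.

aws redshift modify-cluster-parameter-group \
    --parameter-group-name eltblogpost-parameter-group \
    --parameters ParameterName=max_concurrency_scaling_clusters,ParameterValue=5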

Complete the following steps:

  1. Go to your cluster parameter group named eltblogpost-parameter-group and complete the following:
    • Update max_concurrency_scaling_clusters to 5.
    • Create a new queue named Queue 1 with Concurrency Scaling mode set to Auto and a query group named unload_query for the UNLOAD jobs in the next steps.
  2. After you make these changes, reboot your cluster for the changes to take effect.
  3. For this post, use psql client to connect to your cluster rseltblogpost from the EC2 instance that you set up earlier.
  4. Open an SSH session to your EC2 instance and copy the nine files as shown below from the concurrency folder in the Github repo to /home/ec2-user/eltblogpost/ in your EC2 instance.
  5. Review the concurrency-elt-unload.sh script that runs the following eight jobs in parallel:
    • An ELT script for SSB dataset, which kicks off one query at a time.
    • An ELT script for the tickit dataset, which kicks off one query at a time.
    • Three unload queries for the SSB datasets kicked off in parallel.
    • Three unload queries for the tickit datasets kicked off in parallel.
  6. Run the concurrency-elt-unload.sh script. While the script is running, you will see sample output in your terminal. The following are the response times taken by the script:
    real 2m40.245s
    user 0m0.104s
    sys 0m0.000s
  7. Run the following query to validate that some of the UNLOAD queries ran in the Concurrency Scaling clusters (look for “which_cluster = Concurrency Scaling” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 22:53:00' and '2019-10-20 22:56:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query: 

  8. Comment out the following SET statement in the six UNLOAD query files (ssb-unload<1-3>.sql and tickit-unload<1-3>.sql) to force all six UNLOAD queries to run in the main cluster:
    set query_group to 'unload_query';

    In other words, disable Concurrency Scaling mode for the UNLOAD queries.

  9. Run the concurrency-elt-unload.sh script. While the script is running, you will see sample output in your terminal. The following are the response times taken by the script:
    real 3m40.328s
    user 0m0.104s
    sys 0m0.000s

    The following shows the Workload Management settings for the Redshift cluster: 

  10. Run the following query to validate that all the queries ran in the main cluster (look for “which_cluster = Main” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 23:19:00' and '2019-10-20 23:24:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query:

    With Concurrency Scaling, the end-to-end runtime improved by 37.5% (60 seconds faster).

Summary

This post provided a step-by-step walkthrough of a few straightforward examples of the common ELT and ETL design patterns of Amazon Redshift using some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.

 


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

 


Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-1/

Part 1 of this multi-post series discusses design best practices for building scalable ETL (extract, transform, load) and ELT (extract, load, transform) data processing pipelines using both primary and short-lived Amazon Redshift clusters. You also learn about related use cases for some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows a step-by-step walkthrough to get started using Amazon Redshift for your ETL and ELT use cases.

ETL and ELT

There are two common design patterns when moving data from source systems to a data warehouse. The primary difference between the two patterns is the point in the data-processing pipeline at which transformations happen. This also determines the set of tools used to ingest and transform the data, along with the underlying data structures, queries, and optimization engines used to analyze the data. The first pattern is ETL, which transforms the data before it is loaded into the data warehouse. The second pattern is ELT, which loads the data into the data warehouse and uses the familiar SQL semantics and power of the Massively Parallel Processing (MPP) architecture to perform the transformations within the data warehouse.

In the following diagram, the first represents ETL, in which data transformation is performed outside of the data warehouse with tools such as Apache Spark or Apache Hive on Amazon EMR or AWS Glue. This pattern allows you to select your preferred tools for data transformations. The second diagram is ELT, in which the data transformation engine is built into the data warehouse for relational and SQL workloads. This pattern is powerful because it uses the highly optimized and scalable data storage and compute power of MPP architecture.

Redshift Spectrum

Amazon Redshift is a fully managed data warehouse service on AWS. It uses a distributed, MPP, and shared nothing architecture. Redshift Spectrum is a native feature of Amazon Redshift that enables you to run the familiar SQL of Amazon Redshift with the BI application and SQL client tools you currently use against all your data stored in open file formats in your data lake (Amazon S3).

A common pattern you may follow is to run queries that span both the frequently accessed hot data stored locally in Amazon Redshift and the warm or cold data stored cost-effectively in Amazon S3, using views with no schema binding for external tables. This enables you to independently scale your compute resources and storage across your cluster and S3 for various use cases.

Redshift Spectrum supports a variety of structured and unstructured file formats such as Apache Parquet, Avro, CSV, ORC, and JSON, to name a few. Because the data stored in S3 is in open file formats, the same data can serve as your single source of truth and other services such as Amazon Athena, Amazon EMR, and Amazon SageMaker can access it directly from your S3 data lake.

For more information, see Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required.

Concurrency Scaling

Using Concurrency Scaling, Amazon Redshift automatically and elastically scales query processing power to provide consistently fast performance for hundreds of concurrent queries. Concurrency Scaling resources are added to your Amazon Redshift cluster transparently in seconds, as concurrency increases, to serve sudden spikes in concurrent requests with fast performance without wait time. When the workload demand subsides, Amazon Redshift automatically shuts down Concurrency Scaling resources to save you cost.

The following diagram shows how the Concurrency Scaling works at a high-level:

For more information, see New – Concurrency Scaling for Amazon Redshift – Peak Performance at All Times.

Data lake export

Amazon Redshift now supports unloading the result of a query to your data lake on S3 in Apache Parquet, an efficient open columnar storage format for analytics. The Parquet format is up to two times faster to unload and consumes up to six times less storage in S3, compared to text formats. You can also specify one or more partition columns, so that unloaded data is automatically partitioned into folders in your S3 bucket to improve query performance and lower the cost for downstream consumption of the unloaded data. For example, you can choose to unload your marketing data and partition it by year, month, and day columns. This enables your queries to take advantage of partition pruning and skip scanning of non-relevant partitions when filtered by the partitioned columns, thereby improving query performance and lowering cost. For more information, see UNLOAD.
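As a minimal sketch of this capability, an UNLOAD to partitioned Parquet might look like the following; the table, column names, bucket, and IAM role are hypothetical, not part of the walkthrough datasets.

UNLOAD ('SELECT year, month, day, channel, SUM(spend) AS total_spend
         FROM marketing_spend
         GROUP BY year, month, day, channel')
TO 's3://your-bucket/marketing/spend_'
IAM_ROLE 'arn:aws:iam::<Your AWS Account>:role/your-redshift-role'
FORMAT AS PARQUET
PARTITION BY (year, month, day);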

Use cases

You may be using Amazon Redshift either partially or fully as part of your data management and data integration needs. You likely transitioned from an ETL to an ELT approach with the advent of MPP databases due to your workload being primarily relational, familiar SQL syntax, and the massive scalability of MPP architecture.

This section presents common use cases for ELT and ETL for designing data processing pipelines using Amazon Redshift.

ELT

Consider a batch data processing workload that requires standard SQL joins and aggregations on a modest amount of relational and structured data. You initially selected a Hadoop-based solution to accomplish your SQL needs. However, over time, as data continued to grow, your system didn’t scale well. You now find it difficult to meet your required performance SLA goals and face ever-increasing hardware and maintenance costs. Relational MPP databases bring an advantage in terms of performance and cost, and lower the technical barriers to processing data by using familiar SQL.

Amazon Redshift has significant benefits based on its massively scalable and fully managed compute underneath to process structured and semi-structured data directly from your data lake in S3.

The following diagram shows how Redshift Spectrum allows you to simplify and accelerate your data processing pipeline from a four-step to a one-step process with the CTAS (Create Table As) command.

The preceding architecture enables seamless interoperability between your Amazon Redshift data warehouse solution and your existing data lake solution on S3 hosting other Enterprise datasets such as ERP, finance, and third-party for a variety of data integration use cases.
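As a rough, hypothetical illustration of the one-step CTAS pattern (the external schema, table, and column names below are assumptions, not part of this walkthrough), a single statement can read from a Spectrum external table and materialize the transformed result in Amazon Redshift local storage:

-- Aggregate raw clickstream data in S3 (via Spectrum) directly into a local table.
CREATE TABLE daily_click_summary AS
SELECT event_date,
       page,
       COUNT(*) AS total_clicks
FROM spectrum_schema.clickstream_events
GROUP BY event_date, page;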

The following diagram shows the seamless interoperability between your Amazon Redshift and your data lake on S3:

When you use an ELT pattern, you can also use your existing ELT-optimized SQL workload while migrating from your on-premises data warehouse to Amazon Redshift. This eliminates the need to rewrite relational and complex SQL workloads into a new compute framework from scratch. With Amazon Redshift, you can load, transform, and enrich your data efficiently using familiar SQL with advanced and robust SQL support, simplicity, and seamless integration with your existing SQL tools. You also need the monitoring capabilities provided by Amazon Redshift for your clusters.

ETL

You have a requirement to unload a subset of the data from Amazon Redshift back to your data lake (S3) in an open and analytics-optimized columnar file format (Parquet). You then want to query the unloaded datasets from the data lake using Redshift Spectrum and other AWS services such as Athena for ad hoc and on-demand analysis, AWS Glue and Amazon EMR for ETL, and Amazon SageMaker for machine learning.

You have a requirement to share a single version of a set of curated metrics (computed in Amazon Redshift) across multiple business processes from the data lake. You can use ELT in Amazon Redshift to compute these metrics and then use the unload operation with optimized file format and partitioning to unload the computed metrics in the data lake.

You also have a requirement to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption. In other words, consider a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured cold data stored in S3 for a short duration of time. You can use the power of Redshift Spectrum by spinning up one or many short-lived Amazon Redshift clusters that can perform the required SQL transformations on the data stored in S3, unload the transformed results back to S3 in an optimized file format, and terminate the unneeded Amazon Redshift clusters at the end of the processing. This way, you only pay for the duration in which your Amazon Redshift clusters serve your workloads.

As shown in the following diagram, once the transformed results are unloaded in S3, you then query the unloaded data from your data lake either using Redshift Spectrum if you have an existing Amazon Redshift cluster, Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets (such as ERP, finance, and third-party data) stored in your data lake, and Amazon SageMaker for machine learning.

You can also scale the unloading operation by using the Concurrency Scaling feature of Amazon Redshift. This provides a scalable and serverless option to bulk export data in an open and analytics-optimized file format using familiar SQL.

Best practices

The following recommended practices can help you to optimize your ELT and ETL workload using Amazon Redshift.

Analyze requirements to decide ELT versus ETL

The MPP architecture of Amazon Redshift and its Spectrum feature are efficient and designed for high-volume, relational, SQL-based ELT workloads (joins, aggregations) at massive scale. A common practice to design an efficient ELT solution using Amazon Redshift is to spend sufficient time to analyze the following:

  • Type of data from source systems (structured, semi-structured, and unstructured)
  • Nature of the transformations required (usually encompassing cleansing, enrichment, harmonization, transformations, and aggregations)
  • Row-by-row, cursor-based processing needs versus batch SQL
  • Performance SLA and scalability requirements considering the data volume growth over time
  • Cost of the solution

This helps to assess if the workload is relational and suitable for SQL at MPP scale.

Key considerations for ELT

For both ELT and ETL, it is important to build a good physical data model for better performance for all tables, including staging tables, with proper data types and distribution methods. A dimensional data model (star schema) with fewer joins works best for MPP architecture, including ELT-based SQL workloads. Consider using a TEMPORARY table for intermediate staging tables where feasible for the ELT process for better write performance, because temporary tables only write a single copy.

A common rule of thumb for ELT workloads is to avoid row-by-row, cursor-based processing (a commonly overlooked finding for stored procedures). This is sub-optimal because such processing needs to happen on the leader node of an MPP database like Amazon Redshift. Instead, the recommendation for such a workload is to look for an alternative distributed processing programming framework, such as Apache Spark.

Several hundred to thousands of single-record inserts, updates, and deletes for highly transactional needs are not efficient on MPP architecture. Instead, stage those records and apply them to the target table as either a bulk UPDATE or a DELETE/INSERT batch operation, as sketched below.
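A minimal sketch of that staging pattern, assuming hypothetical orders and stage_orders tables keyed on order_id, might look like the following:

BEGIN;

-- Temporary staging table only writes a single copy of the data.
CREATE TEMPORARY TABLE stage_orders (LIKE orders);

-- Load the incoming changes into the staging table.
COPY stage_orders
FROM 's3://your-bucket/incoming/orders/'
IAM_ROLE 'arn:aws:iam::<Your AWS Account>:role/your-redshift-role'
FORMAT AS PARQUET;

-- Apply the changes as one bulk DELETE followed by one bulk INSERT.
DELETE FROM orders
USING stage_orders
WHERE orders.order_id = stage_orders.order_id;

INSERT INTO orders
SELECT * FROM stage_orders;

COMMIT;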

With the external table capability of Redshift Spectrum, you can optimize your transformation logic using a single SQL as opposed to loading data first in Amazon Redshift local storage for staging tables and then doing the transformations on those staging tables.

Key considerations for data lake export

When you unload data from Amazon Redshift to your data lake in S3, pay attention to data skew or processing skew in your Amazon Redshift tables. The UNLOAD command uses the parallelism of the slices in your cluster. Hence, if there is a data skew at rest or processing skew at runtime, unloaded files on S3 may have different file sizes, which impacts your UNLOAD command response time and query response time downstream for the unloaded data in your data lake.

You should also control the maximum file size to approximately 100 MB or less in the UNLOAD command for better performance for downstream consumption. Similarly, for S3 partitioning, a rule of thumb is to keep the number of partitions per table on S3 to a couple of hundred by choosing low-cardinality partitioning columns (year, quarter, month, and day are good choices) in the UNLOAD command. This avoids creating too many partitions, which in turn creates a large volume of metadata in the AWS Glue catalog, leading to high query times via Athena and Redshift Spectrum.
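Both recommendations can be expressed directly in the UNLOAD command. In this hypothetical example, MAXFILESIZE caps the output file size and the partition columns are deliberately low cardinality; verify the option combination against the UNLOAD documentation for your cluster version.

UNLOAD ('SELECT year, month, region, SUM(revenue) AS revenue
         FROM curated_metrics
         GROUP BY year, month, region')
TO 's3://your-bucket/unload_parquet/curated_metrics_'
IAM_ROLE 'arn:aws:iam::<Your AWS Account>:role/your-redshift-role'
FORMAT AS PARQUET
PARTITION BY (year, month)
MAXFILESIZE AS 100 MB;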

To get the best throughput and performance under concurrency for multiple UNLOAD commands running in parallel, create a separate queue for unload queries with Concurrency Scaling turned on. This lets Amazon Redshift burst additional Concurrency Scaling clusters as required.

Key considerations for Redshift Spectrum for ELT

To get the best performance from Redshift Spectrum, pay attention to the maximum pushdown operations possible, such as S3 scan, projection, filtering, and aggregation, in your query plans for a performance boost. This is because you want to utilize the powerful infrastructure underneath that supports Redshift Spectrum. Using predicate pushdown also avoids consuming resources in the Amazon Redshift cluster.

In addition, avoid complex operations like DISTINCT or ORDER BY on more than one column and replace them with GROUP BY as applicable. Amazon Redshift can push down a single column DISTINCT as a GROUP BY to the Spectrum compute layer with a query rewrite capability underneath, whereas multi-column DISTINCT or ORDER BY operations need to happen inside Amazon Redshift cluster.

The Amazon Redshift optimizer can use external table statistics to generate more optimal execution plans. Without statistics, an execution plan is generated based on heuristics with the assumption that the S3 table is relatively large. It is recommended to set the table statistics (numRows) manually for S3 external tables.
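For example, if you know the approximate row count of an external table, you can set it as follows; the schema, table name, and row count here are placeholders.

ALTER TABLE spectrum_schema.sales
SET TABLE PROPERTIES ('numRows' = '170000');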

For more information on Amazon Redshift Spectrum best practices, see Twelve Best Practices for Amazon Redshift Spectrum and How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3.

Summary

This post discussed the common use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using few key features of Amazon Redshift: Spectrum, Concurrency Scaling, and the recently released support for data lake export with partitioning.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows you how to get started with a step-by-step walkthrough of a few simple examples using AWS sample datasets.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.

 


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

 

Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

How we used our new GraphQL Analytics API to build Firewall Analytics

Post Syndicated from Nick Downie original https://blog.cloudflare.com/how-we-used-our-new-graphql-api-to-build-firewall-analytics/

Firewall Analytics is the first product in the Cloudflare dashboard to utilize the new GraphQL Analytics API. All Cloudflare dashboard products are built using the same public APIs that we provide to our customers, allowing us to understand the challenges they face when interfacing with our APIs. This parity helps us build and shape our products, most recently the new GraphQL Analytics API that we’re thrilled to release today.

By defining the data we want, along with the response format, our GraphQL Analytics API has enabled us to prototype new functionality and iterate quickly from our beta user feedback. It is helping us deliver more insightful analytics tools within the Cloudflare dashboard to our customers.

Our user research and testing for Firewall Analytics surfaced common use cases in our customers’ workflow:

  • Identifying spikes in firewall activity over time
  • Understanding the common attributes of threats
  • Drilling down into granular details of an individual event to identify potential false positives

We can address all of these use cases using our new GraphQL Analytics API.

GraphQL Basics

Before we look into how to address each of these use cases, let’s take a look at the format of a GraphQL query and how our schema is structured.

A GraphQL query is comprised of a structured set of fields, for which the server provides corresponding values in its response. The schema defines which fields are available and their type. You can find more information about the GraphQL query syntax and format in the official GraphQL documentation.

To run some GraphQL queries, we recommend downloading a GraphQL client, such as GraphiQL, to explore our schema and run some queries. You can find documentation on getting started with this in our developer docs.

At the top level of the schema is the viewer field. This represents the top-level node of the user running the query. Within this, we can query the zones field to find zones the current user has access to, providing a filter argument with a zoneTag of the identifier of the zone we’d like to narrow down to.

{
  viewer {
    zones(filter: { zoneTag: "YOUR_ZONE_ID" }) {
      # Here is where we'll query our firewall events
    }
  }
}

Now that we have a query that finds our zone, we can start querying the firewall events which have occurred in that zone, to help solve some of the use cases we’ve identified.

Visualising spikes in firewall activity

It’s important for customers to be able to visualise and understand anomalies and spikes in their firewall activity, as these could indicate an attack or be the result of a misconfiguration.

Plotting events in a timeseries chart, by their respective action, provides users with a visual overview of the trend of their firewall events.

Within the zones field in the query we created earlier, we can further query our firewall event aggregates using the firewallEventsAdaptiveGroups field and providing arguments for:

  • A limit for the count of groups
  • A filter for the date range we’re looking for (combined with any user-entered filters)
  • A list of fields to orderBy (in this case, just the datetimeHour field that we’re grouping by).

By adding the dimensions field, we’re querying for groups of firewall events, aggregated by the fields nested within dimensions. In this case, our query includes the action and datetimeHour fields, meaning the response will be groups of firewall events which share the same action, and fall within the same hour. We also add a count field, to get a numeric count of how many events fall within each group.

query FirewallEventsByTime($zoneTag: string, $filter: FirewallEventsAdaptiveGroupsFilter_InputObject) {
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      firewallEventsAdaptiveGroups(
        limit: 576
        filter: $filter
        orderBy: [datetimeHour_DESC]
      ) {
        count
        dimensions {
          action
          datetimeHour
        }
      }
    }
  }
}

Note – Each of our groups queries requires a limit to be set. A firewall event can have one of 8 possible actions, and we are querying over a 72 hour period. At most, we’ll end up with 576 groups, so we can set that as the limit for our query.

This query would return a response in the following format:

{
  "viewer": {
    "zones": [
      {
        "firewallEventsAdaptiveGroups": [
          {
            "count": 5,
            "dimensions": {
              "action": "jschallenge",
              "datetimeHour": "2019-09-12T18:00:00Z"
            }
          }
          ...
        ]
      }
    ]
  }
}

We can then take these groups and plot each as a point on a time series chart. Mapping over the firewallEventsAdaptiveGroups array, we use each group’s count property for the y-axis value, and the nested fields within the dimensions object to place the point: action as the unique series and datetimeHour as the timestamp on the x-axis.
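
As an illustration only (this is not the dashboard’s actual front-end code), a minimal Python sketch that reshapes the groups from the response format above into one series per action might look like this:

from collections import defaultdict

def to_time_series(response):
    # Turn firewallEventsAdaptiveGroups into {action: [(datetimeHour, count), ...]}.
    groups = response["viewer"]["zones"][0]["firewallEventsAdaptiveGroups"]
    series = defaultdict(list)
    for group in groups:
        dims = group["dimensions"]
        series[dims["action"]].append((dims["datetimeHour"], group["count"]))
    # Sort each series by timestamp so points plot left to right on the x-axis.
    return {action: sorted(points) for action, points in series.items()}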


Top Ns

After identifying a spike in activity, our next step is to highlight events with commonality in their attributes. For example, if a certain IP address or individual user agent is causing many firewall events, this could be a sign of an individual attacker, or could be surfacing a false positive.

Similarly to before, we can query aggregate groups of firewall events using the firewallEventsAdaptiveGroups field. However, in this case, instead of supplying action and datetimeHour to the group’s dimensions, we can add individual fields that we want to find common groups of.

By ordering by descending count, we retrieve the groups with the highest commonality first, limiting the results to the top 5 for each field. Nesting a single field within dimensions groups the events by that field; for example, adding clientIP returns the five IP addresses responsible for the most events.

We can also add a firewallEventsAdaptiveGroups field with no nested dimensions. This will create a single group which allows us to find the total count of events matching our filter.

query FirewallEventsTopNs($zoneTag: string, $filter: FirewallEventsAdaptiveGroupsFilter_InputObject) {
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      topIPs: firewallEventsAdaptiveGroups(
        limit: 5
        filter: $filter
        orderBy: [count_DESC]
      ) {
        count
        dimensions {
          clientIP
        }
      }
      topUserAgents: firewallEventsAdaptiveGroups(
        limit: 5
        filter: $filter
        orderBy: [count_DESC]
      ) {
        count
        dimensions {
          userAgent
        }
      }
      total: firewallEventsAdaptiveGroups(
        limit: 1
        filter: $filter
      ) {
        count
      }
    }
  }
}

Note – we can add the firewallEventsAdaptiveGroups field multiple times within a single query, each aliased differently. This allows us to fetch multiple different groupings by different fields, or with no groupings at all; in this case, a list of top IP addresses, top user agents, and the total events.


We can then reference each of these aliases in the UI, mapping over their respective groups to render each row with its count and a bar representing the share of all events that the row accounts for.
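
Purely as a sketch (assuming the aliased response shape produced by the query above), the proportion calculation could be expressed in Python like so:

def top_n_rows(zone_data, alias, dimension):
    # zone_data is one entry of viewer.zones from the FirewallEventsTopNs response.
    total = zone_data["total"][0]["count"]  # the single group with no dimensions
    rows = []
    for group in zone_data[alias]:
        rows.append({
            "value": group["dimensions"][dimension],
            "count": group["count"],
            "share": group["count"] / total if total else 0.0,
        })
    return rows

# e.g. top_n_rows(zone, "topIPs", "clientIP") or top_n_rows(zone, "topUserAgents", "userAgent")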

Are these firewall events false positives?

After users have identified spikes, anomalies and common attributes, we wanted to surface more information as to whether these have been caused by malicious traffic, or are false positives.

To do this, we wanted to provide additional context on the events themselves, rather than just counts. We can do this by querying the firewallEventsAdaptive field for these events.

Our GraphQL schema uses the same filter format for both the aggregate firewallEventsAdaptiveGroups field and the raw firewallEventsAdaptive field. This allows us to use the same filters to fetch the individual events which summate to the counts and aggregates in the visualisations above.

query FirewallEventsList($zoneTag: string, $filter: FirewallEventsAdaptiveFilter_InputObject) {
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      firewallEventsAdaptive(
        filter: $filter
        limit: 10
        orderBy: [datetime_DESC]
      ) {
        action
        clientAsn
        clientCountryName
        clientIP
        clientRequestPath
        clientRequestQuery
        datetime
        rayName
        source
        userAgent
      }
    }
  }
}


Once we have our individual events, we can render all of the individual fields we’ve requested, providing users the additional context they need to determine whether an event is a false positive or not.

That’s how we used our new GraphQL Analytics API to build Firewall Analytics, helping solve some of our customers’ most common security workflow use cases. We’re excited to see what you build with it, and the problems you can help tackle.

You can find out how to get started querying our GraphQL Analytics API using GraphiQL in our developer documentation, or learn more about writing GraphQL queries on the official GraphQL Foundation documentation.

Introducing the GraphQL Analytics API: exactly the data you need, all in one place

Post Syndicated from Filipp Nisenzoun original https://blog.cloudflare.com/introducing-the-graphql-analytics-api-exactly-the-data-you-need-all-in-one-place/


Today we’re excited to announce a powerful and flexible new way to explore your Cloudflare metrics and logs, with an API conforming to the industry-standard GraphQL specification. With our new GraphQL Analytics API, all of your performance, security, and reliability data is available from one endpoint, and you can select exactly what you need, whether it’s one metric for one domain or multiple metrics aggregated for all of your domains. You can ask questions like “How many cached bytes have been returned for these three domains?” Or, “How many requests have all the domains under my account received?” Or even, “What effect did changing my firewall rule an hour ago have on the responses my users were seeing?”

The GraphQL standard also has strong community resources, from extensive documentation to front-end clients, making it easy to start creating simple queries and progress to building your own sophisticated analytics dashboards.

From many APIs…

Providing insights has always been a core part of Cloudflare’s offering. After all, by using Cloudflare, you’re relying on us for key parts of your infrastructure, and so we need to make sure you have the data to manage, monitor, and troubleshoot your website, app, or service. Over time, we developed a few key data APIs, including ones providing information regarding your domain’s traffic, DNS queries, and firewall events. This multi-API approach was acceptable while we had only a few products, but we started to run into some challenges as we added more products and analytics. We couldn’t expect users to adopt a new analytics API every time they started using a new product. In fact, some of the customers and partners that were relying on many of our products were already becoming confused by the various APIs.

Following the multi-API approach was also affecting how quickly we could develop new analytics within the Cloudflare dashboard, which is used by more people for data exploration than our APIs. Each time we built a new product, our product engineering teams had to implement a corresponding analytics API, which our user interface engineering team then had to learn to use. This process could take up to several months for each new set of analytics dashboards.

…to one

Our new GraphQL Analytics API solves these problems by providing access to all Cloudflare analytics. It offers a standard, flexible syntax for describing exactly the data you need and provides predictable, matching responses. This approach makes it an ideal tool for:

  1. Data exploration. You can think of it as a way to query your own virtual data warehouse, full of metrics and logs regarding the performance, security, and reliability of your Internet property.
  2. Building amazing dashboards, which allow for flexible filtering, sorting, and drilling down or rolling up. Creating these kinds of dashboards would normally require paying thousands of dollars for a specialized analytics tool. You get them as part of our product and can customize them for yourself using the API.

In a companion post that was also published today, my colleague Nick discusses using the GraphQL Analytics API to build dashboards. So, in this post, I’ll focus on examples of how you can use the API to explore your data. To make the queries, I’ll be using GraphiQL, a popular open-source querying tool that takes advantage of GraphQL’s capabilities.
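
GraphiQL isn’t required, though: the same queries can be sent as plain HTTPS POST requests. The following Python sketch is illustrative only; the endpoint URL and Bearer-token header follow Cloudflare’s usual API conventions but should be treated as assumptions, and the zone ID, token, and date range are placeholders:

import requests

API_URL = "https://api.cloudflare.com/client/v4/graphql"  # assumed endpoint

QUERY = """
{
  viewer {
    zones(filter: { zoneTag: "YOUR_ZONE_ID" }) {
      firewallEventsAdaptiveGroups(
        limit: 5
        filter: { datetime_geq: "2019-09-16T21:00:00Z", datetime_lt: "2019-09-16T21:15:00Z" }
        orderBy: [count_DESC]
      ) {
        count
        dimensions { action }
      }
    }
  }
}
"""

response = requests.post(
    API_URL,
    json={"query": QUERY},
    headers={"Authorization": "Bearer YOUR_API_TOKEN"},  # use your own API token
)
response.raise_for_status()
print(response.json())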

Introspection: what data is available?

The first thing you may be wondering: if the GraphQL Analytics API offers access to so much data, how do I figure out exactly what is available, and how can I ask for it? GraphQL makes this easy by offering “introspection,” meaning you can query the API itself to see the available data sets, the fields and their types, and the operations you can perform. GraphiQL uses this functionality to provide a “Documentation Explorer,” query auto-completion, and syntax validation. For example, here is how I can see all the data sets available for a zone (domain):


If I’m writing a query, and I’m interested in data on firewall events, auto-complete will help me quickly find relevant data sets and fields:


Querying: examples of questions you can ask

Let’s say you’ve made a major product announcement and expect a surge in requests to your blog, your application, and several other zones (domains) under your account. You can check if this surge materializes by asking for the requests aggregated under your account, in the 30 minutes after your announcement post, broken down by the minute:

{
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      httpRequests1mGroups(
        limit: 30
        filter: { datetime_geq: "2019-09-16T20:00:00Z", datetime_lt: "2019-09-16T20:30:00Z" }
        orderBy: [datetimeMinute_ASC]
      ) {
        dimensions {
          datetimeMinute
        }
        sum {
          requests
        }
      }
    }
  }
}

Here is the first part of the response, showing requests for your account, by the minute:


Now, let’s say you want to compare the traffic coming to your blog versus your marketing site over the last hour. You can do this in one query, asking for the number of requests to each zone:

{
  viewer {
    zones(filter: { zoneTag_in: [$zoneTag1, $zoneTag2] }) {
      httpRequests1hGroups(
        limit: 2
        filter: { datetime_geq: "2019-09-16T20:00:00Z", datetime_lt: "2019-09-16T21:00:00Z" }
      ) {
        sum {
          requests
        }
      }
    }
  }
}

Here is the response:


Finally, let’s say you’re seeing an increase in error responses. Could this be correlated to an attack? You can look at error codes and firewall events over the last 15 minutes, for example:

{
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      httpRequests1mGroups(
        limit: 100
        filter: { datetime_geq: "2019-09-16T21:00:00Z", datetime_lt: "2019-09-16T21:15:00Z" }
      ) {
        sum {
          responseStatusMap {
            edgeResponseStatus
            requests
          }
        }
      }
      firewallEventsAdaptiveGroups(
        limit: 100
        filter: { datetime_geq: "2019-09-16T21:00:00Z", datetime_lt: "2019-09-16T21:15:00Z" }
      ) {
        dimensions {
          action
        }
        count
      }
    }
  }
}

Notice that, in this query, we’re looking at multiple datasets at once, using a common zone identifier to “join” them. Here are the results:


By examining both data sets in parallel, we can see a correlation: 31 requests were “dropped” or blocked by the Firewall, which is exactly the same as the number of “403” responses. So, the 403 responses were a result of Firewall actions.
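
As a sketch of that comparison in code (illustrative only; the exact set of blocking action names is an assumption), you could tally both sides of the correlation from the combined response:

def compare_errors_to_firewall(zone_data, status=403, blocking_actions=("drop", "block")):
    # zone_data is one entry of viewer.zones from the combined query above.
    error_requests = sum(
        entry["requests"]
        for group in zone_data["httpRequests1mGroups"]
        for entry in group["sum"]["responseStatusMap"]
        if entry["edgeResponseStatus"] == status
    )
    firewall_events = sum(
        group["count"]
        for group in zone_data["firewallEventsAdaptiveGroups"]
        if group["dimensions"]["action"] in blocking_actions
    )
    return error_requests, firewall_events  # e.g. (31, 31) for the data above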

Try it today

To learn more about the GraphQL Analytics API and start exploring your Cloudflare data, follow the “Getting started” guide in our developer documentation, which also has details regarding the current data sets and time periods available. We’ll be adding more data sets over time, so take advantage of the introspection feature to see the latest available.

Finally, to make way for the new API, the Zone Analytics API is now deprecated and will be sunset on May 31, 2020. The data that Zone Analytics provides is available from the GraphQL Analytics API. If you’re currently using the API directly, please follow our migration guide to change your API calls. If you get your analytics using the Cloudflare dashboard or our Datadog integration, you don’t need to take any action.

One more thing….

In the API examples above, if you find it helpful to get analytics aggregated for all the domains under your account, we have something else you may like: a brand new Analytics dashboard (in beta) that provides this same information. If your account has many zones, the dashboard is helpful for knowing summary information on metrics such as requests, bandwidth, cache rate, and error rate. Give it a try and let us know what you think using the feedback link above the new dashboard.

New tools to monitor your server and avoid downtime

Post Syndicated from Brian Batraski original https://blog.cloudflare.com/new-tools-to-monitor-your-server-and-avoid-downtime/


When your server goes down, it’s a big problem. Today, Cloudflare is introducing two new tools to help you understand and respond faster to origin downtime — plus, a new service to automatically avoid downtime.

The new features are:

  • Standalone Health Checks, which notify you as soon as we detect problems at your origin server, without needing a Cloudflare Load Balancer.
  • Passive Origin Monitoring, which lets you know when your origin cannot be reached, with no configuration required.
  • Zero-Downtime Failover, which can automatically avert failures by retrying requests to origin.

Standalone Health Checks

Our first new tool is Standalone Health Checks, which will notify you as soon as we detect problems at your origin server — without needing a Cloudflare Load Balancer.

A Health Check is a service that runs on our edge network to monitor whether your origin server is online. Health Checks are a key part of our load balancing service because they allow us to quickly and actively route traffic to origin servers that are live and ready to serve requests. Standalone Health Checks allow you to monitor the health of your origin even if you only have one origin or do not yet need to balance traffic across your infrastructure.

We’ve provided many dimensions for you to home in on exactly what you’d like to check, including response code, protocol type, and interval. You can specify a particular path if your origin serves multiple applications, or you can check a larger subset of response codes for your staging environment. All of these options allow you to properly target your Health Check, giving you a precise picture of what is wrong with your origin.


If one of your origin servers becomes unavailable, you will receive a notification letting you know of the health change, along with detailed information about the failure so you can take action to restore your origin’s health.  

Lastly, once you’ve set up your Health Checks across the different origin servers, you may want to see trends or the top unhealthy origins. With Health Check Analytics, you’ll be able to view all the change events for a given health check, isolate origins that are top offenders or underperforming, and move forward with a fix. On top of this, in the near future, we plan to provide access to all Health Check raw events so you can compare Cloudflare Health Check event logs against your internal server logs.


Users on the Pro, Business, or Enterprise plan will have access to Standalone Health Checks and Health Check Analytics to promote top-tier application reliability and help maximize brand trust with their customers. You can access Standalone Health Checks and Health Check Analytics through the Traffic app in the dashboard.

Passive Origin Monitoring

Standalone Health Checks are a super flexible way to understand what’s happening at your origin server. However, they require some forethought to configure before an outage happens. That’s why we’re excited to introduce Passive Origin Monitoring, which will automatically notify you when a problem occurs — no configuration required.

Cloudflare knows when your origin is down, because we’re the ones trying to reach it to serve traffic! When we detect downtime lasting longer than a few minutes, we’ll send you an email.

Starting today, you can configure origin monitoring alerts to go to multiple email addresses. Origin Monitoring alerts are available in the new Notification Center (more on that below!) in the Cloudflare dashboard:


Passive Origin Monitoring is available to customers on all Cloudflare plans.

Zero-Downtime Failover

What’s better than getting notified about downtime? Never having downtime in the first place! With Zero-Downtime Failover, we can automatically retry requests to origin, even before Load Balancing kicks in.

How does it work? If a request to your origin fails, and Cloudflare has another record for your origin server, we’ll just try another origin within the same HTTP request. The alternate record could be either an A/AAAA record configured via Cloudflare DNS, or another origin server in the same Load Balancing pool.

Consider a website, example.com, that has web servers at two different IP addresses: 203.0.113.1 and 203.0.113.2. Before Zero-Downtime Failover, if 203.0.113.1 becomes unavailable, Cloudflare would attempt to connect, fail, and ultimately serve an error page to the user. With Zero-Downtime Failover, if 203.0.113.1 cannot be reached, then Cloudflare’s proxy will seamlessly attempt to connect to 203.0.113.2. If the second server can respond, then Cloudflare can avert serving an error to example.com’s user.
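
Conceptually, the behaviour resembles the following Python sketch; this is purely an illustration of the retry idea, not Cloudflare’s actual proxy code:

import socket

ORIGINS = ["203.0.113.1", "203.0.113.2"]

def connect_to_origin(port=443, timeout=3.0):
    last_error = None
    for origin in ORIGINS:
        try:
            # Only the connection phase is retried; no request data has been sent yet.
            return socket.create_connection((origin, port), timeout=timeout)
        except OSError as error:
            last_error = error  # this origin is unreachable, try the next one
    raise last_error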

Since we rolled out Zero-Downtime Failover a few weeks ago, we’ve prevented tens of millions of requests per day from failing!

Zero-Downtime Failover works in conjunction with Load Balancing, Standalone Health Checks, and Passive Origin Monitoring to keep your website running without a hitch. Health Checks and Load Balancing can avert failure, but take time to kick in. Zero-Downtime failover works instantly, but adds latency on each connection attempt. In practice, Zero-Downtime Failover is helpful at the start of an event, when it can instantly recover from errors; once a Health Check has detected a problem, a Load Balancer can then kick in and properly re-route traffic. And if no origin is available, we’ll send an alert via Passive Origin Monitoring.

To see an example of this in practice, consider an incident from a recent customer. They saw a spike in errors at their origin that would ordinarily cause availability to plummet (red line), but thanks to Zero-Downtime failover, their actual availability stayed flat (blue line).


During a 30 minute time period, Zero-Downtime Failover improved overall availability from 99.53% to 99.98%, and prevented 140,000 HTTP requests from resulting in an error.

It’s important to note that we only attempt to retry requests that have failed during the TCP or TLS connection phase, which ensures that HTTP headers and payload have not been transmitted yet. Thanks to this safety mechanism, we’re able to make Zero-Downtime Failover Cloudflare’s default behavior for Pro, Business, and Enterprise plans. In other words, Zero-Downtime Failover makes connections to your origins more reliable with no configuration or action required.

Coming soon: more notifications, more flexibility

Our customers are always asking us for more insights into the health of their critical edge infrastructure. Health Checks and Passive Origin Monitoring are a significant step towards Cloudflare taking a proactive instead of reactive approach to insights.

To support this work, today we’re announcing the Notification Center as the central place to manage notifications. This is available in the dashboard today, accessible from your Account Home.

From here, you can create new notifications, as well as view any existing notifications you’ve already set up. Today’s release allows you to configure Passive Origin Monitoring notifications, and set multiple email recipients.


We’re excited for today’s launches to help our customers avoid downtime. Based on your feedback, we have lots of improvements planned that can help you get the timely insights you need:

  • New notification delivery mechanisms
  • More events that can trigger notifications
  • Advanced configuration options for Health Checks, including added protocols, threshold-based notifications, and threshold-based status changes
  • More ways to configure Passive Health Checks, like the ability to add thresholds, and filter to specific status codes

Introducing Load Balancing Analytics

Post Syndicated from Brian Batraski original https://blog.cloudflare.com/introducing-load-balancing-analytics/


Cloudflare aspires to make Internet properties everywhere faster, more secure, and more reliable. Load Balancing helps with speed and reliability and has been evolving over the past three years.

Let’s go through a scenario that highlights a bit more of what a Load Balancer is and the value it can provide. A standard load balancer comprises a set of pools, each of which has origin servers that are hostnames and/or IP addresses. A routing policy is assigned to each load balancer, which determines the origin pool selection process.

Let’s say you build an API that is using cloud provider ACME Web Services. Unfortunately, ACME had a rough week, and their service had a regional outage in their Eastern US region. Consequently, your website was unable to serve traffic during this period, which resulted in reduced brand trust from users and missed revenue. To prevent this from happening again, you decide to take two steps: use a secondary cloud provider (in order to avoid having ACME as a single point of failure) and use Cloudflare’s Load Balancing to take advantage of the multi-cloud architecture. Cloudflare’s Load Balancing can help you maximize your API’s availability for your new architecture. For example, you can assign health checks to each of your origin pools. These health checks can monitor your origin servers’ health by checking HTTP status codes, response bodies, and more. If an origin pool’s response doesn’t match what is expected, then traffic will stop being steered there. This will reduce downtime for your API when ACME has a regional outage because traffic in that region will seamlessly be rerouted to your fallback origin pool(s). In this scenario, you can set the fallback pool to be origin servers in your secondary cloud provider. In addition to health checks, you can use the ‘random’ routing policy in order to distribute your customers’ API requests evenly across your backend. If you want to optimize your response time instead, you can use ‘dynamic steering’, which will send traffic to the origin determined to be closest to your customer.

Our customers love Cloudflare Load Balancing, and we’re always looking to improve and make our customers’ lives easier. Since Cloudflare’s Load Balancing was first released, the most popular customer request was for an analytics service that would provide insights on traffic steering decisions.

Today, we are rolling out Load Balancing Analytics in the Traffic tab of the Cloudflare dashboard. The three major components in the analytics service are:

  • An overview of traffic flow that can be filtered by load balancer, pool, origin, and region.
  • A latency map that indicates origin health status and latency metrics from Cloudflare’s global network spanning 194 cities and growing!
  • Event logs denoting changes in origin health. This feature was released in 2018 and tracks pool and origin transitions between healthy and unhealthy states. We’ve moved these logs under the new Load Balancing Analytics subtab. See the documentation to learn more.

In this blog post, we’ll discuss the traffic flow distribution and the latency map.

Traffic Flow Overview

Our users want a detailed view into where their traffic is going, why it is going there, and insights into what changes may optimize their infrastructure. With Load Balancing Analytics, users can graphically view traffic demands on load balancers, pools, and origins over variable time ranges.

Understanding how traffic flow is distributed informs the process of creating new origin pools, adapting to peak traffic demands, and observing failover response during origin pool failures.

Figure 1

In Figure 1, we can see an overview of traffic for a given domain. On Tuesday, the 24th, the red pool was created and added to the load balancer. In the following 36 hours, as the red pool handled more traffic, the blue and green pools both saw a reduced workload. In this scenario, the traffic distribution graph provided the customer with new insights. First, it demonstrated that traffic was being steered to the new red pool. It also allowed the customer to understand the new level of traffic distribution across their network. Finally, it allowed the customer to confirm whether traffic decreased in the expected pools. Over time, these graphs can be used to better manage capacity and plan for upcoming infrastructure needs.

Latency Map

The traffic distribution overview is only one part of the puzzle. Another essential component is understanding request performance around the world. This is useful because customers can ensure user requests are handled as fast as possible, regardless of where in the world the request originates.

The standard Load Balancing configuration contains monitors that probe the health of customer origins. These monitors can be configured to run from one or more particular regions or, for Enterprise customers, from all Cloudflare locations. They collect useful information, such as round-trip time, that can be aggregated to create the latency map.

The map provides a summary of how responsive origins are from around the world, so customers can see regions where requests are underperforming and may need further investigation. A common metric used to identify performance is request latency. We found that the p90 latency for all Load Balancing origins being monitored is 300 milliseconds, which means that 90% of all monitors’ health checks had a round trip time faster than 300 milliseconds. We used this value to identify locations where latency was slower than the p90 latency seen by other Load Balancing customers.
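
As a quick illustration of what that p90 figure means, here is a small Python sketch using made-up round-trip times:

def p90(latencies_ms):
    ordered = sorted(latencies_ms)
    index = max(0, int(0.9 * len(ordered)) - 1)
    return ordered[index]

samples = [120, 95, 310, 180, 240, 60, 300, 150, 90, 275]
print(p90(samples))  # -> 300: 90% of these checks completed in 300 ms or less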

Figure 2

In Figure 2, we can see the responsiveness of the Northeast Asia pool. The Northeast Asia pool is slow specifically for monitors in South America, the Middle East, and Southern Africa, but fast for monitors that are probing closer to the origin pool. Unfortunately, this means users for the pool in countries like Paraguay are seeing high request latency. High page load times have many unfortunate consequences: higher visitor bounce rate, decreased visitor satisfaction rate, and a lower search engine ranking. In order to avoid these repercussions, a site administrator could consider adding a new origin pool in a region closer to underserved regions. In Figure 3, we can see the result of adding a new origin pool in Eastern North America. We see the number of locations where the domain was found to be unhealthy drops to zero and the number of slow locations cut by more than 50%.

Figure 3

Tied with the traffic flow metrics from the Overview page, the latency map arms users with insights to optimize their internal systems, reduce their costs, and increase their application availability.

GraphQL Analytics API

Behind the scenes, Load Balancing Analytics is powered by the GraphQL Analytics API. As you’ll learn later this week, GraphQL provides many benefits to us at Cloudflare. Customers now only need to learn a single API format that will allow them to extract only the data they require. For internal development, GraphQL eliminates the need for customized analytics APIs for each service, reduces query cost by increasing cache hits, and reduces developer fatigue by using a straightforward query language with standardized input and output formats. Very soon, all Load Balancing customers on paid plans will be given the opportunity to extract insights from the GraphQL API.  Let’s walk through some examples of how you can utilize the GraphQL API to understand your Load Balancing logs.

Suppose you want to understand the number of requests the pools for a load balancer are seeing from the different locations in Cloudflare’s global network. The query in Figure 4 counts the number of unique (location, pool ID) combinations every fifteen minutes over the course of a week.

Figure 4

For context, our example load balancer, lb.example.com, utilizes dynamic steering. Dynamic steering directs requests to the most responsive available origin pool, which is often the closest. It does so using a weighted round-trip time measurement. Let’s try to understand why all traffic from Singapore (SIN) is being steered to our pool in Northeast Asia (asia-ne). We can run the query in Figure 5. This query shows us that the asia-ne pool has an avgRttMs value of 67ms, whereas the other two pools have avgRttMs values that exceed 150ms. The lower avgRttMs value explains why traffic in Singapore is being routed to the asia-ne pool.

Figure 5
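
The steering decision itself boils down to picking the healthy pool with the lowest weighted round-trip time. The following Python sketch is illustrative only; the asia-ne value comes from Figure 5, while the other pool names and values are invented for the example:

pools = {
    "asia-ne": {"avgRttMs": 67, "healthy": True},
    "us-east": {"avgRttMs": 155, "healthy": True},  # example values
    "eu-west": {"avgRttMs": 162, "healthy": True},  # example values
}

best_pool = min(
    (name for name, pool in pools.items() if pool["healthy"]),
    key=lambda name: pools[name]["avgRttMs"],
)
print(best_pool)  # -> "asia-ne", matching the steering decision for Singapore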

Notice how the query in Figure 4 uses the loadBalancingRequestsGroups schema, whereas the query in Figure 5 uses the loadBalancingRequests schema. loadBalancingRequestsGroups queries aggregate data over the requested query interval, whereas loadBalancingRequests provides granular information on individual requests. For those ready to get started, Cloudflare has written a helpful guide. The GraphQL website is also a great resource. We recommend you use an IDE like GraphiQL to make your queries. GraphiQL embeds the schema documentation into the IDE, autocompletes, saves your queries, and manages your custom headers, all of which help make the developer experience smoother.

Conclusion

Now that the Load Balancing Analytics solution is live and available to all Pro, Business, and Enterprise customers, we’re excited for you to start using it! We’ve attached a survey to the Traffic overview page, and we’d love to hear your feedback.

Matching patient records with the AWS Lake Formation FindMatches transform

Post Syndicated from Dhawalkumar Patel original https://aws.amazon.com/blogs/big-data/matching-patient-records-with-the-aws-lake-formation-findmatches-transform/

Patient matching is a major obstacle in achieving healthcare interoperability. Mismatched patient records and inability to retrieve patient history can cause significant barriers to informed clinical decision-making and result in missed diagnoses or delayed treatments. Additionally, healthcare providers often invest in patient data deduplication, especially when the number of patient records is growing rapidly in their databases. Electronic Health Records (EHRs) have significantly improved patient safety and care coordination in recent years; however, accurate patient matching remains a challenge for many healthcare organizations.

Duplicate patient records emerge for a variety of reasons, including human-generated number insertion, deletion, substitution, or transposition errors. Optical Character Recognition (OCR) software, which digitizes patient records, may also introduce errors.

Multiple types of record-matching algorithms exist to solve this problem. These include basic deterministic methods such as grouping and comparing relevant fields (such as SSN, name, or date of birth), phonetic encoding systems, and more advanced algorithms using machine learning (ML).
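
For contrast with the ML-based approach used later in this post, a basic deterministic matcher can be just a few lines of Python. The field names below are illustrative, not the schema of the dataset used in this post:

from collections import defaultdict

def deterministic_match(records, key_fields=("ssn", "date_of_birth", "surname")):
    """Group records that agree exactly on a few identifying fields."""
    groups = defaultdict(list)
    for record in records:
        key = tuple(str(record.get(field, "")).strip().lower() for field in key_fields)
        groups[key].append(record)
    # Any key shared by more than one record is a candidate duplicate cluster.
    return [cluster for cluster in groups.values() if len(cluster) > 1]

Exact-match rules like this break down as soon as fields are missing, misspelled, or formatted differently, which is precisely where FindMatches helps.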

AWS Lake Formation is a HIPAA-eligible service that helps you build a secure data lake in a few simple steps. Lake Formation also contains FindMatches, an ML transform that enables you to match records across different datasets and identify and remove duplicate records, with little to no human intervention.

This post shows you how to use the FindMatches ML transform to identify matching patient records in a synthetically generated dataset. To use FindMatches, you don’t have to write code or know how ML works. This is useful for finding matches in your data when it doesn’t include a reliable, unique personal identifier, even if the fields don’t match exactly.

Patient dataset

Various regulations in different countries govern patient data due to its sensitive nature. As such, the availability of patient data on which to train matching algorithms is often scarce, which complicates model development. A common method to get around these challenges is synthetic data. This post generates patient data based on the Open Source Freely Extensible Biomedical Record Linkage Program (FEBRL). FEBRL uses Hidden Markov Models (HMMs) to prepare the name and address data for patient record matching. It also allows for mimicking real-life patient datasets that lead to duplicates, which may have the following mismatches:

  • Blank fields.
  • Typographical errors such as misspelling, transposing the characters, or swapping the fields.
  • Shortened middle names versus records with complete middle names.
  • Various formats of mailing addresses and their components.
  • OCR-related errors.
  • Phonetical errors.
  • No globally unique patient or person identifier. Every healthcare provider may have a patient identifier assigned to the same person but may not have a person identifier like SSN, so they have datasets without keys.

FEBRL can generate these types of datasets based on configurable parameters that change the probabilities of each type of error, and thus incorporate a variety of scenarios leading to duplicates. Generating the synthetic dataset is beyond the scope of this post, which provides a pre-generated dataset for you to explore. In brief, the steps to generate the synthetic dataset used to run FindMatches are as follows:

  1. Download and install FEBRL.
  2. Modify the parameters to create a dataset mimicking your expectations. For more information, see the dataset generation instructions of FEBRL.
  3. Cleanse the dataset (this ensures the same schema for each record, and removes single quotes and the family role).

The dataset for this post uses AWS Region US East (N. Virginia).

FEBRL patient data structure

The following table shows the structure of FEBRL patient data. The data includes 40,000 records.

The original record and duplicate records are grouped together. The patient_id values are generated by the FEBRL data generator tool in a specific format: rec-<record number>-org for originals and rec-<record number>-dup-<duplicate record number> for duplicates.

The following table is a preview of what you’re working to achieve with the FindMatches ML transform. Once you match datasets, the resulting table mirrors the input table’s structure and data and adds a match_id column. Matched records display the same match_id value. False positives and false negatives are still possible, but the transform remains extremely beneficial.

Prerequisites

The sample synthetic patient dataset for this post resides in the US East (N. Virginia) Region, so all the steps mentioned in this post must be performed in the same AWS Region (us-east-1). The steps are easily modifiable if your data is in a different Region.

Solution architecture

The following diagram shows the solution’s architecture.

 

 

Solution overview

At a high level, the matching process includes the following steps:

  1. Upload the raw patient dataset in CSV format to an Amazon S3 bucket.
  2. Crawl the uploaded patient dataset using an AWS Glue crawler.
  3. Catalog your patient data with the AWS Glue Data Catalog and create a FindMatches ML transform.
  4. Create a label set, either using the ML transform or manually, and teach FindMatches by providing labeled examples of matching and non-matching records. Upload your labels and estimate the quality of the prediction. Add more labelsets and repeat this step as needed to reach the desired precision, accuracy, and recall.
  5. Create and execute an AWS Glue ETL job that uses your FindMatches transform.
  6. Store the results of the FindMatches transform in the Amazon S3 bucket.
  7. Create an AWS Glue Data Catalog table for the FindMatches ML transform results.
  8. Review the transform results with Amazon Athena.

Cataloging your data and creating a FindMatches ML transform

FindMatches operates on tables defined in the AWS Glue Data Catalog. Use AWS Glue crawlers to discover and catalog the patient data. You can use the FEBRL patient data generated for this post.

The AWS CloudFormation stack provided below creates its resources in the us-east-1 (US East (N. Virginia)) Region.

To create the Catalog and FindMatches ML transform in AWS Glue, launch the following stack:


This stack creates the following resources:

  • An Amazon S3 bucket that stores the ML transform results (configurable as part of the launch). You can find the name of the bucket on the AWS CloudFormation console, on the stack’s Outputs tab. This post uses the name S3BucketName.
  • An IAM role to allow AWS Glue to access other services, including S3.
  • An AWS Glue database (configurable as part of the launch).
  • An AWS Glue table that represents the Public Original Synthetic Patients Dataset (configurable as part of the launch).
  • An AWS Glue ML transform with the source as your AWS Glue table, with Accuracy set to 1 and Precision set to 0.9.

For more information, see Integrate and deduplicate datasets using AWS Lake Formation FindMatches.

Tuning the ML transform

The safety risks of false positive matches, in which clinicians believe incorrect information about the patient to be accurate, may be greater than the safety risks of false negative matches, in which clinicians lack access to existing information about the patient. (For more information, see the related study on the NCBI website.) Therefore, moving the Recall vs. Precision slider toward Precision gives a higher level of confidence that matched records belong to the same patient and minimizes the safety risks of false positive matches.

A higher Accuracy setting helps achieve higher recall, at the expense of a longer runtime (and thus cost) necessary to compare more records.
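
As a refresher on the metrics behind those two sliders, here is a small Python sketch with made-up counts:

def precision_recall(true_positives, false_positives, false_negatives):
    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)
    return precision, recall

# e.g. 90 correct matches, 2 false matches, 10 missed matches
print(precision_recall(90, 2, 10))  # -> (0.978..., 0.9)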

To achieve the comparatively better results for this particular dataset, the launch stack already created the transform for you with the Recall vs. Precision slider set to 0.9 toward Precision and the Lower Cost vs. Accuracy slider set to Accuracy. If needed, you can adjust these values later by selecting the transform and using the Tune menu.

Teaching FindMatches using labeled data

After running the launch stack successfully, you can train the transform by supplying it with matching and non-matching records using labelsets.

Creating a labeling set

You can create a labeling set yourself or allow AWS Glue to generate the labeling set based on heuristics.

AWS Glue extracts records from your source data and suggests potential matching records. The generated labelset file contains approximately 100 data samples for you to work on.

This post provides you with an AWS Glue generated labeled data file that you can use, with a fully populated label column. This file is fully ready for consumption.

If you choose to use the pre-generated labeled data file provided in this post, skip the labeling file generation steps below.

To create the training set, complete the following steps:

  1. On the AWS Glue console, under ETL, ML transforms, you will find the ML transform named cfn-findmatches-ml-transform-demo created for you by the provided stack.
  2. Choose the ML transform cfn-findmatches-ml-transform-demo, then choose Action and select Teach transform.
  3. For Teach the transform using labels, choose I do not have labels.
  4. Choose Generate labeling file.
  5. Provide the S3 path to store the generated label file.
  6. Choose Next.

The following table shows the generated labeled data file with an empty label column.

You need to populate the label column by marking the records that are a real match with the same value. Each labelset should contain positive and negative match examples.

This post provides you with a labeled data file that you can use, with a fully populated label column. This file is fully ready for consumption.

The following table shows the table with a completed label column.

The labeling file has the same schema as the input data, plus two additional columns: labeling_set_id and label.

The training dataset is divided into labeling sets. Each labeling set displays a labeling_set_id value. This identification simplifies the labeling process, enabling you to focus on the match relationship of records within the same labeling set, rather than having to scan the entire file. For the preceding dataset, extract the label values from patient_id by removing the -org and -dup suffixes using a regular expression. But in general, you would assign these labels according to which records should match based on the attribute values.
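
For example, the following Python sketch derives the label from patient_id for this dataset; the exact suffix format may vary slightly depending on how FEBRL was configured:

import re

def label_from_patient_id(patient_id):
    # "rec-123-org" and "rec-123-dup-0" both map to the label "rec-123".
    return re.sub(r"-(org|dup)(-\d+)?$", "", patient_id)

print(label_from_patient_id("rec-123-org"))    # -> rec-123
print(label_from_patient_id("rec-123-dup-0"))  # -> rec-123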

If you specify the same label value for two or more records within a labeling set, you teach the FindMatches transform to consider these records a match. On the other hand, when two or more records have different labels within the same labeling set, FindMatches learns that these records aren’t considered a match. The transform evaluates record relationships between records within the same labeling set, not across labeling sets.

You should label a few hundred records to achieve a modest match quality, and a few thousand records to achieve a high match quality.

Uploading your labels and reviewing match quality

After you create the labeled dataset (which needs to be in .csv format), teach FindMatches where to find it. Complete the following steps:

  1. On the AWS Glue console, select the transform that you created earlier.
  2. Choose Action.
  3. Choose Teach transform.
  4. For Upload labels, choose I have labels.
  5. Choose Upload labeling file from S3.
  6. Choose Next.
  7. If you want to use the labelset provided in this blog post, download the labelset here.
  8. Create a folder named trainingset in the same S3 bucket created by the CloudFormation template you launched previously.
  9. Upload the labelset above to the trainingset folder in the same S3 bucket.
  10. Choose Overwrite my existing labels. You are only using one set of labels; if you are adding labels iteratively, choose the Append to my existing labels option instead.
  11. Choose Upload. With the labels uploaded, your transform is now ready to use. Though not strictly required, check the transform match quality by reviewing the metrics of matching and non-matching records.
  12. Choose Estimate transform quality. The transform quality estimate learns using 70% of your labels. After it’s trained, the quality estimate tests how well the transform learned to identify matching records against the remaining 30%. Finally, the transform generates quality metrics by comparing the matches and non-matches predicted by the algorithm vs. your actual labels. This process may take up to several minutes.

Your results should be similar to those in the following screenshot.

Consider these metrics approximate, because the test uses only a small subset of data for estimating quality. If you’re satisfied with the metrics, proceed with creating and running a record-matching job. Or, to improve matching quality further, upload more labeled records.

Creating and running an AWS Glue ETL job

After you create a FindMatches transform and verify that it learned to identify matching records in your data, you’re ready to identify matches in your complete dataset. To create and run a record-matching job, complete the following steps:

  1. Create a transformresults folder inside the S3 bucket that the AWS CloudFormation template created when you launched the stack. This folder stores the ML transform results from your AWS Glue job.
  2. On the AWS Glue console, under Jobs, choose Add job.
  3. Under Configure the job properties, for Name, enter a name for the job.
  4. For IAM role, choose your role from the drop-down menu. Choose the IAM role that the AWS CloudFormation stack created, called AWSGlueServiceRoleMLTransform. For more information, see Create an IAM Role for AWS Glue.
  5. Select Spark as the Type, with the Glue version set to Spark 2.2, Python 2 (Glue version 0.9).
  6. For the job run option, select A proposed script generated by AWS Glue.
  7. For Choose a data source, choose the transform data source. This post uses the data source cfn_table_patient.
  8. Under Choose a transform type, choose Find matching records.
  9. For Worker type, choose G.2X.
  10. For Number of workers, enter 10. You can add more workers based on the size of the datasets by increasing this number.
  11. To review records identified as duplicate, do not select Remove duplicate records.
  12. Choose Next.
  13. Choose the transform that you created.
  14. Choose Next.
  15. For Choose a data target, choose Create tables in your data target.
  16. For Data store, choose Amazon S3.
  17. For Format, choose CSV.
  18. For Compression type, choose None.
  19. For Target path, choose a path for the job’s output. The target path is the S3 bucket that AWS CloudFormation created, along with the folder named transformresults that you created previously.
  20. Choose Save job and edit script. The script is now generated for your job and ready to use. Alternatively, you can customize the script further to suit your specific ETL needs.
  21. To start identifying matches in this dataset, choose Run job as shown in the screen below. For now, leave the job parameters with the default settings, and close this page after starting the job. The following screenshot shows the proposed Python Spark script generated by AWS Glue using the ML transform. If the execution is successful, FindMatches shows the run status as Succeeded. The execution might take a few minutes to finish.

FindMatches saves your output data as multi-part .csv files in the target path that you specified during the job creation. The resulting .csv files mirror the input table’s structure and data, but have a match_id column. Matched records display the same match_id value.

Creating a Data Catalog of your transform results

To view the output, you can either download the multi-part .csv files from the S3 bucket directly and review it via your preferred editor, or you can run SQL-like queries against the data stored on S3 using Athena. To view the data using Athena, you need to crawl the folder with the multi-part .csv files that you created as part of the output of your FindMatches ETL job.

Go to AWS Glue and use a crawler to create a new table in the existing patient-matching database. This table holds the records from the output of your FindMatches ETL job, with the source data being the folder of your S3 bucket that contains the multi-part .csv files. For this post, the source data is the folder transformresults in the bucket created by the AWS CloudFormation stack.

To create a crawler, complete the following steps:

  1. On the AWS Glue console, under Data Catalog, choose Crawlers.
  2. Click Add crawler to create a new crawler to crawl the transform results.
  3. Provide the name of the crawler and click Next.
  4. Choose Data stores as the Crawler source type and click Next.
  5. In the Add a data store section, for Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the name of your path. This should be the same S3 bucket created previously by AWS CloudFormation, along with the folder named transformresults that you created. Verify that the folder contains the multi-part .csv files.
  8. Choose Next.
  9. In the Choose an IAM role section, choose Choose the IAM role.
  10. For IAM role, enter the name of the crawler.
  11. Choose Next.
  12. Select Run on demand for Frequency.
  13. Configure the crawler’s output with Database set to cfn-database-patient.
  14. Set the Prefix added to tables value to be table_results_. This will help identify the table containing the transform results.
  15. Click on Finish.
  16. Select the same crawler and choose Run the crawler. After the crawler execution succeeds, you should see a new table created corresponding to the crawler settings in the database you selected during crawler configuration.
  17. From the AWS Glue console, under Databases, choose Tables.
  18. Choose Action.
  19. Choose Edit table details.
  20. For Serde Serialization lib, enter org.apache.hadoop.hive.serde2.OpenCSVSerde.
  21. Under Serde parameters, add key escapeChar, with value as \\.
  22. Add key quoteChar with value as " (double quotes).
  23. Set key field.delim with value as , (comma).
  24. Add key separatorChar with value as , (comma). You can set the Serde parameters as per your requirements, based on the type of datasets you have.
  25. Edit the schema of the table by setting the data types of all columns to String. To edit the schema of the table, click on the table and click on the Edit schema button.

You can also choose to retain the inferred data types by crawler as per your requirements. This post sets all to the String datatype for the sake of simplicity, except for the match_id column, which is set as bigint.

Reviewing the output with Amazon Athena

To review the output with Amazon Athena, complete the following steps:

  1. From the Data Catalog, choose Tables.
  2. Choose the table name created by your crawler for the results.
  3. Choose Action.
  4. Choose View data.

    The Athena console opens. If you are running Amazon Athena for the first time, you might have to choose Get Started. Before you run your first query, you also need to set up a query result location in Amazon S3. Choose set up a query result location in Amazon S3 on the Athena console and set the location of the query results. You can create an additional folder in the same Amazon S3 bucket created previously by the CloudFormation stack. Make sure the S3 path ends with a /.
  5. Choose the appropriate database. For this post, choose cfn-database-patient. You might need to refresh the data source if you do not see the database in the drop-down.
  6. Choose the results table that contains the FindMatches output with the patient records and the match_id column. In this case, it is table_results_transformresults. If you chose a different name for the results table, change the query below to reflect the correct table name.
  7. Run the below query by choosing Run query.
    SELECT * FROM "cfn-database-patient"."table_results_transformresults" order by match_id;

The following screenshot shows your output.

Security Considerations

AWS Lake Formation helps protect your data by giving you a central location in which you can configure granular data access policies, regardless of which services you use to access it.

To centralize data access policy controls using Lake Formation, first shut down direct access to your buckets in S3 so Lake Formation manages all data access. Configure data protection and access policies using Lake Formation, which enforces those policies across all the AWS services accessing data in your lake. You can configure users and roles and define the data these roles can access, down to the table and column level.

AWS Lake Formation provides a permissions model that is based on a simple grant/revoke mechanism. Lake Formation permissions combine with IAM permissions to control access to data stored in data lakes and to the metadata that describes that data. For more information, see Security and Access Control to Metadata and Data in Lake Formation.

Lake Formation currently supports Server-Side Encryption on S3 (SSE-S3, AES-256). Lake Formation also supports private endpoints in your VPC and records all activity in AWS CloudTrail, so you have network isolation and auditability.

AWS Lake Formation is a HIPAA-eligible service.

Summary

This post demonstrated how to find matching records in a patient database using the Lake Formation FindMatches ML transform. It allows you to find matches when the records in two datasets don’t share a common identifier or include duplicates. This method helps you find matches between dataset rows when fields don’t match exactly or attributes are missing or corrupted.

You are now ready to start building with Lake Formation and try FindMatches on your data. Please share your feedback and questions in the comments.

 


About the Authors

Dhawalkumar Patel is a Senior Solutions Architect at Amazon Web Services. He has worked with organizations ranging from large enterprises to mid-sized startups on problems related to distributed computing and artificial intelligence. He is currently focused on machine learning and serverless technologies.

 

 

 

Ujjwal Ratan is a principal machine learning specialist solution architect in the Global Healthcare and Lifesciences team at Amazon Web Services. He works on the application of machine learning and deep learning to real world industry problems like medical imaging, unstructured clinical text, genomics, precision medicine, clinical trials and quality of care improvement. He has expertise in scaling machine learning/deep learning algorithms on the AWS cloud for accelerated training and inference. In his free time, he enjoys listening to (and playing) music and taking unplanned road trips with his family.

 

 

 

 

Firewall Analytics: Now available to all paid plans

Post Syndicated from Alex Cruz Farmer original https://blog.cloudflare.com/updates-to-firewall-analytics/


Our Firewall Analytics tool enables customers to quickly identify and investigate security threats using an intuitive interface. Until now, this tool had only been available to our Enterprise customers, who have been using it to get detailed insights into their traffic and better tailor their security configurations. Today, we are excited to make Firewall Analytics available to all paid plans and share details on several recent improvements we have made.

All paid plans are now able to take advantage of these capabilities, along with several important enhancements we’ve made to improve our customers’ workflow and productivity.

Increased Data Retention and Adaptive Sampling

Previously, Enterprise customers could view 14 days of Firewall Analytics for their domains. Today we’re increasing that retention to 30 days, and again to 90 days in the coming months. Business and Professional plan zones will get 30 and 3 days of retention, respectively.

In addition to the extended retention, we are introducing adaptive sampling to guarantee that Firewall Analytics results are displayed in the Cloudflare Dashboard quickly and reliably, even when you are under a massive attack or otherwise receiving a large volume of requests.

Adaptive sampling works much like Netflix video streaming: when your internet connection runs low on bandwidth, you receive a slightly downscaled version of the video stream you are watching. When your bandwidth recovers, Netflix upscales back to the highest quality available.

Firewall Analytics does this sampling on each query, ensuring that customers see the best precision available in the UI given current load on the zone. When results are sampled, the sampling rate will be displayed as shown below:

Event-Based Logging

As adoption of our expressive Firewall Rules engine has grown, one consistent ask we’ve heard from customers is for a more streamlined way to see all Firewall Events generated by a specific rule. Until today, if a malicious request matched multiple rules, only the last one to execute was shown in the Activity Log, requiring customers to click into the request to see if the rule they’re investigating was listed as an “Additional match”.

To streamline this process, we’ve changed how the Firewall Analytics UI interacts with the Activity Log. Customers can now filter by a specific rule (or any other criteria) and see a row for each event generated by that rule. This change also makes it easier to review all requests that would have been blocked by a rule by creating it in Log mode first before changing it to Block.

Challenge Solve Rates to help reduce False Positives

When our customers write rules to block undesired, automated traffic they want to make sure they’re not blocking or challenging desired traffic, e.g., humans wanting to make a purchase should be allowed but not bots scraping pricing.

To help customers determine what percentage of CAPTCHA challenges returned to users may have been unnecessary, i.e., false positives, we are now showing the Challenge Solve Rate (CSR) for each rule. If you’re seeing rates higher than expected, e.g., for your Bot Management rules, you may want to relax the rule criteria. If the rate you see is 0%, indicating that no CAPTCHAs are being solved, you may want to change the rule to Block outright rather than challenge.

Hovering over the CSR rate will reveal the number of CAPTCHAs issued vs. solved:

Exporting Firewall Events

Business and Enterprise customers can now export a set of 500 events from the Activity Log. The data exported are those events that remain after any selected filters have been applied.

Column Customization

Sometimes the columns shown in the Activity Log do not contain the details you want to see to analyze the threat. When this happens, you can now click “Edit Columns” to select the fields you want to see. For example, a customer diagnosing a Bot related issue may want to also view the User-Agent and the source country whereas a customer investigating a DDoS attack may want to see IP addresses, ASNs, Path, and other attributes. You can now customize what you’d like to see as shown below.

We would love to hear your feedback and suggestions, so feel free to reach out to us via our Community forums or through your Customer Success team.

If you’d like to receive more updates like this one directly to your inbox, please subscribe to our Blog!

Announcing deeper insights and new monitoring capabilities from Cloudflare Analytics

Post Syndicated from Filipp Nisenzoun original https://blog.cloudflare.com/announcing-deeper-insights-and-new-monitoring-capabilities/

This week we’re excited to announce a number of new products and features that provide deeper security and reliability insights, “proactive” analytics when there’s a problem, and more powerful ways to explore your data.

If you’ve been a user or follower of Cloudflare for a little while, you might have noticed that we take pride in turning technical challenges into easy solutions. Flip a switch or run a few API commands, and the attack you’re facing is now under control or your site is now 20% faster. However, this ease of use is even more helpful if it’s complemented by analytics. Before you make a change, you want to be sure that you understand your current situation. After the change, you want to confirm that it worked as intended, ideally as fast as possible.

Because of the front-line position of Cloudflare’s network, we can provide comprehensive metrics regarding both your traffic and the security and performance of your Internet property. And best of all, there’s nothing to set up or enable. Cloudflare Analytics is automatically available to all Cloudflare users and doesn’t rely on JavaScript trackers, meaning that our metrics include traffic from APIs and bots and are not skewed by ad blockers.

Here’s a sneak peek of the product launches. Look out for individual blog posts this week for more details on each of these announcements.

  • Product Analytics:
    • Today, we’re making Firewall Analytics available to Business and Pro plans, so that more customers understand how well Cloudflare mitigates attacks and handles malicious traffic. And we’re highlighting some new metrics, such as the rate of solved captchas (useful for Bot Management), and features, such as customizable reports to facilitate sharing and archiving attack information.
    • We’re introducing Load Balancing Analytics, which shows traffic flows by load balancer, pool, origin, and region, and helps explain why a particular origin was selected to receive traffic.
  • Monitoring:
    • We’re announcing tools to help you monitor your origin either actively or passively and automatically reroute your traffic to a different server when needed. Because Cloudflare sits between your end users and your origin, we can spot problems with your servers without the use of external monitoring services.
  • Data tools:
    • The product analytics we’ll be featuring this week use a new API behind the scenes. We’re making this API generally available, allowing you to easily build custom dashboards and explore all of your Cloudflare data the same way we do, so you can easily gain insights and identify and debug issues.
  • Account Analytics:
    • We’re releasing (in beta) a new dashboard that shows aggregated information for all of the domains under your account, allowing you to know what’s happening at a glance.

We’re excited to tell you about all of these new products in this week’s posts and would love to hear your thoughts. If you’re not already subscribing to the blog, sign up now to receive daily updates in your inbox.

New for Amazon Redshift – Data Lake Export and Federated Query

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-redshift-data-lake-export-and-federated-queries/

A data warehouse is a database optimized to analyze relational data coming from transactional systems and line of business applications. Amazon Redshift is a fast, fully managed data warehouse that makes it simple and cost-effective to analyze data using standard SQL and existing Business Intelligence (BI) tools.

To get information from unstructured data that would not fit in a data warehouse, you can build a data lake. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. With a data lake built on Amazon Simple Storage Service (S3), you can easily run big data analytics and use machine learning to gain insights from your semi-structured (such as JSON, XML) and unstructured datasets.

Today, we are launching two new features to help you improve the way you manage your data warehouse and integrate with a data lake:

  • Data Lake Export to unload data from a Redshift cluster to S3 in Apache Parquet format, an efficient open columnar storage format optimized for analytics.
  • Federated Query to be able, from a Redshift cluster, to query across data stored in the cluster, in your S3 data lake, and in one or more Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Aurora PostgreSQL databases.

This architectural diagram gives a quick summary of how these features work and how they can be used together with other AWS services.

Let’s walk through the interactions you see in the diagram, starting with how you can use these features and the advantages they provide.

Using Redshift Data Lake Export

You can now unload the result of a Redshift query to your S3 data lake in Apache Parquet format. The Parquet format is up to 2x faster to unload and consumes up to 6x less storage in S3, compared to text formats. This enables you to save the data transformation and enrichment work you have done in Redshift into your S3 data lake in an open format.

You can then analyze the data in your data lake with Redshift Spectrum, a feature of Redshift that allows you to query data directly from files on S3. Or you can use different tools such as Amazon Athena, Amazon EMR, or Amazon SageMaker.

To try this new feature, I create a new cluster from the Redshift console, and follow this tutorial to load sample data that keeps track of sales of musical events across different venues. I want to correlate this data with social media comments on the events stored in my data lake. To understand their relevance, each event should have a way of comparing its relative sales to other events.

Let’s build a query in Redshift to export the data to S3. My data is stored across multiple tables. I need to create a query that gives me a single view of what is going on with sales. I want to join the content of the sales and date tables, adding information on the gross sales for an event (total_price in the query), and the percentile in terms of all-time gross sales compared to all events.

To export the result of the query to S3 in Parquet format, I use the following SQL command:

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/Sales/'
FORMAT AS PARQUET
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

To give Redshift write access to my S3 bucket, I am using an AWS Identity and Access Management (IAM) role. I can see the result of the UNLOAD command using the AWS Command Line Interface (CLI). As expected, the output of the query is exported using the Parquet columnar data format:

$ aws s3 ls s3://MY-BUCKET/DataLake/Sales/
2019-11-25 14:26:56 1638550 0000_part_00.parquet
2019-11-25 14:26:56 1635489 0001_part_00.parquet
2019-11-25 14:26:56 1624418 0002_part_00.parquet
2019-11-25 14:26:56 1646179 0003_part_00.parquet

To optimize access to data, I can specify one or more partition columns so that unloaded data is automatically partitioned into folders in my S3 bucket. For example, I can unload sales data partitioned by year, month, and day. This enables my queries to take advantage of partition pruning and skip scanning irrelevant partitions, improving query performance and minimizing cost.

To use partitioning, I need to add to the previous SQL command the PARTITION BY option, followed by the columns I want to use to partition the data in different directories. In my case, I want to partition the output based on the year and the calendar date (caldate in the query) of the sales.

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/SalesPartitioned/'
FORMAT AS PARQUET
PARTITION BY (year, caldate)
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

This time, the output of the query is stored in multiple partitions. For example, here’s the content of a folder for a specific year and date:

$ aws s3 ls s3://MY-BUCKET/DataLake/SalesPartitioned/year=2008/caldate=2008-07-20/
2019-11-25 14:36:17 11940 0000_part_00.parquet
2019-11-25 14:36:17 11052 0001_part_00.parquet
2019-11-25 14:36:17 11138 0002_part_00.parquet
2019-11-25 14:36:18 12582 0003_part_00.parquet

Optionally, I can use AWS Glue to set up a Crawler that (on demand or on a schedule) looks for data in my S3 bucket to update the Glue Data Catalog. When the Data Catalog is updated, I can easily query the data using Redshift Spectrum, Athena, or EMR.
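As a minimal sketch of that optional step, the following boto3 snippet creates and starts a crawler over the unloaded prefix and then queries the discovered table from Athena. The crawler role, Data Catalog database, resulting table name, and query output location are placeholder assumptions.

import boto3

glue = boto3.client("glue")
athena = boto3.client("athena")

# Create a crawler that inventories the partitioned Parquet files unloaded by Redshift.
glue.create_crawler(
    Name="sales-datalake-crawler",
    Role="arn:aws:iam::123412341234:role/MyGlueCrawlerRole",  # placeholder role
    DatabaseName="datalake",                                  # placeholder database
    Targets={"S3Targets": [{"Path": "s3://MY-BUCKET/DataLake/SalesPartitioned/"}]},
)
glue.start_crawler(Name="sales-datalake-crawler")

# After the crawler finishes, the table it created can be queried with Athena.
athena.start_query_execution(
    QueryString=(
        "SELECT year, caldate, count(*) AS sales_rows "
        "FROM salespartitioned "  # placeholder table name chosen by the crawler
        "GROUP BY year, caldate "
        "ORDER BY year, caldate"
    ),
    QueryExecutionContext={"Database": "datalake"},
    ResultConfiguration={"OutputLocation": "s3://MY-BUCKET/athena-results/"},
)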

The sales data is now ready to be processed together with the unstructured and semi-structured (JSON, XML, Parquet) data in my data lake. For example, I can now use Apache Spark with EMR, or any SageMaker built-in algorithm, to access the data and get new insights.

Using Redshift Federated Query

You can now also access data in RDS and Aurora PostgreSQL stores directly from your Redshift data warehouse. In this way, you can access data as soon as it is available. Straight from Redshift, you can now perform queries processing data in your data warehouse, transactional databases, and data lake, without requiring ETL jobs to transfer data to the data warehouse.

Redshift leverages its advanced optimization capabilities to push down and distribute a significant portion of the computation directly into the transactional databases, minimizing the amount of data moving over the network.

Using this syntax, you can add an external schema from an RDS or Aurora PostgreSQL database to a Redshift cluster:

CREATE EXTERNAL SCHEMA IF NOT EXISTS online_system
FROM POSTGRES
DATABASE 'online_sales_db' SCHEMA 'online_system'
URI 'my-hostname' PORT 5432
IAM_ROLE 'iam-role-arn'
SECRET_ARN 'ssm-secret-arn';

The schema and port are optional here. The schema defaults to public if left unspecified, and the default port for PostgreSQL databases is 5432. Redshift uses AWS Secrets Manager to manage the credentials to connect to the external databases.
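As a minimal sketch, assuming the secret stores the PostgreSQL credentials under username and password keys, the secret referenced by SECRET_ARN above could be created with boto3 like this (the secret name and values are placeholders):

import json
import boto3

secretsmanager = boto3.client("secretsmanager")

# Store the credentials Redshift will use to connect to the PostgreSQL database.
response = secretsmanager.create_secret(
    Name="online-sales-db-credentials",  # placeholder name
    SecretString=json.dumps(
        {"username": "federated_query_user", "password": "REPLACE_ME"}
    ),
)

# Use this ARN as SECRET_ARN in the CREATE EXTERNAL SCHEMA statement.
print(response["ARN"])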

With the CREATE EXTERNAL SCHEMA command, all tables in the external schema are available and can be used by Redshift for any complex SQL query processing data in the cluster or, using Redshift Spectrum, in your S3 data lake.

Coming back to the sales data example I used before, I can now correlate the trends of my historical data of musical events with real-time sales. In this way, I can understand if an event is performing as expected or not, and calibrate my marketing activities without delays.

For example, after I define the online commerce database as the online_system external schema in my Redshift cluster, I can compare previous sales with what is in the online commerce system with this simple query:

SELECT eventid,
       sum(pricepaid) total_price,
       sum(online_pricepaid) online_total_price
  FROM sales, online_system.current_sales
 WHERE eventid = online_eventid
 GROUP BY eventid;

Redshift doesn’t import the database or schema catalog in its entirety. When a query is run, it localizes the metadata for the Aurora and RDS tables (and views) that are part of the query. This localized metadata is then used for query compilation and plan generation.

Available Now

Amazon Redshift data lake export is a new tool to improve your data processing pipeline and is supported with Redshift release version 1.0.10480 or later. Refer to the AWS Region Table for Redshift availability, and check the version of your clusters.

The new federation capability in Amazon Redshift is released as a public preview and allows you to bring together data stored in Redshift, S3, and one or more RDS and Aurora PostgreSQL databases. When creating a cluster in the Amazon Redshift management console, you can pick from three maintenance tracks: Current, Trailing, or Preview. Within the Preview track, choose preview_features to participate in the Federated Query public preview.

These features simplify data processing and analytics, giving you more tools to react quickly, and a single point of view for your data. Let me know what you are going to use them for!

Danilo

Highlight the breadth of your data and analytics technical expertise with new AWS Certification beta

Post Syndicated from Beth Shepherd original https://aws.amazon.com/blogs/big-data/highlight-the-breadth-of-your-data-and-analytics-technical-expertise-with-new-aws-certification-beta/

AWS offers the broadest set of analytic tools and engines that analyze data using open formats and open standards. To validate expertise with AWS data analytics solutions, builders can now take the beta for the AWS Certified Data Analytics — Specialty certification.

The AWS Certified Data Analytics — Specialty certification validates technical expertise with designing, building, securing, and maintaining analytics solutions on AWS. This certification first launched in 2017 as AWS Certified Big Data — Specialty. The new name highlights the breadth of the data and analytics technical skills and experience validated by the certification. Candidates who take and pass either the beta exam (available for registration now) or the general availability release in April 2020 will earn a certification with the new name.

The new exam version includes updated content across categories from collection to visualization. View the exam guide to learn more about what’s on the exam.

This is the only AWS Certification to specifically focus on data analytics expertise. This certification demonstrates your ability to design and implement analytics solutions that provide insight through visualizing data with the appropriate security measures and automation in place.

You can take the AWS Certified Data Analytics — Specialty beta exam at testing centers worldwide through January 10, 2020, and onsite at re:Invent 2019. Space is limited, so register today. The beta exam is available in English for 150 USD, which is 50% off the standard pricing for Specialty-level certifications. Results for the beta exam will be available approximately 90 days after the end of the beta period. If you miss the beta, the standard version is expected in April 2020.


About the Authors

Beth Shepherd is the Product Marketing Manager for AWS Certification. She joined Amazon in 2019 and is based in Boston.


Bill Baldwin is the Head of AWS Data Days and the WW Tech Leader for AWS Databases. He has been with Amazon since 2016 and is based in Atlanta.


Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/extract-transform-and-load-data-into-s3-data-lake-using-ctas-and-insert-into-statements-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze the data stored in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. You can reduce your per-query costs and get better performance by compressing, partitioning, and converting your data into columnar formats. To learn more about best practices to boost query performance and reduce costs, see Top 10 Performance Tuning Tips for Amazon Athena.

Overview

This blog post discusses how to use Athena for extract, transform and load (ETL) jobs for data processing. This example optimizes the dataset for analytics by partitioning it and converting it to a columnar data format using Create Table as Select (CTAS) and INSERT INTO statements.

CTAS statements create new tables using standard SELECT queries to filter data as required. You can also partition the data, specify compression, and convert the data into columnar formats like Apache Parquet and Apache ORC using CTAS statements. As part of the execution, the resultant tables and partitions are added to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

INSERT INTO statements insert new rows into a destination table based on a SELECT query statement that runs on a source table. If the source table’s underlying data is in CSV format and the destination table’s data is in Parquet format, then INSERT INTO can easily transform and load the data into the destination table’s format. CTAS and INSERT INTO statements can be used together to perform an initial batch conversion of data as well as incremental updates to the existing table.

Here is an overview of the ETL steps to be followed in Athena for data conversion:

  1. Create a table on the original dataset.
  2. Use a CTAS statement to create a new table in which the format, compression, partition fields and location of the new table can be specified.
  3. Add more data into the table using an INSERT INTO statement.

This example uses a subset of NOAA Global Historical Climatology Network Daily (GHCN-D), a publicly available dataset on Amazon S3.

This subset of data is available at the following S3 location:

s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/
Total objects: 41727 
Size of CSV dataset: 11.3 GB
Region: us-east-1

Procedure

Follow these steps to use Athena for an ETL job.

Create a table based on the original dataset

The original data is in CSV format with no partitions in Amazon S3. The following files are stored at the Amazon S3 location:

2019-10-31 13:06:57  413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000
2019-10-31 13:06:57  412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001
2019-10-31 13:06:57   34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002
2019-10-31 13:06:57  412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100
2019-10-31 13:06:57  412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101

Note that the file sizes are pretty small. Merging them into larger files and reducing the total number of files would lead to faster query execution. CTAS and INSERT INTO can help achieve this.

Execute the queries in the Athena console (preferably in us-east-1 to avoid inter-region Amazon S3 data transfer charges). First, create a database for this demo:

CREATE DATABASE blogdb

Now, create a table from the data above.

CREATE EXTERNAL TABLE `blogdb`.`original_csv` (
  `id` string, 
  `date` string, 
  `element` string, 
  `datavalue` bigint, 
  `mflag` string, 
  `qflag` string, 
  `sflag` string, 
  `obstime` bigint)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'

Use CTAS to partition data and convert into Parquet format with Snappy compression

Now, convert the data to Parquet format with Snappy compression and partition the data on a yearly basis. All these actions are performed using the CTAS statement. For the purpose of this blog, the initial table only includes data from 2015 to 2019. You can add new data to this table using the INSERT INTO command.

The table created in Step 1 has a date field with the date formatted as YYYYMMDD (e.g. 20100104). The new table is partitioned on year. Extract the year value from the date field using the Presto function substr(“date”,1,4).

CREATE TABLE new_parquet
WITH (format='PARQUET', 
parquet_compression='SNAPPY', 
partitioned_by=array['year'], 
external_location = 's3://your-bucket/optimized-data/') 
AS
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) >= 2015
        AND cast(substr("date",1,4) AS bigint) <= 2019

Once the query completes successfully, check the Amazon S3 location specified in the CTAS statement above. You should see the partition folders, with Parquet files in each partition, as shown in the following examples:

  1. Partitions:
    $ aws s3 ls s3://your-bucket/optimized-data/
                               PRE year=2015/
                               PRE year=2016/
                               PRE year=2017/
                               PRE year=2018/
                               PRE year=2019/

  2. Parquet files:
    $ aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable | head -5
    
    2019-10-31 14:51:05    7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d
    2019-10-31 14:51:05    7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a
    2019-10-31 14:51:05    9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799
    2019-10-31 14:51:05    7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d
    2019-10-31 14:51:05    6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1

Add more data into table using INSERT INTO statement

Now, add more data and partitions into the new table created above. The original dataset has data from 2010 to 2019. Since you added 2015 to 2019 using CTAS, add the rest of the data now using an INSERT INTO statement:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) < 2015

List the Amazon S3 location of the new table:

 $ aws s3 ls s3://your-bucket/optimized-data/
                           PRE year=2010/
                           PRE year=2011/
                           PRE year=2012/
                           PRE year=2013/
                           PRE year=2014/
                           PRE year=2015/
                           PRE year=2016/
                           PRE year=2017/
                           PRE year=2018/
                           PRE year=2019/ 

You can see that INSERT INTO is able to determine that “year” is a partition column and writes the data to Amazon S3 accordingly. There is also a significant reduction in the total size of the dataset thanks to compression and columnar storage in the Parquet format:

Size of dataset after Parquet conversion with Snappy compression: 1.2 GB

You can also run INSERT INTO statements if more CSV data is added to the original table. Assume you have new data for the year 2020 added to the original Amazon S3 dataset. In that case, you can run the following INSERT INTO statement to add this data and the relevant partition(s) to the new_parquet table:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) = 2020
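If new CSV files land regularly, this incremental INSERT INTO can also be submitted programmatically, for example from a scheduled AWS Lambda function. The following is a minimal sketch using boto3; the query results location is a placeholder, and the database name matches the one created earlier in this post.

import boto3

athena = boto3.client("athena")

INSERT_2020 = """
INSERT INTO new_parquet
SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime,
       substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) = 2020
"""

def lambda_handler(event, context):
    # Submit the query; Athena runs it asynchronously and writes query metadata
    # to the specified output location.
    response = athena.start_query_execution(
        QueryString=INSERT_2020,
        QueryExecutionContext={"Database": "blogdb"},
        ResultConfiguration={"OutputLocation": "s3://your-bucket/athena-query-results/"},
    )
    return response["QueryExecutionId"]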

Query the results

Now that you have transformed data, run some queries to see what you gained in terms of performance and cost optimization:

First, find the number of distinct IDs for every value of the year:

    1. Query on the original table:
      SELECT substr("date",1,4) as year, 
             COUNT(DISTINCT id) 
      FROM original_csv 
      GROUP BY 1 ORDER BY 1 DESC

    2. Query on the new table:
      SELECT year, 
        COUNT(DISTINCT id) 
      FROM new_parquet 
      GROUP BY  1 ORDER BY 1 DESC

                       Run time        Data scanned   Cost
      Original table   16.88 seconds   11.35 GB       $0.0567
      New table        3.79 seconds    428.05 MB      $0.002145
      Savings          77.5% faster and 96.2% cheaper

Next, calculate the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) for the Earth in 2018:

      1. Query on the original table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM original_csv
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018'
        GROUP BY  1

      2. Query on the new table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM new_parquet 
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018'
        GROUP BY  1

                         Run time        Data scanned   Cost
        Original table   18.65 seconds   11.35 GB       $0.0567
        New table        1.92 seconds    68.08 MB       $0.000345
        Savings          90% faster and 99.4% cheaper

Conclusion

This post showed you how to perform ETL operations using CTAS and INSERT INTO statements in Athena. You can perform the first set of transformations using a CTAS statement. When new data arrives, use an INSERT INTO statement to transform and load data to the table created by the CTAS statement. Using this approach, you converted data to the Parquet format with Snappy compression, converted a non-partitioned dataset to a partitioned dataset, reduced the overall size of the dataset and lowered the costs of running queries in Athena.


About the Author

 Pathik Shah is a big data architect for Amazon EMR at AWS.