Tag Archives: AWS Big Data

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion, which we explain and show how we overcame.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose.
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.

The following diagram illustrates this workflow.
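For step 2, the following is a minimal sketch of what such a stream-triggered function could look like. It is not FactSet's production function, which builds on the lambda-streams-to-firehose project; the delivery stream name and the JSON-friendly type handling are assumptions for illustration.

# Minimal sketch, not the production function: forward new DynamoDB stream
# images to Kinesis Data Firehose as newline-delimited JSON.
import json

import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
deserializer = TypeDeserializer()

DELIVERY_STREAM = 'ddb-to-s3-parquet'  # hypothetical delivery stream name


def _jsonable(value):
    # String/Number sets deserialize to Python sets, numbers to Decimal
    if isinstance(value, set):
        return sorted(value, key=str)
    return str(value)


def lambda_handler(event, context):
    records = []
    for record in event['Records']:
        if record['eventName'] not in ('INSERT', 'MODIFY'):
            continue
        # Convert the DynamoDB-typed NewImage into a plain Python dict
        image = record['dynamodb']['NewImage']
        item = {k: deserializer.deserialize(v) for k, v in image.items()}
        records.append(
            {'Data': (json.dumps(item, default=_jsonable) + '\n').encode('utf-8')})
    if records:
        # PutRecordBatch accepts up to 500 records per call
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM, Records=records)
    return {'forwarded': len(records)}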

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.
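For step 3, the format conversion itself is configured on the Kinesis Data Firehose delivery stream, which reads the schema from a Data Catalog table. The following boto3 sketch shows roughly what that configuration could look like; the role ARN, bucket, prefix, and buffering values are placeholders, and the database and table names simply reuse those from the sample event payload later in this post.

import boto3

firehose = boto3.client('firehose')

# Placeholder values for illustration only
ROLE_ARN = 'arn:aws:iam::111111111111:role/firehose-delivery-role'
BUCKET_ARN = 'arn:aws:s3:::my-analytics-bucket'

firehose.create_delivery_stream(
    DeliveryStreamName='ddb-to-s3-parquet',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': ROLE_ARN,
        'BucketARN': BUCKET_ARN,
        'Prefix': 'parquet/',
        # Format conversion requires a buffer size of at least 64 MB
        'BufferingHints': {'SizeInMBs': 128, 'IntervalInSeconds': 300},
        'DataFormatConversionConfiguration': {
            'Enabled': True,
            'InputFormatConfiguration': {
                'Deserializer': {'OpenXJsonSerDe': {}}},
            'OutputFormatConfiguration': {
                'Serializer': {'ParquetSerDe': {}}},
            # The schema comes from the AWS Glue Data Catalog table
            'SchemaConfiguration': {
                'RoleARN': ROLE_ARN,
                'DatabaseName': 'ddb-glue-fh',
                'TableName': 'ddb_glu_fh_sample',
                'Region': 'us-east-1',
            },
        },
    },
)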

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type>
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we must convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. An EventBridge rule, GlueTableChanged, listens for changes to the Data Catalog tables. When we receive the event that a table has changed, the rule triggers the Lambda function.
  4. The Lambda function uses the AWS SDK to update the Glue Data Catalog table via the glue.update_table() API, replacing occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to “Pre-defined pattern by service,” chose AWS as the service provider, AWS Glue as the service, and “Glue Data Catalog Table State Change” as the event type. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.
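If you prefer to script the rule rather than configure it in the console, a sketch like the following creates an equivalent event pattern with boto3 (the rule name, function name, and Lambda ARN are placeholders):

import json
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

LAMBDA_ARN = 'arn:aws:lambda:us-east-1:111111111111:function:fix-set-types'  # placeholder

# Match Glue Data Catalog table state change events
pattern = {
    'source': ['aws.glue'],
    'detail-type': ['Glue Data Catalog Table State Change'],
}

events.put_rule(
    Name='GlueTableChanged',
    EventPattern=json.dumps(pattern),
    State='ENABLED',
)
events.put_targets(
    Rule='GlueTableChanged',
    Targets=[{'Id': 'fix-set-types', 'Arn': LAMBDA_ARN}],
)

# Allow EventBridge to invoke the Lambda function
lambda_client.add_permission(
    FunctionName='fix-set-types',
    StatementId='allow-eventbridge',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
)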

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Inspect the table's columns and convert any set<...> types to array<...>
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table( DatabaseName=databaseName,Name=tableName)
    logger.info(response)  
    
    # loop through all the columns of the table
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego


Tarik Makota is a Principal Solutions Architect with the Amazon Web Services. He provides technical guidance, design advice and thought leadership to AWS’ customers across US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.


Amazon Redshift at re:Invent 2019

Post Syndicated from Corina Radovanovich original https://aws.amazon.com/blogs/big-data/amazon-redshift-at-reinvent-2019/

The annual AWS re:Invent learning conference is an exciting time full of new product and program launches. At the first re:Invent conference in 2012, AWS announced Amazon Redshift. Since then, tens of thousands of customers have started using Amazon Redshift as their cloud data warehouse. In 2019, AWS shared several significant launches and dozens of sessions. This post presents highlights of what happened with Amazon Redshift at re:Invent 2019.

Andy Jassy’s AWS re:Invent 2019 keynote

When Andy Jassy takes the stage to talk about what’s new at AWS, he launches the new Amazon Redshift node type, RA3 with managed storage; the new Federated Query (preview) feature; Export to Data Lake; and Advanced Query Accelerator (AQUA) (preview) for Amazon Redshift. Watch AWS re:Invent 2019 – Keynote with Andy Jassy on YouTube, or jump ahead for the Amazon Redshift announcements.

Deep dive and best practices for Amazon Redshift

Every year the Amazon Redshift deep dive session rates highly, and people continue to watch and re-watch it after the event. This year was no different. Specialist Solutions Architects Harshida Patel and Tony Gibbs take an in-depth look at best practices for data warehousing with Amazon Redshift. It’s a must-see for existing Amazon Redshift users. Watch AWS re:Invent 2019: Deep dive and best practices for Amazon Redshift (ANT418) on YouTube.

What’s new with Amazon Redshift, featuring Yelp and Workday

With over 200 new features and capabilities launched in the last 18 months, there’s a lot to cover in a session about what’s new with Amazon Redshift. Join one of the Product Managers driving RA3 and managed storage, Himanshu Raja, to catch up on the recent performance, concurrency, elasticity, and manageability enhancements behind Amazon Redshift’s record price-to-performance ratio. You also get more insight into the architectural evolution of Amazon Redshift with RA3 and managed storage, and how it uses machine learning to create a self-optimizing data warehouse. In the second half of the session, you hear from Steven Moy, a software engineer at Yelp, about how Amazon Redshift’s latest features have helped Yelp achieve optimization and scale for an organization with an enormous amount of data and sophisticated analytics. Watch AWS re:Invent 2019: What’s new with Amazon Redshift, featuring Yelp (ANT320-R1) on YouTube.

The session repeated with Michalis Petropoulos, Director of Engineering, and Erol Guney of Workday. Watch the full session to get a slightly different take on what’s new, or jump to the customer presentation to hear from Erol Guney, Architect, Data Platform, at Workday about how Amazon Redshift empowers their Data as a Service product team to focus on architecture goals and business logic.

Migrate your data warehouse to the cloud in record time, featuring Nielsen and Fannie Mae

In this session, you learn about important concepts and tips for migrating your legacy on-premises data warehouse to the cloud. You hear from Tejas Desai, VP of Technology at Nielsen, about their migration journey and benefits. Watch AWS re:Invent 2019: Migrate your data warehouse to the cloud, featuring Nielsen (ANT334-R1) on YouTube.

The repeat of this session features Amy Tseng from Fannie Mae. If you don’t want to listen to Tony’s overview again, skip ahead to learn how Fannie Mae embraced a data lake architecture with Amazon Redshift for analytics to save costs, maximize performance, and scale. Amy’s presentation was a crowd favorite, with some of the most positive customer feedback and a wealth of great information about how Fannie Mae managed their migration.

How to scale data analytics with Amazon Redshift, featuring Duolingo and Warner Brothers Interactive Entertainment

Data is growing fast, and so is the value that business users need to gain from business data. When AWS first announced Amazon Redshift in 2012, it could handle up to 640 TB of compressed data. It can now scale to 8 PB of compressed data. Learn more about Amazon Redshift’s unique ability to deliver top performance at the lowest and most predictable cost from Vinay Shukla, Principal Product Manager. This is an especially important session if you want to learn more about the newest Amazon Redshift node type RA3. You also hear from Jonathan Burket of Duolingo about their experience in the preview of RA3 nodes and how Duolingo uses Amazon Redshift. Duolingo is a wildly popular language-learning platform and the most downloaded education app in the world, with over 300 million users. Enabling data-driven decisions with A/B tests and ad hoc analysis has been a driver of their success. Watch AWS re:Invent 2019: How to scale data analytics with Amazon Redshift (ANT335-R2) on YouTube.

The repeat session features Redshift Product Manager Maor Kleider with an in-depth case study from Matt Howell, Executive Director, Analytics, and Kurt Larson, Technical Director, Analytics, at Warner Brothers Interactive Entertainment. Watch the full session for another perspective about how to scale with the latest Amazon Redshift features, with unique insights about analytics across Amazon Redshift and your data lake. You can also jump to the customer presentation. Not only is this session packed with interesting insights about how data analytics drives the success of games like Batman and Mortal Kombat, it also has an action-packed trailer about all the awesome Warner Brothers games.

If you prefer to see a session without the announcements from the keynote and with demos, watch Debu Panda showcase the new Amazon Redshift console and share practical tips about using Amazon Redshift.

Amazon Redshift reimagined: RA3 and AQUA

This embargoed session is the first opportunity to learn more about AQUA for Amazon Redshift and how it improves query performance by up to 10 times. Britt Johnston, Director of Product Management, kicks off with an intro into the next generation of Amazon Redshift, and Senior Principal Engineer Andy Caldwell jumps in to share the origin and vision of the exciting new technology. The enthusiasm Andy feels about sharing AQUA with customers for the first time is palpable. Watch AWS re:Invent 2019: [NEW LAUNCH!] Amazon Redshift reimagined: RA3 and AQUA (ANT230) on YouTube.

State-of-the-art cloud data warehousing, featuring Asurion and Comcast

This session serves as a great introduction to cloud data warehousing at AWS, with insightful presentations from a different customer in each delivery. You can hear from Asurion about how they use data analytics to serve over 300 million people with excellent customer satisfaction scores. You learn about how to use AWS services with Amazon Redshift and why Asurion believes in their data lake-based architecture. Watch AWS re:Invent 2019: State-of-the-art cloud data warehousing, featuring Asurion (ANT213-R1) on YouTube.

In the repeat session, Rajat Garg, Senior Principal Architect from Comcast, talks about moving to Amazon Redshift from a legacy on-premises Oracle Exadata environment. He shares their strategy, approach, and performance improvements.

What’s next and more information

In addition to these sessions at re:Invent, there are also hands-on workshops, intimate builder roundtables, and interactive chalk talks that weren’t recorded.

Keep exploring the following links for more information about new releases:

We hope to see you in Las Vegas for re:Invent 2020, or at one of the hundreds of other AWS virtual and in-person events running around the world. For more information, see AWS Events and Webinars.


About the authors

Corina Radovanovich leads product marketing for cloud data warehousing at AWS.


How Verizon Media Group migrated from on-premises Apache Hadoop and Spark to Amazon EMR

Post Syndicated from Lev Brailovskiy original https://aws.amazon.com/blogs/big-data/how-verizon-media-group-migrated-from-on-premises-apache-hadoop-and-spark-to-amazon-emr/

This is a guest post by Verizon Media Group.

At Verizon Media Group (VMG), one of the major problems we faced was the inability to scale out computing capacity in a required amount of time—hardware acquisitions often took months to complete. Scaling and upgrading hardware to accommodate workload changes was not economically viable, and upgrading redundant management software required significant downtimes and carried a large amount of risk.

At VMG, we depend on technologies such as Apache Hadoop and Apache Spark to run our data processing pipelines. We previously managed our clusters with Cloudera Manager, which was subject to slow release cycles. As a result, we ran older versions of available open-source releases and couldn’t take advantage of the latest bug fixes and performance improvements on Apache projects. These reasons, combined with our already existing investment in AWS, made us explore migrating our distributed computing pipelines to Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark.

This post discusses the issues we encountered and solved while building a pipeline to address our data processing needs.

About us

Verizon Media is, ultimately, an online advertising company. Most online advertising today is done through display ads, also known as banners or video ads. Regardless of format, all internet ads usually fire various kinds of beacons to tracking servers, which are usually highly scalable web server deployments with a sole responsibility to log received beacons to one or multiple event sinks.

Pipeline architecture

In our group, which deals mostly with video advertising, we use NGINX web servers deployed in multiple geographical locations, which log events fired from our video player directly to Apache Kafka for real-time processing and to Amazon S3 for batch processing. A typical data pipeline in our group involves processing such input feeds, applying validation and enrichment routines, aggregating resulting data, and replicating it to further destinations for reporting purposes. The following diagram shows a typical pipeline that we created.

We start getting data on our NGINX beacon servers. The data is stored on local disk in gzip files at 1-minute intervals. Every minute, we move the data from the NGINX servers to a raw data location in S3. When a file lands in S3, S3 sends an event notification to Amazon SQS. Apache NiFi listens to the SQS messages to start working on the files. During this time, NiFi groups smaller files into larger files and stores the outcome in a special path in a temporary location on S3. The path name incorporates an inverse timestamp so that data lands in effectively random locations, which avoids read bottlenecks.
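As a rough illustration of the inverse-timestamp technique (not VMG's exact key scheme), a prefix can be derived by subtracting the current epoch milliseconds from a large constant, so successive files land under different prefixes:

import time


def inverse_timestamp_prefix(now_ms=None):
    """Return an S3 key prefix that decreases as time increases."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    # Subtracting from a large constant spreads new files across prefixes
    return str(10 ** 13 - now_ms)


# Example key layout for the merged NiFi output (layout is illustrative)
key = "temporary/{}/beacons-merged.gz".format(inverse_timestamp_prefix())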

Every hour, we scale out a Spark cluster on Amazon EMR to process the raw data. This processing includes enriching and validating the data. The data is stored in a permanent location on S3 in the Apache ORC columnar format. We also update the AWS Glue Data Catalog to expose this data in Amazon Athena in case we need to investigate it for issues. After raw data processing is finished, we downscale the Spark EMR cluster and start aggregating data based on pre-defined aggregation templates using Presto on Amazon EMR. The aggregated data is stored in ORC format in a special location on S3 for aggregated data.

We also update our Data Catalog with the location of the data so we can query it with Athena. Additionally, we replicate the data from S3 into Vertica for our reporting to expose the data to internal and external customers. In this scenario, we use Athena as the disaster recovery (DR) solution for Vertica. Every time our reporting platform sees that Vertica is in bad health, we automatically fail over to Amazon Athena. This solution proved to be extremely cost-effective for us. We have another use case for Athena in our real-time analytics that we do not discuss in this post.

Migration challenges

Migration to Amazon EMR required us to make some design changes to get the best results. When running big data pipelines on the cloud, operational cost optimization is the name of the game. The two major costs are storage and compute. In traditional on-premises Hadoop warehouses, these are coupled as storage nodes that also serve as computation nodes. This can provide a performance advantage due to data locality. However, the disadvantage of this coupling is that any changes to the storage layer, such as maintenance, can also affect the computational layer. In an environment such as AWS, we can decouple storage and computation by using S3 for storage and Amazon EMR for computation. This provides a major flexibility advantage when dealing with cluster maintenance because all clusters are ephemeral.

To further save costs, we had to figure out how to achieve maximum utilization on our computational layer. This meant that we had to switch our platform to using multiple clusters for different pipelines, where each cluster is automatically scaled depending on the pipeline’s needs.

Switching to S3

Running a Hadoop data warehouse on S3 introduces additional considerations. S3 is not a file system like HDFS and does not provide the same immediate consistency guarantees. You can consider S3 as an eventually consistent object store with a REST API to access it.

The rename

A key difference with S3 is that rename is not an atomic operation. All rename operations on S3 run a copy followed by a delete operation. Executing renames on S3 is undesirable due to running time costs. To use S3 efficiently, you must remove the use of any rename operations. Renames are commonly used in Hadoop warehouses at various commit stages, such as moving a temporary directory to its final destination as an atomic operation. The best approach is to avoid any rename operations and instead write data once.

Output committers

Both Spark and Apache MapReduce jobs have commit stages that commit output files produced by multiple distributed workers to final output directories. Explaining how output committers work is beyond the scope of this post, but the important thing is that standard default output committers designed to work on HDFS depend on rename operations, which as explained previously have a performance penalty on storage systems like S3. A simple strategy that worked for us was disabling speculative execution and switching the output committer’s algorithm version. It is also possible to write your own custom committers, which do not depend on renames. For example, as of Amazon EMR 5.19.0, AWS released a custom OutputCommitter for Spark that optimizes writes to S3.
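As a sketch of that strategy for a Spark job on Amazon EMR, the following settings disable speculative execution, switch the committer algorithm version, and opt into the EMRFS S3-optimized committer for Parquet output. Treat the values as illustrative defaults rather than VMG's production configuration.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('s3-friendly-writes')
    # Avoid duplicate task attempts writing the same output to S3
    .config('spark.speculation', 'false')
    # Version 2 of the FileOutputCommitter algorithm avoids a second
    # round of renames during job commit
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
    # On EMR 5.19.0 and later, the EMRFS S3-optimized committer can be
    # used for Parquet output instead of rename-based committers
    .config('spark.sql.parquet.fs.optimized.committer.optimization-enabled', 'true')
    .getOrCreate()
)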

Eventual consistency

One of the major challenges working with S3 is that it is eventually consistent, whereas HDFS is strongly consistent. S3 does offer read-after-write guarantees for PUTS of new objects, but this is not always enough to build consistent distributed pipelines on. One common scenario that comes up a lot in big data processing is one job outputting a list of files to a directory and another job reading from that directory. For the second job to run, it has to list the directory to find all the files it has to read. In S3, there are no directories; we simply list files with the same prefix, which means you might not see all the new files immediately after your first job is finished running.

To address this issue, AWS offers EMRFS, which is a consistency layer added on top of S3 to make it behave like a consistent file system. EMRFS uses Amazon DynamoDB and keeps metadata about every file on S3. In simple terms, with EMRFS enabled when listing an S3 prefix, the actual S3 response is compared to metadata on DynamoDB. If there’s a mismatch, the S3 driver polls a little longer and waits for data to show up on S3.

In general, we found that EMRFS was necessary to ensure data consistency. For some of our data pipelines, we use PrestoDB to aggregate data that is stored on S3, where we chose to run PrestoDB without EMRFS support. While this has exposed us to the eventual consistency risk for our upstream jobs, we found that we can work around these issues by monitoring for discrepancies between downstream and upstream data and rerunning the upstream jobs if needed. In our experience, consistency issues happen very rarely, but they are possible. If you choose to run without EMRFS, you should design your system accordingly.
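Consistent view is enabled per cluster through the emrfs-site configuration classification. The following boto3 sketch shows the relevant portion of a run_job_flow call; the cluster name, release label, instance types, and retry settings are placeholders.

import boto3

emr = boto3.client('emr')

response = emr.run_job_flow(
    Name='batch-pipeline-cluster',   # placeholder name
    ReleaseLabel='emr-5.28.0',       # example release
    Applications=[{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE', 'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    Configurations=[
        {
            'Classification': 'emrfs-site',
            'Properties': {
                # Turn on the DynamoDB-backed consistent view
                'fs.s3.consistent': 'true',
                'fs.s3.consistent.metadata.tableName': 'EmrFSMetadata',
                'fs.s3.consistent.retryCount': '5',
            },
        },
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)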

Automatic scaling strategies

An important and yet in some ways trivial challenge was figuring out how to take advantage of Amazon EMR automatic scaling capabilities. To achieve optimal operational costs, we want to make sure no server is sitting idle.

To achieve that, the answer might seem obvious—create a long-running EMR cluster and use readily available automatic scaling features to control a cluster’s size based on a parameter, such as available free memory on the cluster. However, some of our batch pipelines start every hour, run for exactly 20 minutes, and are computationally very intensive. Because processing time is very important, we want to make sure we don’t waste any time. The optimal strategy for us is to preemptively resize the cluster through custom scripts before particular big batch pipelines start.

Additionally, it would be difficult to run multiple data pipelines on a single cluster and attempt to keep it at optimal capacity at any given moment because every pipeline is slightly different. We have instead opted to run all our major pipelines on independent EMR clusters. This has a lot of advantages and only a minor disadvantage. The advantages are that each cluster can be resized at exactly the required time, run the software version required by its pipeline, and be managed without affecting other pipelines. The minor disadvantage is that there’s a small amount of computational waste by running extra name nodes and task nodes.

When developing an automatic scaling strategy, we first tried to create and drop clusters every time we need to run our pipelines. However, we quickly found that bootstrapping a cluster from scratch can take more time than we’d like. We instead keep these clusters always running, and we upsize the cluster by adding task nodes before the pipeline starts and remove the task nodes as soon as the pipeline ends. We found that by simply adding task nodes, we can start running our pipelines much faster. If we run into issues with long-running clusters, we can quickly recycle and create a new one from scratch. We continue to work with AWS on these issues.

Our custom automatic scaling scripts are simple Python scripts, which usually run before a pipeline starts. For example, assume that our pipeline consists of a simple MapReduce job with a single mapping and reduce phase. Also assume that the mapping phase is more computationally expensive. We can write a simple script that looks at the amount of data that needs to be processed the next hour and figures out the amount of mappers that are needed to process this data in the same way that a Hadoop job does. When we know the amount of mapping tasks, we can decide how many servers we want to run all the mapper tasks in parallel.
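A stripped-down version of such a script might look like the following. The bytes-per-mapper figure, the mappers-per-node slot count, and the way the task group is located are assumptions for illustration, not VMG's actual script.

import boto3

emr = boto3.client('emr')

BYTES_PER_MAPPER = 256 * 1024 * 1024   # assumed split size per mapper
MAPPERS_PER_NODE = 16                  # assumed slots per task node


def resize_task_group(cluster_id, input_bytes):
    """Resize the TASK instance group so the next hourly run fits."""
    mappers = max(1, input_bytes // BYTES_PER_MAPPER)
    nodes = max(1, -(-mappers // MAPPERS_PER_NODE))  # ceiling division

    groups = emr.list_instance_groups(ClusterId=cluster_id)['InstanceGroups']
    task_group = next(g for g in groups if g['InstanceGroupType'] == 'TASK')

    emr.modify_instance_groups(
        ClusterId=cluster_id,
        InstanceGroups=[{
            'InstanceGroupId': task_group['Id'],
            'InstanceCount': int(nodes),
        }],
    )
    return nodes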

When running Spark real-time pipelines, things are a little trickier because we sometimes have to remove computational resources while the application is running. A simple strategy that worked for us is to create a separate real-time cluster in parallel to the existing one, scale it up to a required size based on amount of data processed during the last hour with some extra capacity, and restart the real-time application on the new cluster.

Operational costs

You can evaluate all AWS costs up front with the EC2 calculator. The main costs when running big data pipelines are storage and computation, with some extra minor costs such as DynamoDB when using EMRFS.

Storage costs

The first cost to consider is storage. Assume that you need to store 1 PB of data. Because HDFS has a default replication factor of 3, it would require 3 PB of actual storage capacity instead of 1 PB.

Storing 1 GB on S3 costs approximately $0.023 per month. S3 is already highly redundant, so you don’t need to take the replication factor into account, which immediately reduces costs by 67%. You should also consider the other costs for write and read requests, but these usually tend to be small.
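As a back-of-the-envelope check using the post's figures, the comparison for 1 PB works out roughly as follows:

# Rough monthly storage cost comparison for 1 PB of data
S3_PRICE_PER_GB_MONTH = 0.023          # approximate S3 Standard price used in this post
GB_PER_PB = 1024 * 1024

s3_monthly = 1 * GB_PER_PB * S3_PRICE_PER_GB_MONTH   # roughly $24,000 for 1 PB
hdfs_raw_capacity_pb = 1 * 3                         # HDFS replication factor of 3

print(f"S3: ~${s3_monthly:,.0f}/month for 1 PB (no replication factor to provision)")
print(f"HDFS: must provision {hdfs_raw_capacity_pb} PB of raw disk for the same 1 PB")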

Computation costs

The second-largest cost after storage is the cost of computation. To reduce computation costs, you should take advantage of reserved instance pricing as much as possible. An m4.4xlarge instance type with 16 vCPUs on AWS costs $0.301 an hour when it is reserved for 3 years, with all fees up front. An On-Demand Instance costs $0.80 an hour, which is a 62% difference in price. This is easier to achieve in larger organizations that perform regular capacity planning. An extra hourly fee of $0.24 is added to every Amazon EMR machine for the use of the Amazon EMR platform. It is possible to reduce costs even further by using Amazon EC2 Spot Instances. For more information, see Instance Purchasing Options.

To achieve optimal operational costs, try to make sure that your computation clusters are never sitting idle and try to downscale dynamically based on the amount of work your clusters are doing at any given moment.

Final thoughts

We have been operating our big data pipelines on Amazon EMR for over a year and storing all our data on S3. At times, our real-time processing pipelines have peaked at handling more than 2 million events per second, with a total processing latency from the initial event to updated aggregates of 1 minute. We’ve been enjoying the flexibility around Amazon EMR and its ability to tear down and recreate clusters in a matter of minutes. We are satisfied with the overall stability of the Amazon EMR platform and we will continue working with AWS to improve it.

As we have mentioned before, cost is a major factor to consider, and you could argue that it could be cheaper to run Hadoop in your own data centers. However, this argument hinges on your organization’s ability to do so efficiently; it may carry hidden operational costs and reduce elasticity. We know through first-hand experience that running on-premises is not an undertaking that you should take lightly and requires a lot of planning and maintenance. We believe that platforms such as Amazon EMR bring a lot of advantages when designing big data systems.

Disclaimer: The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the authors

Lev Brailovskiy is Director of Engineering leading the Service Engineering Group in the Supply Side Platform (SSP) at Verizon Media. He has over 15 years of experience designing and building software systems. In the past six years, Lev spent time designing, developing, and running large-scale reporting and data processing software both in private data centers and in the public cloud. He can be contacted via LinkedIn.


Zilvinas Shaltys is Technical Lead for the Video Syndication cloud data warehouse platform at Verizon. Zilvinas has years of experience working with a wide variety of big data technologies deployed at considerable scale. He was responsible for migrating big data pipelines from AOL data centers to Amazon EMR. Zilvinas is currently working on improving stability and scalability of existing batch and realtime big data systems. He can be contacted via LinkedIn.

 

Working with nested data types using Amazon Redshift Spectrum

Post Syndicated from Juan Yu original https://aws.amazon.com/blogs/big-data/working-with-nested-data-types-using-amazon-redshift-spectrum/

Redshift Spectrum is a feature of Amazon Redshift that allows you to query data stored on Amazon S3 directly and supports nested data types. This post discusses which use cases can benefit from nested data types, how to use Amazon Redshift Spectrum with nested data types to achieve excellent performance and storage efficiency, and some of the limitations of nested data types.

This post uses a data set generated with dummy data. You can view its table schema. If you’d like to try the dataset, deploy a Redshift cluster, execute the DDLs there, and use the example queries from this post or build your own.

Data modeling

In many scenarios, data is generated in a hierarchy. For example, assume a customer bought several items. For analytic purposes, there are various data modeling approaches to save storage or speed up data processing. One popular approach to achieve storage efficiency is the dimensional model.

The following table shows dummy customer data.

username | name | sex | address | mail | birthdate
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10
shepherdlisa | Mark Lee | M | 754 Michelle Gateway Port Johnstad, ME 35695 | [email protected] | 11/10/32
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07
brettmcgee | Travis Wilson | M | 535 Lisa Flat East Andrew, ID 43332 | [email protected] | 3/22/10
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60

The following table contains dummy order data, which is linked to the customer table via a foreign key username.

username | transaction_date | shipping_date | items | price
erin15 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | 10/11/19 | 10/12/19 | 7 | 1697
erin15 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | 10/5/19 | 10/10/19 | 7 | 6346

In the dimensional model, each customer’s information is stored only one time. There is no duplicated data, even though a customer could order multiple items at various times.

The dimensional model is optimal for storage. However, it can be challenging to process data efficiently. To get a full picture of your data, you need to join the two tables together to restore the hierarchy.

For example, to find out how many items customer Mark Lee bought and his total spending in the last three months, the query needs to join the customers and orders table. See the following code:

Select c.username, o.transaction_date, o.shipping_date, sum(items), sum(price) 
from customers c inner join orders o on (c.username = o.username) 
where c.name = 'Mark Lee' 
and transaction_date > DATEADD(month, -3, GETDATE())
group by 1,2,3;

When there are millions of customers who might buy multiple items in each transaction, the join can be very expensive. A fast-growing dataset can be so large that you need to store it in a distributed system. To perform the join, you need to shuffle data through the network, and the cost becomes even more significant.

As storage becomes cheaper and cheaper, people are starting to use a flattened model. In this model, data is pre-joined to gain processing efficiency. The following table shows that the customer and order information is stored in one record and ready to be analyzed.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/16/19 | 10/9/19 | 8 | 4824
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/10/19 | 9 | 4392
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/9/19 | 3 | 1079
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/25/19 | 10/7/19 | 1 | 208
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/2/19 | 10/5/19 | 10 | 3689
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/5/19 | 10/10/19 | 7 | 6346
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/17/19 | 9/21/19 | 1 | 527
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

This model also works well on a distributed system. Because each row contains complete information, you can process it on any node, and don’t need to shuffle data. You can also use the columnar format to store data, which allows the query engine to read only the needed columns instead of the whole row. This technique improves analytics performance and is storage efficient.

Both models have their pros and cons. The dimensional model trades compute power for storage efficiency, and the flattened model trades storage for processing efficiency.

Some new data types are available that achieve the best of both. Instead of putting child records into another table, you can nest them into the parent record and get the full information without performing a join. It effectively denormalizes the data without duplicating the parent record.

The following diagram illustrates this workflow.

You can apply this model to a schemaful hierarchy dataset. Continuing with the customer and order example, although a customer might buy multiple items, each order item contains the same type of information, such as product ID, price, and vendor.

The hierarchy is clear and consistent. You can map data to a nested structured schema, which you can store and access efficiently via SQL language.

The following table is a nested data presentation of the previous example.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
    9/16/19 | 10/9/19 | 8 | 4824
    9/17/19 | 10/10/19 | 9 | 4392
    9/17/19 | 10/9/19 | 3 | 1079
    9/25/19 | 10/7/19 | 1 | 208
    10/2/19 | 10/5/19 | 10 | 3689
    10/5/19 | 10/10/19 | 7 | 6346
    10/6/19 | 10/10/19 | 5 | 1744
    10/7/19 | 10/9/19 | 2 | 15
    10/11/19 | 10/13/19 | 10 | 4794
    10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
    9/17/19 | 9/21/19 | 1 | 527
    10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

The following graph compares the storage usage for the three models (all in parquet format).

The graph shows that nested structure is as storage efficient as the dimensional model.

Using nested data types

Nested data types are structured data types for some common data patterns. Nested data types support structs, arrays, and maps.

A struct is similar to a relational table. It groups object properties together. For example, if a customer profile contains their name, address, email, and birthdate, it appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string>

The data appears as the following code:

    "customer": {
        "username": "kevin35",
        "name": "Nancy Alvarez",
        "sex": "F",
        "address": "05472 Kathleen Turnpike\nNew Ashley, NV 84430",
        "mail": "[email protected]",
        "birthdate": "1961-02-05"
    }

An array stores one-to-many relationships. For example, a customer may have multiple shipping addresses or phone numbers. If a customer has several phone numbers, it appears as the following schema:

Phonenumbers array<string>

The data appears as the following code:

['555-5555', '555-1234']

A map is a collection of key-value pairs. You can consider it as a list of struct<key, value> elements. For example, if a customer has particular reward preferences, it appears as the following schema:

preference map<string,boolean> 

The data appears as the following code:

{one_day_delivery=true, 
 coupon=false, 
 free_shipping=true}

Nested data could have another nested data type as a member. The most common one is an array of structs. For example, an order containing multiple items could appear as the following schema:

orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > 

You can create a complex object by combining them. For example, a customer’s online transaction appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string> , 
  shipping_address array<
                      struct<
                        name:string,
                        street_address:string,
                        city:string,
                        postcode:string>
                      > , 
  creditcard string , 
  transaction_date string , 
  shipping_date string , 
  membership string , 
  preference map<string,boolean> , 
  orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > , 
  platform string , 
  comments string 

Popular query engines such as Hive, Spark, Presto, and Redshift Spectrum support nested data types. The SQL syntax those engines support can be different. To make it straightforward and consistent, all query examples in this post use Amazon Redshift Spectrum. For more information, see Tutorial: Querying Nested Data with Amazon Redshift Spectrum.
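If you generate such files yourself, the following sketch shows one way to write a record with a nested struct and an array of structs to Parquet using pyarrow (version 7.0 or later for from_pylist). The reduced field set, sample values, and output path are illustrative, not the full schema used in this post.

import pyarrow as pa
import pyarrow.parquet as pq

# A reduced version of the customer/order schema with nested types
schema = pa.schema([
    ('customer', pa.struct([
        ('username', pa.string()),
        ('name', pa.string()),
        ('address', pa.string()),
    ])),
    ('transaction_date', pa.string()),
    ('orders', pa.list_(pa.struct([
        ('product_id', pa.string()),
        ('price', pa.int32()),
        ('onsale', pa.bool_()),
    ]))),
])

rows = [{
    'customer': {'username': 'kevin35', 'name': 'Nancy Alvarez',
                 'address': '05472 Kathleen Turnpike New Ashley, NV 84430'},
    'transaction_date': '2019-10-10',
    'orders': [
        {'product_id': 'sku-1', 'price': 120, 'onsale': True},
        {'product_id': 'sku-2', 'price': 80, 'onsale': False},
    ],
}]

table = pa.Table.from_pylist(rows, schema=schema)
pq.write_table(table, 'customer_order_nested.parquet')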

Use cases for nested data types

Nested data types have many benefits: they simplify your ETL and data modeling, and they help achieve good performance. The following are some common use cases that can benefit from nested data types.

Parent-child relationship

Nested data types keep the parent-child (summary-details) relationship by storing them collocated. This often matches how you want to analyze the data. For example, to analyze customers’ purchasing habits, you may need to find the following:

  • Customers who purchase often but buy only a few items each time. They likely want an annual membership that covers the shipping cost.
  • Customers who purchase less frequently but buy many items in one transaction. They likely expect a free shipping benefit or discount.

You need supporting information from the orders data, such as how many items, on average, a customer buys per transaction.

To find a list of customers who order online at least once per week, with fewer than four items each time, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, co.customer.address as address,
  (select count(*) from co.orders) as total_items, 
  (select sum(case when onsale = true then 1 else 0 end) from co.orders) as items_onsale
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date >= '2019-09-01' 
group by 1 having tran_cnt >= 4;

With the nested order details, per item information is already grouped by customer per transaction. Children aggregation is straightforward; you can aggregate order details to categorize a customer. If you use a denormalized table, you have to do GROUP BY two times. The query could also take longer. See the following code:

with purchases as (
select cc_username as customer, transaction_date, cc_address as address,
  count(*) as total_items, 
  sum(case when co_onsale = true then 1 else 0 end) as items_onsale
from demo.customer_order_flatten_parq 
group by 1,2,3)
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date > '2019-09-01'
group by 1 having tran_cnt >= 4;

To find customers who order only once per quarter with at least 10 items and high total spending, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, 
  (select count(*) from co.orders) as total_items, 
  (select sum(price) from co.orders) as total_spending
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_spending) from purchases 
where total_items >= 10 and total_spending > 5000 and transaction_date > '2019-07-01' and transaction_date < '2019-09-30'
group by 1 having tran_cnt < 2
order by 3 desc;

Another benefit of using nested data types for parent-child data analysis is resource usage reduction. If there are one million customer transactions, there could be over five times as many item orders. For example, to find how many goods ship to Michigan each day, use the following code:

select co.shipping_date, sum(coo.weight)
from demo.customer_order_nested_parq co, co.orders coo
where co.customer.address like '%MI 012__'
group by 1
order by 1;

Assuming that 3% of customers ship orders to Michigan, after filtering the customer data, there could be approximately 3% of matching transactions. You only need to process 150 thousand item orders instead of 5 million. This greatly reduces the data to process and the resources to use when compared to a flattened model.

For the parent-child use case, nested data types provide straightforward aggregation on children, more efficient filtering, group by, windowing, and storage saving.

Many-to-many relationship

Customers could buy many items from various vendors, and a vendor could sell a product to many customers. This is a many-to-many relationship.

In a dimensional model, you need three tables: a customers table, an orders table, and a transactions table. To find the top vendors who have the most customers, you need to join the three tables. See the following code:

select vendor, transaction_date, count(distinct cc.username)
from customers cc,
     transactions tt,
     orders oo
where cc.username = tt.username
and oo.transaction_id = tt.transaction_id
and tt.transaction_date >= '2019-01-01' 
group by 1,2
order by 3 desc;

With nested data types, the query is similar to the one using the dimensional model. However, because the orders data is collocated with customer transactions, you can join them on-the-fly without paying the cost. See the following code:

select coo.vendor, co.transaction_date, count(distinct co.customer.username)
from demo.customer_order_nested_parq co, 
co.orders coo
where co.transaction_date > '2019-01-01'
group by 1,2
order by 3 desc;

As another example, your vendor, Smith PLC, had a big sale event on October 10, 2019. You want to find out which customers bought your product during this sale and the top customers who spent the most. To do so, use the following code:

select co.customer.username, count(coo.product_id), sum(coo.price)
from demo.customer_order_nested_parq co, co.orders coo
where co.transaction_date = '2019-10-10'
and (select count(*) from co.orders 
     where vendor = 'Smith PLC' and onsale = true) > 0
group by 1
order by 3 desc;

Compared to the dimensional model query, the nested model is two-to-three times faster. This is on a relatively small dataset with only a few million rows. For a larger dataset, the performance improvement is even greater, and with less resource usage.

Sparse and frequently changed data

Assume that you want to reward customers who order from your online store. For each transaction, the customer can choose one or more rewards, such as free shipping, one-day delivery, a discount, or a coupon. Depending on how effective a reward is, you have to frequently modify the reward types, add new ones, or remove ones that aren’t popular.

If you store the data in a flattened model, there are two common options to track this data. The first method is creating a table with one column for each type of reward. You have to think of all possible rewards at the outset and create those columns. This could lead to a wide table and very sparse data. Alternatively, you can modify your table schema when you want to add or remove a reward type. That adds more maintenance work, and you may lose history data. The following table demonstrates this method (all transaction_id values in the following table examples are fake).

transaction_id | free_shipping | one_day_delivery | discount | coupon
pklein35966659391853535 | FALSE | TRUE |  | TRUE
rebeccawiliams228880139768961 |  | FALSE |  | TRUE
brooke39180013629693040 | TRUE | FALSE | TRUE | TRUE
jchapman4283556333561927 | FALSE | TRUE | FALSE | FALSE
mariamartin3515336516983566 | FALSE | FALSE | TRUE | 

The second option is storing one reward per row. This avoids the wide table issue and the burden of constantly updating the schema. The approach is suitable if you only need to analyze a single reward. If you want to see whether there is any correlation between rewards, such as if more customers prefer free shipping and one-day delivery more than a discount and coupon, this option is more complicated. This model also needs more storage. The following table demonstrates this method.

transaction_id | reward_type | value
pklein35966659391853535 | free_shipping | FALSE
pklein35966659391853535 | one_day_delivery | TRUE
pklein35966659391853535 | coupon | TRUE
rebeccawiliams228880139768961 | one_day_delivery | FALSE
rebeccawiliams228880139768961 | coupon | TRUE
brooke39180013629693040 | free_shipping | TRUE
brooke39180013629693040 | one_day_delivery | FALSE
brooke39180013629693040 | discount | TRUE
brooke39180013629693040 | coupon | TRUE

A compromise is to use a JSON string to store selected rewards together in one column, which avoids schema change. See the following code:

preference varchar(65535)	

The following table shows how the data is stored in JSON string:

transaction_id | preference
pklein35966659391853535 | {"coupon":true, "free_shipping":false, "one_day_delivery":true}
rebeccawiliams228880139768961 | {"coupon":true, "one_day_delivery":false}
brooke39180013629693040 | {"coupon":true, "discount":true, "free_shipping":true, "one_day_delivery":false}
jchapman4283556333561927 | {"coupon":false, "discount":false, "free_shipping":false, "one_day_delivery":true}
mariamartin3515336516983566 | {"discount":true, "free_shipping":false, "one_day_delivery":false}

You can analyze it by using a JSON function to extract the reward data. See the following code:

select correlation, count(username) from (
select username,
(case when 
    (json_extract_path_text(preference,'free_shipping')  = 'true' and  
     json_extract_path_text(preference,'one_day_delivery')  = 'true') 
     then 1
 when 
    (json_extract_path_text(preference,'discount') = 'true' and  
     json_extract_path_text(preference,'coupon')  = 'true') 
     then 2
else 0 
 end) as correlation
from demo.transactions
  )
group by 1;

This solution is acceptable, but you could be more storage efficient and more performant by using the nested data type map. See the following code:

preference map<string, boolean>

The following table shows how the data is stored in map:

transaction_id | preference
pklein35966659391853535 | {coupon=true, free_shipping=false, one_day_delivery=true}
rebeccawiliams228880139768961 | {coupon=true, one_day_delivery=false}
brooke39180013629693040 | {coupon=true, discount=true, free_shipping=true, one_day_delivery=false}
jchapman4283556333561927 | {coupon=false, discount=false, free_shipping=false, one_day_delivery=true}
mariamartin3515336516983566 | {discount=true, free_shipping=false, one_day_delivery=false}

 

You can analyze a single reward or multiple rewards using SQL. For example, to find how many customers prefer free shipping, use the following code:

select count(distinct co.customer.username)
from demo.customer_order_nested_parq co, co.preference cm
where cm.key = 'free_shipping' and cm.value = true;

To find how many customers prefer free shipping and one-day delivery more than a coupon or discount, use the following code:

with customer_rewards as (
select co.customer.username as customer, 
 (select count(*) from co.preference cm 
where cm.key = 'free_shipping' and cm.value = true) as shipping_pref,
 (select count(*) from co.preference cm 
where cm.key = 'one_day_delivery' and cm.value = true) as delivery_pref,
 (select count(*) from co.preference cm 
where cm.key = 'coupon' and cm.value = true) as coupon_pref,
 (select count(*) from co.preference cm 
where cm.key = 'discount' and cm.value = true) as discount_pref
from demo.customer_order_nested_parq co )
select case when shipping_pref > 0 and delivery_pref > 0 then 1
            when coupon_pref > 0 and discount_pref > 20 then 2
            else 0
       end as correlation, count(customer)
from customer_rewards
group by 1;

The map type allows you to add any key-value pair. You can add a new reward type at any time without a schema change, and you can analyze the new reward right away.

The main advantage of the map type is that it supports flexible schema and eliminates the need to update the schema frequently. However, there is not much performance benefit. If performance is your top priority, a flattened table is recommended. You can also flatten the most-often accessed columns, and use map for the less frequently accessed columns.

Limitations of nested data types

Although nested data types are useful in many use cases, they have the following limitations:

  • There is a hard limit on children size.
  • You can only append, and updating data is difficult and slow. You need to rewrite the entire nested object even if you want to modify one child attribute.
  • Processing is split at the parent record level. You may run into problems if the children data is heavily skewed.
  • The query engine may not support all types of analytics on nested data.
  • For more information about other restrictions, see Amazon Redshift Spectrum Nested Data Limitations.

Summary

This post discussed the benefits of nested data types and use cases in which nested data types can help improve storage efficiency, performance, or simplify analysis. There are many more use cases in which nested data types can be an ideal solution. Try it out and share your experiences!

About the Author

 Juan Yu is a Data Warehouse Specialist Solutions Architect at AWS.

Under the hood: Scaling your Kinesis data streams

Post Syndicated from Ahmed Gaafar original https://aws.amazon.com/blogs/big-data/under-the-hood-scaling-your-kinesis-data-streams/

Real-time delivery of data and insights enables businesses to pivot quickly in response to changes in demand, user engagement, and infrastructure events, among many others. Amazon Kinesis offers a managed service that lets you focus on building your applications, rather than managing infrastructure. Scalability is provided out-of-the-box, allowing you to ingest and process gigabytes of streaming data per second. Data replication to three Availability Zones offers high availability and durability. Pricing is based on usage and requires no upfront costs, making Kinesis a cost-effective solution.

Amazon Kinesis Data Streams uses a provisioned capacity model. Each data stream is composed of one or more shards that act as units of capacity. Shards make it easy for you to design and scale a streaming pipeline by providing a predefined write and read capacity. As workloads grow, an application may read or write to a shard at a rate that exceeds its capacity, creating a hot shard and requiring you to add capacity quickly. Shards also enable you to parallelize the processing of large datasets and compute results quickly.

This post discusses how to scale your data streams and avoid hot shards. The post first shows you how to estimate the number of shards you need in your data stream as you design your streaming pipeline. It then looks at the causes of hot shards, how to avoid them using the Kinesis Data Streams scaling mechanisms, and which metrics are important to monitor.

Estimating your stream capacity

The following diagram shows a streaming data pipeline connected to a multiplayer video game. A Kinesis data stream ingests player scores and other stats. You can filter and enrich the data, and write it to DynamoDB tables that populate the game’s various leaderboards.

As you embark on designing your streaming pipeline, it’s important to set up the data stream with enough capacity to handle both the data records that producers create and the consumers reading those same records. You can ingest up to 1 MB per second per shard or 1,000 data records per second per shard for writes. Read capacity is up to 2 MB per second per shard or five read transactions per second. All applications reading from the stream share the read capacity. You can use the enhanced fan-out feature to scale the number of consuming applications and make sure that each has a dedicated 2 MB per second connection.

This post uses the preceding application as an example. It’s estimated that producers create data records at a rate of 20,000 KB (20 MB) per second, and your consumer nodes need to process this same amount of data at the other end of the stream. At 1 MB per second per shard for writes, that calls for at least 20 shards. In addition to handling these rates, it is a good idea to add extra capacity to give the stream headroom for growth.

This headroom also helps your application recover faster in scenarios that could cause a delay or a pause in ingesting or processing data. These scenarios may include:

  • Deploying a new version of a consumer application
  • Transient network issues

As these nodes work to catch up after recovery, they produce or consume records at a higher than standard rate, requiring higher capacity. For this example, you can add 25 percent of headroom, or five shards, bringing the stream to a total of 25 shards. Shards are cost-efficient, but it is up to you how many you want to add.

Scaling scenario

At the time of the game’s release, this capacity is deemed sufficient for the application. Ingestion and processing of the data are both running smoothly, and the game’s leaderboards are populated with current data. It’s now a few weeks after release; the game is steadily gaining popularity and concurrent player numbers are increasing. In a scenario such as this, it’s important to have sufficient monitoring to detect the increased load so you can increase throughput by scaling the stream.

The following diagram provides a simplified view of using CloudWatch metrics to monitor your data streams and trigger scaling operations.

In this example, these scaling issues could manifest in delayed leaderboard update reports. Because shards are the capacity units in a data stream, each shard’s capacity is independent of other shards. If the producers write to a single shard at a rate higher than 1 MB per second or 1,000 records per second, that shard becomes a hot shard and requests exceeding that capacity get throttled, leading to a delay in leaderboard updates. This condition can happen while other shards in the stream are underutilized, so if you are monitoring metrics at the stream level, you may not see any cause for concern, because the stream overall is ingesting data at a rate below its total capacity of 25 MB per second. Amazon Kinesis enables you to seamlessly scale your stream without interrupting your streaming pipeline.

Core concepts that enable scaling

You can write records to a data stream using Put APIs. To write a single record, use PutRecord; to write multiple records, use PutRecords. When executing either one, the request to the Kinesis API has to include the following three components:

  • The stream name.
  • The data record to write to the stream. For this post, this is the scoring result of a particular round in the game.
  • A partition key (for example, the game session).
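
As a quick illustration, the following AWS CLI call writes a single record. The stream name, partition key, and payload are hypothetical; the payload is the base64 encoding of a small JSON score event ({"player":"pk1234","score":98}), which is how AWS CLI v2 expects blob parameters by default:

# Write one score event, using the game session as the partition key
aws kinesis put-record \
    --stream-name game-scores-stream \
    --partition-key game-session-42 \
    --data "eyJwbGF5ZXIiOiJwazEyMzQiLCJzY29yZSI6OTh9"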

The following diagram shows multiple producers writing to a Kinesis data stream. The partition key value is used in combination with a hash function to determine the shard a given record will be written to.

The partition key determines the shard to which the record is written. The partition key is a Unicode string with a maximum length of 256 bytes. Kinesis runs the partition key value that you provide in the request through an MD5 hash function. The resulting value maps your record to a specific shard within the stream, and Kinesis writes the record to that shard. Partition keys therefore dictate how data is distributed across the shards of the stream.

Certain use cases require you to partition data based on specific criteria for efficient processing by the consuming applications. As an example, if you use player ID pk1234 as the partition key, all scores related to that player route to shard1. The consuming application can use the fact that data stored in shard1 has an affinity with the player ID and can efficiently calculate the leaderboard. An increase in traffic related to players mapped to shard1 can lead to a hot shard. Kinesis Data Streams allows you to handle such scenarios by splitting or merging shards without disrupting your streaming pipeline.

If your use cases do not require data stored in a shard to have high affinity, you can achieve high overall throughput by using a random partition key to distribute data. Random partition keys help distribute the incoming data records evenly across all the shards in the stream and reduce the likelihood of one or more shards getting hit with a disproportionate number of records. You can use a universally unique identifier (UUID) as a partition key to achieve this uniform distribution of records across shards. This strategy can increase the latency of record processing if the consumer application has to aggregate data from multiple shards.
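
For example, a producer could generate a random UUID per record as the partition key. The following shell sketch assumes a hypothetical stream name and reuses the sample payload from the earlier put-record example:

# uuidgen produces a random UUID, spreading records evenly across shards
aws kinesis put-record \
    --stream-name game-scores-stream \
    --partition-key "$(uuidgen)" \
    --data "eyJwbGF5ZXIiOiJwazEyMzQiLCJzY29yZSI6OTh9"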

Kinesis Data Streams scaling mechanisms

Kinesis Data Streams offers three actions for scaling a stream: SplitShard, MergeShards, and UpdateShardCount. The stream remains fully functional during these actions; producers can continue to write to the stream and consumers can continue to read from it while the scaling is in progress.

Upon receiving a scaling request, Kinesis sets the stream status to Updating. You can use the DescribeStream API to check the stream status. When the operation is complete, the stream status shows as Active.

The SplitShard action splits one active shard into two shards, increasing the read and write capacity of the stream. This can be helpful if there is an expected increase in the number of records ingested into the stream, or so that more Kinesis Data Streams applications can simultaneously read data from the stream for real-time processing.

SplitShard facilitates that process. The hash key space of the parent shard also splits. The two new shards accept new data records and the parent shard stops accepting new records. Existing records in the parent shard are retained for the duration of the stream retention period (the default is 24 hours, configurable up to 7 days). You must specify the new-starting-hash-key value when issuing this command. This value determines the point of the split within the parent shard’s hash key space. In most cases, you want to do an even split. However, you might need to do an uneven split if you have unbalanced shards that you want to rebalance, for example. The following diagram shows the SplitShard process in action.
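
A minimal sketch of an even split with the AWS CLI, assuming a hypothetical stream name and the first shard from the example output later in this post (its hash key range runs from 0 to 2^127 − 1, so the midpoint is 2^126):

# Split shardId-000000000000 evenly at the midpoint of its hash key range
aws kinesis split-shard \
    --stream-name game-scores-stream \
    --shard-to-split shardId-000000000000 \
    --new-starting-hash-key 85070591730234615865843651857942052864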

Many streaming workloads have variable data flow rates that fluctuate over time, sometimes following daily, weekly, or seasonal patterns. As you monitor your data flow rates, you may see underutilized shards that, if merged, still have a data flow rate below the shard limits, yet reduce the cost of the stream.

The MergeShards action merges two adjacent shards, producing one shard. Two shards are considered adjacent if the hash key spaces for the two form a contiguous set with no gaps. When the two shards merge, their hash key spaces also merge. The new shard starts accepting new data records. The two parent shards stop accepting new records and retain existing records up to the stream’s configured retention period. The following diagram shows the MergeShards process in action.
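
A hedged example with the AWS CLI, assuming a hypothetical stream name and two adjacent shard IDs:

aws kinesis merge-shards \
    --stream-name game-scores-stream \
    --shard-to-merge shardId-000000000002 \
    --adjacent-shard-to-merge shardId-000000000003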

The UpdateShardCount action is useful when you need to scale your stream, up or down, to a specific number of shards, and you provide that number as a parameter in the API call. Scaling in increments of 25% of the current capacity (25%, 50%, 75%, 100%) helps the operation complete faster, but is not required. The command executes a series of SplitShard and MergeShards actions as needed to reach the explicit number of shards you specified. This command splits shard hash key space evenly, creating shards of equal size. There is no option to choose a different value.
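
For example, doubling the example stream from 25 to 50 shards could look like the following; the stream name is hypothetical, and UNIFORM_SCALING (equal-size shards) is the supported scaling type:

aws kinesis update-shard-count \
    --stream-name game-scores-stream \
    --target-shard-count 50 \
    --scaling-type UNIFORM_SCALING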

Additionally, the Kinesis Scaling Utility available on GitHub provides autoscaling for Kinesis Data Streams by monitoring a stream’s Amazon CloudWatch metrics and scaling it up or down accordingly. It can scale a stream by an explicit shard count or as a percentage of the total fleet. With UpdateShardCount and the scaling utility, there is no requirement for you to manage the allocation of the key space to shards; it happens automatically.

Balancing shards

After completing a scaling operation, check the distribution of the hash key space in your stream. In most use cases, the hash key space should be evenly distributed across the shards in the stream. Errors in calculating or inputting a shard’s hash key space starting value can lead to creating new shards with an unusually large or small hash key space. Unusually large shards receive a high number of read and write requests, leading to throttling, while the unusually small shards are underutilized.

The output of the ListShards API lists the starting and ending hash key value for each shard in the stream. You can use these values to identify unbalanced shards, and perform the necessary splits or merges to balance them. The Kinesis Scaling Utility can also generate a report of shard key space sizes that can help you achieve the same result. See the following code:

{
    "Shards": [
        {
            "ShardId": "shardId-000000000000", 
            "HashKeyRange": {
                "EndingHashKey": "170141183460469231731687303715884105727", 
                "StartingHashKey": "0"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817078608863948980125442188478720910276534730754"
            }
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "HashKeyRange": {
                "EndingHashKey": "340282366920938463463374607431768211455", 
                "StartingHashKey": "170141183460469231731687303715884105728"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817100909609147510748583724196993558638040711186"
            }
        }
    ]
}
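
Output similar to the preceding can be retrieved with a call such as the following (hypothetical stream name):

aws kinesis list-shards --stream-name game-scores-stream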

Monitoring your streams to preempt hot shards

As the hot shard scenario has demonstrated, monitoring your data streams at only the stream level does not prepare you for issues at the shard level. Kinesis offers a multitude of stream-level and shard-level metrics (shard-level metrics require enhanced monitoring to be enabled for the stream). At the shard level, IncomingBytes and IncomingRecords show you the ingestion rate into the shard. WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded indicate throttled Put and Get requests, respectively. At the stream level, keep an eye on PutRecord.Success, whose average value reflects the percentage of PutRecord success over time. Paired with proper thresholds for alerting, these metrics should help you take scaling actions proactively in response to changes in the flow of data in and out of your streams, and reduce the possibility of developing hot shards.
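
As a sketch of what such alerting could look like, the following AWS CLI command creates a CloudWatch alarm on write throttling for a single stream. The alarm name, stream name, SNS topic ARN, and threshold are hypothetical assumptions and should be tuned to your workload:

# Alarm whenever any write throttling is recorded in a one-minute period
aws cloudwatch put-metric-alarm \
    --alarm-name game-scores-stream-write-throttling \
    --namespace AWS/Kinesis \
    --metric-name WriteProvisionedThroughputExceeded \
    --dimensions Name=StreamName,Value=game-scores-stream \
    --statistic Sum \
    --period 60 \
    --evaluation-periods 1 \
    --threshold 0 \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions arn:aws:sns:us-west-2:123456789012:kinesis-scaling-alerts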

The following image shows a snapshot of a CloudWatch dashboard with several metrics of a Kinesis data stream.

Conclusion

This post discussed how to simplify the scaling and monitoring of your Kinesis data streams. It’s important to spend some time considering the expected data flow rate for your stream to find the appropriate capacity. Choosing a good partition key strategy helps you take full advantage of the capacity you provision and avoid hot shards. Monitoring your stream metrics and setting alarm thresholds helps you gain the visibility you need to make better scaling decisions.

For more information, see What Is Amazon Kinesis Data Streams? For more information about Kinesis API actions, see Actions.

About the Author

 Ahmed Gaafar is a senior technical account manager at AWS.

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-2/

Part 1 of this multi-post series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1, discussed common customer use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

This post shows you how to get started with a step-by-step walkthrough of a few ETL and ELT design patterns of Amazon Redshift using AWS sample datasets.

Prerequisites

Before getting started, make sure that you meet the following prerequisites:

  1. This post uses two publicly available AWS sample datasets from the US-West-2 (Oregon) Region. Use the US-West-2 (Oregon) Region for your test run to reduce cross-region network latency and cost due to the data movement.
  2. You have an AWS account in the same Region.
  3. You have the AdministratorAccess policy granted to your AWS account (for production, you should restrict this further).
  4. You have an existing Amazon S3 bucket named eltblogpost in your data lake to store unloaded data from Amazon Redshift. Because bucket names are unique across AWS accounts, replace eltblogpost with your unique bucket name as applicable in the sample code provided.
  5. You have AWS CLI installed and configured to use with your AWS account.
  6. You have an IAM policy named redshift-elt-test-s3-policy with the following read and write permissions for the Amazon S3 bucket named eltblogpost:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::eltblogpost",
                    "arn:aws:s3:::eltblogpost/*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  7. You have an IAM policy named redshift-elt-test-sampledata-s3-read-policy with read only permissions for the Amazon S3 bucket named awssampledbuswest2, hosting the sample data used for this walkthrough.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": [
                    "arn:aws:s3:::awssampledbuswest2",
                    "arn:aws:s3:::awssampledbuswest2/*"
                ]
            }
        ]
    }

  8. You have an IAM role named redshift-elt-test-role that has a trust relationship with redshift.amazonaws.com and glue.amazonaws.com and the following IAM policies (for production, you should restrict this further as needed):
    • redshift-elt-test-s3-policy
    • redshift-elt-test-sampledata-s3-read-policy
    • AWSGlueServiceRole
    • AWSGlueConsoleFullAccess
  9. Make a note of the ARN for redshift-elt-test-role IAM role.
  10. You have an existing Amazon Redshift cluster with the following parameters:
    • Cluster name as rseltblogpost.
    • Database name as rselttest.
    • Four dc2.large nodes.
    • An associated IAM role named redshift-elt-test-role.
    • A publicly available endpoint.
    • A cluster parameter group named eltblogpost-parameter-group, which you use to change the Concurrency Scaling settings.
    • Cluster workload management set to manual.
  11. You have SQL Workbench/J (or another tool of your choice) and can connect successfully to the cluster.
  12. You have an EC2 instance in the same Region with PostgreSQL client CLI (psql) and can connect successfully to the cluster.
  13. You have an AWS Glue catalog database named eltblogpost as the metadata catalog for Amazon Athena and Redshift Spectrum queries.

Loading data to Amazon Redshift local storage

This post uses the Star Schema Benchmark (SSB) dataset. It is provided publicly in an S3 bucket, which any authenticated AWS user with access to Amazon S3 can access.

To load data to Amazon Redshift local storage, complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J.
  2. Execute the CREATE TABLE statements from the Github repo from your SQL Workbench/J to create tables from the SSB dataset. The following diagram shows the list of tables.
  3. Execute the COPY statements from the Github repo. This step loads data into the tables you created using the sample data available in s3://awssampledbuswest2/ssbgz/. Remember to replace your IAM role ARN noted previously.
  4. To verify that each table loaded correctly, run the following commands:
    select count(*) from LINEORDER; 
    select count(*) from PART;
    select count(*) from CUSTOMER;
    select count(*) from SUPPLIER;
    select count(*) from DWDATE;

    The following results table shows the number of rows for each table in the SSB dataset:

    Table Name    Record Count
    LINEORDER     600,037,902
    PART            1,400,000
    CUSTOMER        3,000,000
    SUPPLIER        1,000,000
    DWDATE              2,556

In addition to the record counts, you can also check for a few sample records from each table.

Performing ELT and ETL using Amazon Redshift and unload to S3

The following are the high-level steps for this walkthrough:

  1. You are looking to pre-aggregate some commonly asked measures by your end-users on the point of sales (POS) data you loaded in Amazon Redshift local storage.
  2. You then want to unload the aggregated data from Amazon Redshift to your data lake (S3) in an open, analytics-optimized, and compressed Parquet file format. You also want to consider optimized partitioning for the unloaded data in your data lake to help with end-user query performance and, eventually, lower cost.
  3. You want to query the unloaded data from your data lake with Redshift Spectrum. You also want to share the data with other AWS services such as Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets (such as ERP, finance, or third-party data) stored in your data lake, and Amazon SageMaker for machine learning.

Complete the following steps:

  1. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – This query summarizes the revenue by manufacturer, category, and brand per month per year per supplier region.
    • ELT Query 2 – This query summarizes the revenue by brand per month per year by supplier region and city.
    • ELT Query 3 – This query drills down in time by customer city, supplier city, month, and year.
  2. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J. To use Redshift Spectrum for querying the unloaded data, you need the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue. You are now ready to create an AWS Glue crawler.
  3. From AWS CLI, run the following code (replace <Your AWS Account>):
    aws glue create-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                }
            ]
        }
    }

    You can also schedule the crawler to run periodically based on your use case. For example, you can schedule the crawler every 35 minutes to keep the AWS Glue catalog tables up to date with the data being unloaded every 30 minutes. However, this post does not configure any scheduling.

  4. After you create the AWS Glue crawler, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  5. When the AWS Glue crawler run is complete, go to the AWS Glue console to see the following three AWS Glue catalog tables under the database eltblogpost:
    • monthly_revenue_by_region_manufacturer_category_brand
    • monthly_revenue_by_region_city_brand
    • yearly_revenue_by_city
  6. Now that you have an external data catalog in AWS Glue named eltblogpost, create an external schema named spectrum_eltblogpost in the persistent cluster using the following SQL from your SQL Workbench/J (replace <Your AWS Account>):
    create external schema spectrum_eltblogpost 
    from data catalog 
    database 'eltblogpost' 
    iam_role 'arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role'
    create external database if not exists;

    Using Spectrum, you can now query the three AWS Glue catalog tables you set up earlier.

  7. Go to SQL Workbench/J and run the following sample queries:
    • Top 10 brands by category and manufacturer contributing to revenue for the region AFRICA for the year of 1992 and month of March:
      SELECT brand, category, manufacturer, revenue 
      from "spectrum_eltblogpost"."monthly_revenue_by_region_manufacturer_category_brand"
      where year = '1992'
      and month = 'March' 
      and supplier_region = 'AFRICA'
      order by revenue desc
      limit 10;
      
      brand | category | manufacturer | revenue
      ----------+----------+--------------+-----------
      MFGR#1313 | MFGR#13 | MFGR#1 | 5170356068
      MFGR#5325 | MFGR#53 | MFGR#5 | 5106463527
      MFGR#3428 | MFGR#34 | MFGR#3 | 5055551376
      MFGR#2425 | MFGR#24 | MFGR#2 | 5046250790
      MFGR#4126 | MFGR#41 | MFGR#4 | 5037843130
      MFGR#219 | MFGR#21 | MFGR#2 | 5018018040
      MFGR#159 | MFGR#15 | MFGR#1 | 5009626205
      MFGR#5112 | MFGR#51 | MFGR#5 | 4994133558
      MFGR#5534 | MFGR#55 | MFGR#5 | 4984369900
      MFGR#5332 | MFGR#53 | MFGR#5 | 4980619214

    • Monthly revenue for the region AMERICA for the year 1992 across all brands:
      SELECT month, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."monthly_revenue_by_region_city_brand"
      where year = '1992'
      and supplier_region = 'AMERICA'
      group by month;
      
      month | revenue
      ----------+--------------
      April | 4347703599195
      January | 4482598782080
      September | 4332911671240
      December | 4489411782480
      May | 4479764212732
      August | 4485519151803
      October | 4493509053843
      June | 4339267242387
      March | 4477659286311
      February | 4197523905580
      November | 4337368695526
      July | 4492092583189

    • Yearly revenue for supplier city ETHIOPIA 4 for the years 1992–1995 and month of December:
      SELECT year, supplier_city, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."yearly_revenue_by_city"
      where supplier_city in ('ETHIOPIA 4')
      and year between '1992' and '1995'
      and month = 'December'
      group by year, supplier_city
      order by year, supplier_city;
      
      year | supplier_city | revenue
      -----+---------------+------------
      1992 | ETHIOPIA 4 | 91006583025
      1993 | ETHIOPIA 4 | 90617597590
      1994 | ETHIOPIA 4 | 92015649529
      1995 | ETHIOPIA 4 | 89732644163

When the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Accelerating ELT and ETL using Redshift Spectrum and unload to S3

Assume that you need to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption.

The following are the high-level steps for this walkthrough:

  1. This is a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured data. You want to use the power of Redshift Spectrum to perform the required SQL transformations on the data stored in S3 and unload the transformed results back to S3.
  2. You want to query the unloaded data from your data lake using Redshift Spectrum if you have an existing cluster, Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets in your data lake, and Amazon SageMaker for machine learning.

Because Redshift Spectrum allows you to query the data directly from your data lake without needing to load into Amazon Redshift local storage, you can spin up a short-lived cluster to perform ELT at a massive scale using Redshift Spectrum and terminate the cluster when the work is complete. You can automate spinning up and terminating the short-lived cluster using AWS CloudFormation. That way, you only pay for the few minutes or hours of use. A short-lived cluster can also avoid overloading the current persistent cluster serving interactive queries from live users. For this post, use your existing cluster rseltblogpost.

This post uses a publicly available sample dataset named tickit provided by AWS,  which any authenticated AWS user with access to S3 can access:

  • Sales – s3://awssampledbuswest2/tickit/spectrum/sales/
  • Event – s3://awssampledbuswest2/tickit/allevents_pipe.txt
  • Date – s3://awssampledbuswest2/tickit/date2008_pipe.txt
  • Users – s3://awssampledbuswest2/tickit/allusers_pipe.txt

It is a best practice of Redshift Spectrum for performance reasons to load the dimension tables in the local storage of your short-lived cluster and use an external table for the fact table Sales.

Complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J. To use Redshift Spectrum for querying data from the data lake (S3), you need to have the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue.
    • An AWS Glue catalog database named eltblogpost that you already created.
    • An external schema in your Redshift cluster named spectrum_eltblogpost that you already created.
  2. Execute the SQL available on the Github repo to create an external table named sales in the same external schema named spectrum_eltblogpost. As shown in the previous section, you can also use an AWS Glue crawler to create the external table.
  3. Execute the SQLs available on the Github repo to create the dimension tables to load the data into Amazon Redshift local storage for Redshift Spectrum performance best practices.
  4. Execute the COPY statements available on the Github repo to load the dimension tables using the sample data available in s3://awssampledbuswest2/tickit/. Replace the IAM role ARN with the IAM role ARN you noted earlier, which is associated with your cluster.
  5. To verify that each table has the correct record count, execute the following commands:
    select count(*) from date;
    select count(*) from users;
    select count(*) from event;
    select count(*) from spectrum_eltblogpost.sales;

    The following results table shows the number of rows for each table in the tickit dataset:

    Table Name                    Record Count
    DATE                           365
    USERS                       49,990
    EVENT                        8,798
    spectrum_eltblogpost.sales 172,456

    In addition to the record counts, you can also check for a few sample records from each table.

  6. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – Total quantity sold on a given calendar date.
    • ELT Query 2 – Total quantity sold to each buyer.
    • ELT Query 3 – Events in the 99.9 percentile in terms of all-time gross sales.
  7. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J.
  8. To use Redshift Spectrum for querying the unloaded data, you can either create a new AWS Glue crawler or modify the previous crawler named eltblogpost_redshift_spectrum_etl_elt_glue_crawler. Update the existing crawler using the following code from AWS CLI (replace <Your AWS Account>):
    aws glue update-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_buyer_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_price_by_eventname"
                }
            ]
        }
    }

  9. After you create the crawler successfully, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  10. When the crawler run is complete, go to the AWS Glue console. The following additional catalog tables are in the catalog database eltblogpost:
    • total_quantity_sold_by_date
    • total_quantity_sold_by_buyer_by_date
    • total_price_by_eventname
  11. Using Spectrum, you can now query the three preceding catalog tables. Go to SQL Workbench/J and run the following sample queries:
      • Top 10 days for quantities sold in February and March 2008:
        SELECT caldate, total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_date"
        where caldate between '2008-02-01' and '2008-03-30'
        order by total_quantity desc
        limit 10;
        
        caldate | total_quantity
        -----------+---------------
        2008-02-20 | 1170
        2008-02-25 | 1146
        2008-02-19 | 1145
        2008-02-24 | 1141
        2008-03-26 | 1138
        2008-03-22 | 1136
        2008-03-17 | 1129
        2008-03-08 | 1129
        2008-02-16 | 1127
        2008-03-23 | 1121

      • Top 10 buyers for quantities sold in February and March 2008:
        SELECT firstname,lastname,total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_buyer_by_date"
        where caldate between '2008-02-01' and '2008-03-31'
        order by total_quantity desc
        limit 10;
        
        firstname | lastname | total_quantity
        ----------+------------+---------------
        Laurel | Clay | 9
        Carolyn | Valentine | 8
        Amelia | Osborne | 8
        Kai | Gill | 8
        Gannon | Summers | 8
        Ignacia | Nichols | 8
        Ahmed | Mcclain | 8
        Amanda | Mccullough | 8
        Blair | Medina | 8
        Hadley | Bennett | 8

      • Top 10 event names for total price:
        SELECT eventname, total_price
        FROM "spectrum_eltblogpost"."total_price_by_eventname"
        order by total_price desc
        limit 10;
        
        eventname | total_price
        ---------------------+------------
        Adriana Lecouvreur | 51846.00
        Janet Jackson | 51049.00
        Phantom of the Opera | 50301.00
        The Little Mermaid | 49956.00
        Citizen Cope | 49823.00
        Sevendust | 48020.00
        Electra | 47883.00
        Mary Poppins | 46780.00
        Live | 46661.00

After the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Amazon Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Scaling ELT and unload running in parallel using Concurrency Scaling

Assume that you have a mixed workload under concurrency when the UNLOAD queries and the ELT jobs run in parallel in your cluster with Concurrency Scaling turned on. When Concurrency Scaling is enabled, Amazon Redshift automatically adds additional cluster capacity when you need to process an increase in concurrent read queries including UNLOAD queries. By default, Concurrency Scaling mode is turned off for your cluster. In this post, you enable the Concurrency Scaling mode for your cluster.

Complete the following steps:

  1. Go to your cluster parameter group named eltblogpost-parameter-group and complete the following:
    • Update max_concurrency_scaling_clusters to 5.
    • Create a new queue named Queue 1 with Concurrency Scaling mode set to Auto and a query group named unload_query for the UNLOAD jobs in the next steps.
  2. After you make these changes, reboot your cluster for the changes to take effect.
  3. For this post, use psql client to connect to your cluster rseltblogpost from the EC2 instance that you set up earlier.
  4. Open an SSH session to your EC2 instance and copy the nine files as shown below from the concurrency folder in the Github repo to /home/ec2-user/eltblogpost/ in your EC2 instance.
  5. Review the concurrency-elt-unload.sh script that runs the following eight jobs in parallel:
    • An ELT script for SSB dataset, which kicks off one query at a time.
    • An ELT script for the tickit dataset, which kicks off one query at a time.
    • Three unload queries for the SSB datasets kicked off in parallel.
    • Three unload queries for the tickit datasets kicked off in parallel.
  6. Run the concurrency-elt-unload.sh script. While the script is running, you will see sample output from the script. The following are the response times taken by the script:
    real 2m40.245s
    user 0m0.104s
    sys 0m0.000s
  7. Run the following query to validate that some of the UNLOAD queries ran in the Concurrency Scaling clusters (look for “which_cluster = Concurrency Scaling” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 22:53:00' and '2019-10-20 22:56:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query: 

  8. Comment out the following SET statement in the six UNLOAD query files (ssb-unload<1-3>.sql and tickit-unload<1-3>.sql) to force all six UNLOAD queries to run in the main cluster:
    set query_group to 'unload_query';

    In other words, disable Concurrency Scaling mode for the UNLOAD queries.

  9. Run the concurrency-elt-unload.sh script. While the script is running, you will see sample output from the script. The following are the response times taken by the script:
    real 3m40.328s
    user 0m0.104s
    sys 0m0.000s

    The following shows the Workload Management settings for the Redshift cluster: 

  10. Run the following query to validate that all the queries ran in the main cluster (look for “which_cluster = Main” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 23:19:00' and '2019-10-20 23:24:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query:

    With Concurrency Scaling, the end-to-end runtime improved by 60 seconds; the run without Concurrency Scaling took about 37.5% longer (3m40s versus 2m40s).

Summary

This post provided a step-by-step walkthrough of a few straightforward examples of the common ELT and ETL design patterns of Amazon Redshift using some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe to design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by using the power of the AWS platform.

Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-1/

Part 1 of this multi-post series discusses design best practices for building scalable ETL (extract, transform, load) and ELT (extract, load, transform) data processing pipelines using both primary and short-lived Amazon Redshift clusters. You also learn about related use cases for some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows a step-by-step walkthrough to get started using Amazon Redshift for your ETL and ELT use cases.

ETL and ELT

There are two common design patterns when moving data from source systems to a data warehouse. The primary difference between the two patterns is the point in the data-processing pipeline at which transformations happen. This also determines the set of tools used to ingest and transform the data, along with the underlying data structures, queries, and optimization engines used to analyze the data. The first pattern is ETL, which transforms the data before it is loaded into the data warehouse. The second pattern is ELT, which loads the data into the data warehouse and uses the familiar SQL semantics and power of the Massively Parallel Processing (MPP) architecture to perform the transformations within the data warehouse.

In the following diagram, the first represents ETL, in which data transformation is performed outside of the data warehouse with tools such as Apache Spark or Apache Hive on Amazon EMR or AWS Glue. This pattern allows you to select your preferred tools for data transformations. The second diagram is ELT, in which the data transformation engine is built into the data warehouse for relational and SQL workloads. This pattern is powerful because it uses the highly optimized and scalable data storage and compute power of MPP architecture.

Redshift Spectrum

Amazon Redshift is a fully managed data warehouse service on AWS. It uses a distributed, MPP, and shared nothing architecture. Redshift Spectrum is a native feature of Amazon Redshift that enables you to run the familiar SQL of Amazon Redshift with the BI application and SQL client tools you currently use against all your data stored in open file formats in your data lake (Amazon S3).

A common pattern you may follow is to run queries that span both the frequently accessed hot data stored locally in Amazon Redshift and the warm or cold data stored cost-effectively in Amazon S3, using views with no schema binding for external tables. This enables you to independently scale your compute resources and storage across your cluster and S3 for various use cases.
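
A minimal sketch of such a view follows, assuming a hypothetical local table for hot data and a Spectrum external table for cold data with the same columns; the WITH NO SCHEMA BINDING clause is what makes it a late-binding view:

create view public.sales_all as
select sale_date, store_id, amount from public.sales_hot            -- recent data in Amazon Redshift
union all
select sale_date, store_id, amount from spectrum_schema.sales_cold  -- historical data in S3
with no schema binding;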

Redshift Spectrum supports a variety of structured and unstructured file formats such as Apache Parquet, Avro, CSV, ORC, JSON to name a few. Because the data stored in S3 is in open file formats, the same data can serve as your single source of truth and other services such as Amazon Athena, Amazon EMR, and Amazon SageMaker can access it directly from your S3 data lake.

For more information, see Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required.

Concurrency Scaling

Using Concurrency Scaling, Amazon Redshift automatically and elastically scales query processing power to provide consistently fast performance for hundreds of concurrent queries. Concurrency Scaling resources are added to your Amazon Redshift cluster transparently in seconds, as concurrency increases, to serve sudden spikes in concurrent requests with fast performance without wait time. When the workload demand subsides, Amazon Redshift automatically shuts down Concurrency Scaling resources to save you cost.

The following diagram shows how the Concurrency Scaling works at a high-level:

For more information, see New – Concurrency Scaling for Amazon Redshift – Peak Performance at All Times.

Data lake export

Amazon Redshift now supports unloading the result of a query to your data lake on S3 in Apache Parquet, an efficient open columnar storage format for analytics. The Parquet format is up to two times faster to unload and consumes up to six times less storage in S3, compared to text formats. You can also specify one or more partition columns, so that unloaded data is automatically partitioned into folders in your S3 bucket to improve query performance and lower the cost for downstream consumption of the unloaded data. For example, you can choose to unload your marketing data and partition it by year, month, and day columns. This enables your queries to take advantage of partition pruning and skip scanning of non-relevant partitions when filtered by the partitioned columns, thereby improving query performance and lowering cost. For more information, see UNLOAD.
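
To make this concrete, the following is a hedged sketch of such an UNLOAD; the table, bucket, column, and role names are hypothetical:

unload ('select year, month, day, campaign, sum(spend) as spend
         from marketing.daily_spend
         group by 1, 2, 3, 4')
to 's3://your-bucket/marketing/daily_spend/'
iam_role 'arn:aws:iam::123456789012:role/your-redshift-unload-role'
format as parquet
partition by (year, month, day)   -- unloaded files land in year=/month=/day= prefixes
maxfilesize 100 mb;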

Use cases

You may be using Amazon Redshift either partially or fully as part of your data management and data integration needs. You likely transitioned from an ETL to an ELT approach with the advent of MPP databases because your workload is primarily relational, SQL syntax is familiar, and the MPP architecture is massively scalable.

This section presents common use cases for ELT and ETL for designing data processing pipelines using Amazon Redshift.

ELT

Consider a batch data processing workload that requires standard SQL joins and aggregations on a modest amount of relational and structured data. You selected initially a Hadoop-based solution to accomplish your SQL needs. However, over time, as data continued to grow, your system didn’t scale well. You now find it difficult to meet your required performance SLA goals and often refer to ever-increasing hardware and maintenance costs. Relational MPP databases bring an advantage in terms of performance and cost, and lowers the technical barriers to process data by using familiar SQL.

Amazon Redshift has significant benefits based on its massively scalable and fully managed compute underneath to process structured and semi-structured data directly from your data lake in S3.

The following diagram shows how Redshift Spectrum allows you to simplify and accelerate your data processing pipeline from a four-step to a one-step process with the CTAS (Create Table As) command.
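
As a rough illustration of that one-step pattern, the following hypothetical CTAS aggregates a Spectrum external table directly into a new Amazon Redshift table (schema, table, and column names are assumptions):

create table sales_summary
distkey (brand)
sortkey (sale_year, sale_month)
as
select sale_year, sale_month, brand, sum(revenue) as total_revenue
from spectrum_schema.sales_external   -- raw data stays in S3
group by sale_year, sale_month, brand;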

The preceding architecture enables seamless interoperability between your Amazon Redshift data warehouse solution and your existing data lake solution on S3 hosting other Enterprise datasets such as ERP, finance, and third-party for a variety of data integration use cases.

The following diagram shows the seamless interoperability between your Amazon Redshift and your data lake on S3:

When you use an ELT pattern, you can also use your existing ELT-optimized SQL workload while migrating from your on-premises data warehouse to Amazon Redshift. This eliminates the need to rewrite relational and complex SQL workloads into a new compute framework from scratch. With Amazon Redshift, you can load, transform, and enrich your data efficiently using familiar SQL with advanced and robust SQL support, simplicity, and seamless integration with your existing SQL tools. You also need the monitoring capabilities provided by Amazon Redshift for your clusters.

ETL

You have a requirement to unload a subset of the data from Amazon Redshift back to your data lake (S3) in an open and analytics-optimized columnar file format (Parquet). You then want to query the unloaded datasets from the data lake using Redshift Spectrum and other AWS services such as Athena for ad hoc and on-demand analysis, AWS Glue and Amazon EMR for ETL, and Amazon SageMaker for machine learning.

You have a requirement to share a single version of a set of curated metrics (computed in Amazon Redshift) across multiple business processes from the data lake. You can use ELT in Amazon Redshift to compute these metrics and then use the unload operation with optimized file format and partitioning to unload the computed metrics in the data lake.

You also have a requirement to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption. In other words, consider a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured cold data stored in S3 for a short duration of time. You can use the power of Redshift Spectrum by spinning up one or many short-lived Amazon Redshift clusters that can perform the required SQL transformations on the data stored in S3, unload the transformed results back to S3 in an optimized file format, and terminate the unneeded Amazon Redshift clusters at the end of the processing. This way, you only pay for the duration in which your Amazon Redshift clusters serve your workloads.

As shown in the following diagram, once the transformed results are unloaded in S3, you then query the unloaded data from your data lake either using Redshift Spectrum if you have an existing Amazon Redshift cluster, Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets (such as ERP, finance, and third-party data) stored in your data lake, and Amazon SageMaker for machine learning.

You can also scale the unloading operation by using the Concurrency Scaling feature of Amazon Redshift. This provides a scalable and serverless option to bulk export data in an open and analytics-optimized file format using familiar SQL.

Best practices

The following recommended practices can help you to optimize your ELT and ETL workload using Amazon Redshift.

Analyze requirements to decide ELT versus ETL

The MPP architecture of Amazon Redshift and its Spectrum feature are efficient and designed for high-volume, relational, SQL-based ELT workloads (joins, aggregations) at massive scale. A common practice to design an efficient ELT solution using Amazon Redshift is to spend sufficient time to analyze the following:

  • Type of data from source systems (structured, semi-structured, and unstructured)
  • Nature of the transformations required (usually encompassing cleansing, enrichment, harmonization, transformations, and aggregations)
  • Row-by-row, cursor-based processing needs versus batch SQL
  • Performance SLA and scalability requirements considering the data volume growth over time
  • Cost of the solution

This helps to assess if the workload is relational and suitable for SQL at MPP scale.

Key considerations for ELT

For both ELT and ETL, it is important to build a good physical data model for better performance for all tables, including staging tables, with proper data types and distribution methods. A dimensional data model (star schema) with fewer joins works best for MPP architecture, including ELT-based SQL workloads. Consider using a TEMPORARY table for intermediate staging tables as feasible for the ELT process for better write performance, because temporary tables only write a single copy.

A common rule of thumb for ELT workloads is to avoid row-by-row, cursor-based processing (a commonly overlooked finding for stored procedures). This is sub-optimal because such processing needs to happen on the leader node of an MPP database like Amazon Redshift. Instead, the recommendation for such a workload is to look for an alternative distributed processing programming framework, such as Apache Spark.

Several hundreds to thousands of single record inserts, updates, and deletes for highly transactional needs are not efficient using MPP architecture. Instead, stage those records for either a bulk UPDATE or DELETE/INSERT on the table as a batch operation.
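
The following is a minimal sketch of that staging pattern, with hypothetical table names and S3 path; it combines the TEMPORARY staging table recommendation above with a bulk DELETE/INSERT instead of per-row updates:

begin;

-- Stage the incoming changes in a temporary table (temporary tables write a single copy)
create temporary table stage_orders (like orders);

copy stage_orders
from 's3://your-bucket/incoming/orders/'
iam_role 'arn:aws:iam::123456789012:role/your-redshift-copy-role'
format as csv;

-- Bulk upsert: remove matching rows, then insert the staged versions
delete from orders
using stage_orders
where orders.order_id = stage_orders.order_id;

insert into orders
select * from stage_orders;

commit;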

With the external table capability of Redshift Spectrum, you can optimize your transformation logic using a single SQL as opposed to loading data first in Amazon Redshift local storage for staging tables and then doing the transformations on those staging tables.

Key considerations for data lake export

When you unload data from Amazon Redshift to your data lake in S3, pay attention to data skew or processing skew in your Amazon Redshift tables. The UNLOAD command uses the parallelism of the slices in your cluster. Hence, if there is a data skew at rest or processing skew at runtime, unloaded files on S3 may have different file sizes, which impacts your UNLOAD command response time and query response time downstream for the unloaded data in your data lake.

You should also control the maximum file size to approximately 100 MB or less in the UNLOAD command (for example, with the MAXFILESIZE parameter) for better performance for downstream consumption. Similarly, for S3 partitioning, a rule of thumb is to keep the number of partitions per table on S3 to a couple of hundred by choosing low-cardinality partitioning columns (year, quarter, month, and day are good choices) in the UNLOAD command. This avoids creating too many partitions, which in turn creates a large volume of metadata in the AWS Glue catalog, leading to high query times via Athena and Redshift Spectrum.

To get the best throughput and performance under concurrency for multiple UNLOAD commands running in parallel, create a separate queue for unload queries with Concurrency Scaling turned on. This lets Amazon Redshift burst additional Concurrency Scaling clusters as required.

Key considerations for Redshift Spectrum for ELT

To get the best performance from Redshift Spectrum, pay attention to the maximum pushdown operations possible, such as S3 scan, projection, filtering, and aggregation, in your query plans for a performance boost. This is because you want to utilize the powerful infrastructure underneath that supports Redshift Spectrum. Using predicate pushdown also avoids consuming resources in the Amazon Redshift cluster.

In addition, avoid complex operations like DISTINCT or ORDER BY on more than one column and replace them with GROUP BY as applicable. Amazon Redshift can push down a single column DISTINCT as a GROUP BY to the Spectrum compute layer with a query rewrite capability underneath, whereas multi-column DISTINCT or ORDER BY operations need to happen inside Amazon Redshift cluster.
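For example, a multi-column DISTINCT over an external table can be rewritten as a GROUP BY so more of the work is eligible for pushdown; spectrum_schema.sales is a hypothetical external table.

-- Instead of a multi-column DISTINCT:
SELECT DISTINCT customer_id, order_date
FROM spectrum_schema.sales;

-- Prefer an equivalent GROUP BY:
SELECT customer_id, order_date
FROM spectrum_schema.sales
GROUP BY customer_id, order_date;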

Amazon Redshift optimizer can use external table statistics to generate more optimal execution plans. Without statistics, an execution plan is generated based on heuristics with the assumption that the S3 table is relatively large. It is recommended to set the table statistics (numRows) manually for S3 external tables.
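The row count can be set with a table property, as in the following sketch; the table name and count are placeholders.

-- Provide a row-count estimate for the external table so the optimizer
-- can generate a better execution plan
ALTER TABLE spectrum_schema.clickstream_raw
SET TABLE PROPERTIES ('numRows' = '170000000');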

For more information on Amazon Redshift Spectrum best practices, see Twelve Best Practices for Amazon Redshift Spectrum and How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3.

Summary

This post discussed the common use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using a few key features of Amazon Redshift: Spectrum, Concurrency Scaling, and the recently released support for data lake export with partitioning.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows you how to get started with a step-by-step walkthrough of a few simple examples using AWS sample datasets.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.

 


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

 

Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

Matching patient records with the AWS Lake Formation FindMatches transform

Post Syndicated from Dhawalkumar Patel original https://aws.amazon.com/blogs/big-data/matching-patient-records-with-the-aws-lake-formation-findmatches-transform/

Patient matching is a major obstacle in achieving healthcare interoperability. Mismatched patient records and inability to retrieve patient history can cause significant barriers to informed clinical decision-making and result in missed diagnoses or delayed treatments. Additionally, healthcare providers often invest in patient data deduplication, especially when the number of patient records is growing rapidly in their databases. Electronic Health Records (EHRs) have significantly improved patient safety and care coordination in recent years; however, accurate patient matching remains a challenge for many healthcare organizations.

Duplicate patient records emerge for a variety of reasons, including human-generated number insertion, deletion, substitution, or transposition errors. Optical Character Recognition (OCR) software, which digitizes patient records, may also introduce errors.

Multiple types of record-matching algorithms exist to solve this problem. These include basic deterministic methods such as grouping and comparing relevant fields (such as SSN, name, or date of birth), phonetic encoding systems, and more advanced algorithms using machine learning (ML).

AWS Lake Formation is a HIPAA-eligible service that helps you build a secure data lake in a few simple steps. Lake Formation also contains FindMatches, an ML transform that enables you to match records across different datasets and identify and remove duplicate records, with little to no human intervention.

This post shows you how to use the FindMatches ML transform to identify matching patient records in a synthetically generated dataset. To use FindMatches, you don’t have to write code or know how ML works. This is useful for finding matches in your data when it doesn’t include a reliable, unique personal identifier, even if the fields don’t match exactly.

Patient dataset

Various regulations in different countries govern patient data due to its sensitive nature. As such, the availability of patient data on which to train matching algorithms is often scarce, which complicates model development. A common method to get around these challenges is synthetic data. This post generates patient data based on the Open Source Freely Extensible Biomedical Record Linkage Program (FEBRL). FEBRL uses Hidden Markov Models (HMMs) to prepare the name and address data for patient record matching. It also allows for mimicking real-life patient datasets that lead to duplicates, which may have the following mismatches:

  • Blank fields.
  • Typographical errors such as misspellings, transposed characters, or swapped fields.
  • Shortened middle names versus records with complete middle names.
  • Various formats of mailing addresses and their components.
  • OCR-related errors.
  • Phonetic errors.
  • No globally unique patient or person identifier. Every healthcare provider may have a patient identifier assigned to the same person but may not have a person identifier like an SSN, so they have datasets without keys.

FEBRL can generate these types of datasets based on configurable parameters to change the probabilities of each type of error, and thus incorporate a variety of scenarios leading to duplicates. The generation of the synthetic dataset is beyond the scope of this post; this post provides a pre-generated dataset for you to explore. In brief, below are the steps to generate the synthetic dataset which will be used to run FindMatches:

  1. Download and install FEBRL.
  2. Modify the parameters to create a dataset mimicking your expectations. For more information, see the dataset generation instructions of FEBRL.
  3. Cleanse the dataset (this confirms the same schema for each record, and removes single quotes and family role).

The dataset for this post uses AWS Region US East (N. Virginia).

FEBRL patient data structure

The following table shows the structure of FEBRL patient data. The data includes 40,000 records.

The original record and duplicate records are grouped together. The patient_id values are generated by the FEBRL data generator tool in a specific format: rec-<record number>-org for original records and rec-<record number>-dup-<duplicate record number> for duplicates.

The following table is a preview of what you’re working to achieve with the FindMatches ML transform. Once you match datasets, the resulting table mirrors the input table’s structure and data and adds a match_id column. Matched records display the same match_id value. It is still possible to have false positives and false negatives, but the transform is still extremely beneficial.

Prerequisites

The sample synthetic patient dataset for this post uses the US East (N. Virginia) Region, so all the steps mentioned in this post must be performed in the same AWS Region (us-east-1). The steps are easily modifiable if your data is in a different Region.

Solution architecture

The following diagram shows the solution’s architecture.

 

 

Solution overview

At a high level, the matching process includes the following steps:

  1. Upload the raw patient dataset in CSV format to an Amazon S3 bucket.
  2. Crawl the uploaded patient dataset using an AWS Glue crawler.
  3. Catalog your patient data with the AWS Glue Data Catalog and create a FindMatches ML transform.
  4. Create a label set, either using the ML transform or manually, and teach FindMatches by providing labeled examples of matching and non-matching records. Upload your labels and estimate the quality of the prediction. Add more labelsets and repeat this step as required to reach the required precision, accuracy, and recall.
  5. Create and execute an AWS Glue ETL job that uses your FindMatches transform.
  6. Store the results of the FindMatches transform in an Amazon S3 bucket.
  7. Create an AWS Glue Data Catalog table for the FindMatches ML transform results.
  8. Review the transform results with Amazon Athena.

Cataloging your data and creating a FindMatches ML transform

FindMatches operates on tables defined in the AWS Glue Data Catalog. Use AWS Glue crawlers to discover and catalog the patient data. You can use the FEBRL patient data generated for this post.

The CloudFormation stack provided below creates the resources in the AWS Region us-east-1 (US East (N. Virginia)).

To create the Catalog and FindMatches ML transform in AWS Glue, launch the following stack:


This stack creates the following resources:

  • An Amazon S3 bucket that stores the ML transform results (configurable as part of the launch). You can find the name of the bucket on the AWS CloudFormation stacks console, under the Outputs tab. This post uses the name S3BucketName.
  • An IAM role to allow AWS Glue to access other services, including S3.
  • An AWS Glue database (configurable as part of the launch).
  • An AWS Glue table that represents the Public Original Synthetic Patients Dataset (configurable as part of the launch).
  • An AWS Glue ML transform with the source as your AWS Glue table, with Accuracy set to 1 and Precision set to 0.9.

For more information, see Integrate and deduplicate datasets using AWS Lake Formation FindMatches.

Tuning the ML transform

The safety risks of false positive matches, in which clinicians believe incorrect information about the patient to be accurate, may be greater than the safety risks of false negative matches, in which clinicians lack access to the existing information about the patient. (For more information, see the related study on the NCBI website.) Therefore, moving the Recall vs. Precision slider toward Precision gives a higher level of confidence that records identified as a match really do belong to the same patient, and minimizes the safety risks of false positive matches.

A higher Accuracy setting helps achieve higher recall, at the expense of a longer runtime (and thus cost) necessary to compare more records.

To achieve better results for this particular dataset, the launch stack already created the transform for you with the Recall vs. Precision slider set to 0.9 toward Precision and the Lower Cost vs. Accuracy slider set to Accuracy. If needed, you can adjust these values later by selecting the transform and using the Tune menu.

Teaching FindMatches using labeled data

After running the launch stack successfully, you can train the transform by supplying it with matching and non-matching records using labelsets.

Creating a labeling set

You can create a labeling set yourself or allow AWS Glue to generate the labeling set based on heuristics.

AWS Glue extracts records from your source data and suggests potential matching records. The generated labelset file contains approximately 100 data samples for you to work on.

This post provides you with an AWS Glue generated labeled data file that you can use, with a fully populated label column. This file is fully ready for consumption.

If you choose to use the pre-generated labeled data file provided in this post, skip the following labeling file generation steps.

To create the training set, complete the following steps:

  1. On the AWS Glue console, under ETL, choose ML transforms. You will find the ML transform named cfn-findmatches-ml-transform-demo created for you by the provided stack.
  2. Choose the ML transform cfn-findmatches-ml-transform-demo, choose Action, and select Teach transform.
  3. For Teach the transform using labels, choose I do not have labels.
  4. Choose Generate labeling file.
  5. Provide the S3 path to store the generated label file.
  6. Choose Next.

The following table shows the generated labeled data file with an empty label column.

You need to populate the label column by marking the records that are a real match with the same value. Each labelset should contain positive and negative match examples.


The following table shows the table with a completed label column.

The labeling file has the same schema as the input data, plus two additional columns: labeling_set_id and label.

The training dataset is divided into labeling sets. Each labeling set displays a labeling_set_id value. This identification simplifies the labeling process, enabling you to focus on the match relationship of records within the same labeling set, rather than having to scan the entire file. For the preceding dataset, you can extract the label values from patient_id by removing the suffixes -org and -dup-<duplicate record number> using a regular expression, as in the sketch that follows. But in general, you would assign these labels according to which records should match based on the attribute values.
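As one option, because the source patient data is already cataloged in Athena, the Presto regexp_replace function can derive a candidate label value from patient_id when you prepare your labeling file; the cfn_table_patient table comes from the stack in this post, and you should adjust the pattern if your identifiers differ.

SELECT regexp_replace(patient_id, '-(org|dup-\d+)$', '') AS label,
       patient_id
FROM "cfn-database-patient".cfn_table_patient
LIMIT 10;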

If you specify the same label value for two or more records within a labeling set, you teach the FindMatches transform to consider these records a match. On the other hand, when two or more records have different labels within the same labeling set, FindMatches learns that these records aren’t considered a match. The transform evaluates record relationships between records within the same labeling set, not across labeling sets.

You should label a few hundred records to achieve a modest match quality, and a few thousand records to achieve a high match quality.

Uploading your labels and reviewing match quality

After you create the labeled dataset (which needs to be in .csv format), teach FindMatches where to find it. Complete the following steps:

  1. On the AWS Glue console, select the transform that you created earlier.
  2. Choose Action.
  3. Choose Teach transform.
  4. For Upload labels, choose I have labels.
  5. Choose Upload labeling file from S3.
  6. Choose Next.
  7. If you want to use the labelset provided in this blog post, download the labelset here.
  8. Create a folder named trainingset in the S3 bucket created by the CloudFormation template that you launched previously.
  9. Upload the labelset to the trainingset folder in that S3 bucket.
  10. Choose Overwrite my existing labels. You are only using one set of labels. If adding labels iteratively, choose the Append to my existing labels option.
  11. Choose Upload. With the labels uploaded, your transform is now ready to use. Though not strictly required, check the transform match quality by reviewing the metrics of matching and non-matching records.
  12. Choose Estimate transform quality. The transform quality estimate learns using 70% of your labels. After it’s trained, the quality estimate tests how well the transform learned to identify matching records against the remaining 30%. Finally, the transform generates quality metrics by comparing the matches and non-matches predicted by the algorithm vs. your actual labels. This process may take up to several minutes.

Your results should be similar to those in the following screenshot.

Consider these metrics approximate, because the test uses only a small subset of data for estimating quality. If you’re satisfied with the metrics, proceed with creating and running a record-matching job. Or, to improve matching quality further, upload more labeled records.

Creating and running an AWS Glue ETL job

After you create a FindMatches transform and verify that it learned to identify matching records in your data, you’re ready to identify matches in your complete dataset. To create and run a record-matching job, complete the following steps:

  1. Create a transformresults folder inside the S3 bucket that the AWS CloudFormation template created when you launched the stack. This folder stores the ML transform results from your AWS Glue job.
  2. On the AWS Glue console, under Jobs, choose Add job.
  3. Under Configure the job properties, for Name, enter a name for the job.
  4. For IAM role, choose your role from the dropdown menu. Choose the IAM role that the AWS CloudFormation stack created, called AWSGlueServiceRoleMLTransform. For more information, see Create an IAM Role for AWS Glue.
  5. For Type, select Spark, with the Glue version set to Spark 2.2, Python 2 (Glue version 0.9).
  6. For the job run option, select A proposed script generated by AWS Glue.
  7. For Choose a data source, choose the transform data source. This post uses the data source cfn_table_patient.
  8. Under Choose a transform type, choose Find matching records.
  9. For Worker type, choose G.2X.
  10. For Number of workers, enter 10. You can add more workers based on the size of the datasets by increasing this number.
  11. To review records identified as duplicate, do not select Remove duplicate records.
  12. Choose Next.
  13. Choose the transform that you created.
  14. Choose Next.
  15. For Choose a data target, choose Create tables in your data target.
  16. For Data store, choose Amazon S3.
  17. For Format, choose CSV.
  18. For Compression type, choose None.
  19. For Target path, choose a path for the job’s output. The target path is the S3 bucket that AWS CloudFormation created, along with the folder named transformresults that you created previously.
  20. Choose Save job and edit script. The script is now generated for your job and ready to use. Alternatively, you can customize the script further to suit your specific ETL needs.
  21. To start identifying matches in this dataset, choose Run job as shown in the following screenshot. For now, leave the job parameters with the default settings, and close this page after starting the job. The following screenshot shows the proposed Python Spark script generated by AWS Glue using the ML transform. If the execution is successful, FindMatches shows the run status as Succeeded. The execution might take a few minutes to finish.

FindMatches saves your output data as multi-part .csv files in the target path that you specified during the job creation. The resulting .csv files mirror the input table’s structure and data, but have a match_id column. Matched records display the same match_id value.

Creating a Data Catalog of your transform results

To view the output, you can either download the multi-part .csv files from the S3 bucket directly and review it via your preferred editor, or you can run SQL-like queries against the data stored on S3 using Athena. To view the data using Athena, you need to crawl the folder with the multi-part .csv files that you created as part of the output of your FindMatches ETL job.

Go to AWS Glue and, using an AWS Glue crawler, create a new table in the existing patient-matching database to hold the records from the output of your FindMatches ETL job, with the source data being the folder of your S3 bucket that contains the multi-part .csv files. For this post, the source data is the folder transformresults in the bucket created by the AWS CloudFormation stack.

To create a crawler, complete the following steps:

  1. On the AWS Glue console, under Data Catalog, choose Crawlers.
  2. Click Add crawler to create a new crawler to crawl the transform results.
  3. Provide the name of the crawler and click Next.
  4. Choose Data stores as the Crawler source type and click Next.
  5. In the Add a data store section, for Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the name of your path. This should be the same S3 bucket created by CloudFormation previously, along with the folder named transformresults that you created. Verify that the folder contains the multi-part csv files.
  8. Choose Next.
  9. In the Choose an IAM role section, choose Create an IAM role.
  10. For IAM role, enter a name for the role (for example, the name of the crawler).
  11. Choose Next.
  12. Select Run on demand for Frequency.
  13. Configure the crawler’s output with Database set to cfn-database-patient.
  14. Set the Prefix added to tables value to be table_results_. This will help identify the table containing the transform results.
  15. Click on Finish.
  16. Select the same crawler and click on Run the crawler. After the crawler execution is successful, you should see a new table created corresponding to the crawler settings in the database you selected during crawler configuration.
  17. From the AWS Glue console, under Databases, choose Tables.
  18. Choose Action.
  19. Choose Edit table details.
  20. For Serde Serialization lib, enter org.apache.hadoop.hive.serde2.OpenCSVSerde.
  21. Under Serde parameters, add key escapeChar, with value as \\.
  22. Add key quoteChar with value as " (double quotes).
  23. Set key field.delim with value as , (comma).
  24. Add key separatorChar with value as , (comma). You can set the Serde parameters as per your requirements based on the type of datasets you have.
  25. Edit the schema of the table by setting the data types of all columns to String. To edit the schema of the table, click on the table and click on the Edit schema button.

You can also choose to retain the data types inferred by the crawler as per your requirements. This post sets all columns to the String data type for the sake of simplicity, except for the match_id column, which is set as bigint. The table definition that these settings amount to is sketched below.
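The following sketch shows roughly what the resulting table definition looks like with the OpenCSVSerde settings above; the column list is abbreviated and the bucket name is a placeholder, so your crawler-generated schema will differ.

CREATE EXTERNAL TABLE table_results_transformresults (
  patient_id string,
  given_name string,
  surname    string,
  match_id   bigint
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar'     = '"',
  'escapeChar'    = '\\'
)
LOCATION 's3://your-bucket/transformresults/';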

Reviewing the output with Amazon Athena

To review the output with Amazon Athena, complete the following steps:

  1. From the Data Catalog, choose Tables.
  2. Choose the table name created by your crawler for the results.
  3. Choose Action.
  4. Choose View data.

    The Athena console opens. If you are running Amazon Athena for the first time, you might have to choose Get Started. Before you run your first query, you also need to set up a query result location in Amazon S3. Choose set up a query result location in Amazon S3 on the Amazon Athena console and set the location of the query results. You can create an additional folder in the same Amazon S3 bucket created previously by the CloudFormation stack. Make sure the S3 path ends with a /.
  5. Choose the appropriate database. For this post, choose cfn-database-patient. You might need to refresh the data source if you do not see the database in the drop-down.
  6. Choose the results table that contains the FindMatches output with the patient records and the match_id column. In this case, it is table_results_transformresults. If you chose a different name for the results table, change the following query to reflect the correct table name.
  7. Run the following query by choosing Run query.
    SELECT * FROM "cfn-database-patient"."table_results_transformresults" order by match_id;

The following screenshot shows your output.

Security considerations

AWS Lake Formation helps protect your data by giving you a central location in which you can configure granular data access policies, regardless of which services you use to access it.

To centralize data access policy controls using Lake Formation, first shut down direct access to your buckets in S3 so Lake Formation manages all data access. Configure data protection and access policies using Lake Formation, which enforces those policies across all the AWS services accessing data in your lake. You can configure users and roles and define the data these roles can access, down to the table and column level.

AWS Lake Formation provides a permissions model that is based on a simple grant/revoke mechanism. Lake Formation permissions combine with IAM permissions to control access to data stored in data lakes and to the metadata that describes that data. For more information, see Security and Access Control to Metadata and Data in Lake Formation.

Lake Formation currently supports server-side encryption on S3 (SSE-S3, AES-256). Lake Formation also supports private endpoints in your VPC and records all activity in AWS CloudTrail, so you have network isolation and auditability.

AWS Lake Formation is a HIPAA-eligible service.

Summary

This post demonstrated how to find matching records in a patient database using the Lake Formation FindMatches ML transform. It allows you to find matches when the records in two datasets don’t share a common identifier or include duplicates. This method helps you find matches between dataset rows when fields don’t match exactly or attributes are missing or corrupted.

You are now ready to start building with Lake Formation and try FindMatches on your data. Please share your feedback and questions in the comments.

 


About the Authors

Dhawalkumar Patel is a Senior Solutions Architect at Amazon Web Services. He has worked with organizations ranging from large enterprises to mid-sized startups on problems related to distributed computing and artificial intelligence. He is currently focused on machine learning and serverless technologies.

Ujjwal Ratan is a principal machine learning specialist solution architect in the Global Healthcare and Lifesciences team at Amazon Web Services. He works on the application of machine learning and deep learning to real world industry problems like medical imaging, unstructured clinical text, genomics, precision medicine, clinical trials and quality of care improvement. He has expertise in scaling machine learning/deep learning algorithms on the AWS cloud for accelerated training and inference. In his free time, he enjoys listening to (and playing) music and taking unplanned road trips with his family.

Highlight the breadth of your data and analytics technical expertise with new AWS Certification beta

Post Syndicated from Beth Shepherd original https://aws.amazon.com/blogs/big-data/highlight-the-breadth-of-your-data-and-analytics-technical-expertise-with-new-aws-certification-beta/

AWS offers the broadest set of analytics tools and engines that analyze data using open formats and open standards. To validate expertise with AWS data analytics solutions, builders can now take the beta for the AWS Certified Data Analytics — Specialty certification.

The AWS Certified Data Analytics — Specialty certification validates technical expertise with designing, building, securing, and maintaining analytics solutions on AWS. This certification first launched in 2017 as AWS Certified Big Data — Specialty. The new name highlights the breadth of the data and analytics technical skills and experience validated by the certification. Candidates who take and pass the beta exam available for registration now or the general availability release in April 2020 will earn a certification with the new name.

The new exam version includes updated content across categories from collection to visualization. View the exam guide to learn more about what’s on the exam.

This is the only AWS Certification to specifically focus on data analytics expertise. This certification demonstrates your ability to design and implement analytics solutions that provide insight through visualizing data with the appropriate security measures and automation in place.

You can take the AWS Certified Data Analytics — Specialty beta exam at testing centers worldwide through January 10, 2020, and onsite at re:Invent 2019. Space is limited, so register today. The beta exam is available in English for 150 USD, which is 50% off the standard pricing for Specialty-level certifications. Results for the beta exam will be available approximately 90 days after the end of the beta period. If you miss the beta, the standard version is expected in April 2020.


About the Authors

Beth Shepherd is the Product Marketing Manager for AWS Certification. She joined Amazon in 2019 and is based in Boston.

Bill Baldwin is the Head of AWS Data Days and the WW Tech Leader for AWS Databases. He has been with Amazon since 2016 and is based in Atlanta.

Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/extract-transform-and-load-data-into-s3-data-lake-using-ctas-and-insert-into-statements-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze the data stored in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. You can reduce your per-query costs and get better performance by compressing, partitioning, and converting your data into columnar formats. To learn more about best practices to boost query performance and reduce costs, see Top 10 Performance Tuning Tips for Amazon Athena.

Overview

This blog post discusses how to use Athena for extract, transform and load (ETL) jobs for data processing. This example optimizes the dataset for analytics by partitioning it and converting it to a columnar data format using Create Table as Select (CTAS) and INSERT INTO statements.

CTAS statements create new tables using standard SELECT queries to filter data as required. You can also partition the data, specify compression, and convert the data into columnar formats like Apache Parquet and Apache ORC using CTAS statements. As part of the execution, the resultant tables and partitions are added to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

INSERT INTO statements insert new rows into a destination table based on a SELECT query that runs on a source table. If the source table’s underlying data is in CSV format and the destination table’s data is in Parquet format, then INSERT INTO can easily transform and load the data into the destination table’s format. CTAS and INSERT INTO statements can be used together to perform an initial batch conversion of data as well as incremental updates to the existing table.

Here is an overview of the ETL steps to be followed in Athena for data conversion:

  1. Create a table on the original dataset.
  2. Use a CTAS statement to create a new table in which the format, compression, partition fields and location of the new table can be specified.
  3. Add more data into the table using an INSERT INTO statement.

This example uses a subset of NOAA Global Historical Climatology Network Daily (GHCN-D), a publicly available dataset on Amazon S3.

This subset of data is available at the following S3 location:

s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/
Total objects: 41727 
Size of CSV dataset: 11.3 GB
Region: us-east-1

Procedure

Follow these steps to use Athena for an ETL job.

Create a table based on original dataset

The original data is in CSV format with no partitions in Amazon S3. The following files are stored at the Amazon S3 location:

2019-10-31 13:06:57  413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000
2019-10-31 13:06:57  412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001
2019-10-31 13:06:57   34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002
2019-10-31 13:06:57  412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100
2019-10-31 13:06:57  412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101

Note that the file sizes are pretty small. Merging them into larger files and reducing the total number of files would lead to faster query execution. CTAS and INSERT INTO can help achieve this.

Execute the queries in the Athena console (preferably in us-east-1 to avoid inter-Region Amazon S3 data transfer charges). First, create a database for this demo:

CREATE DATABASE blogdb

Now, create a table from the data above.

CREATE EXTERNAL TABLE `blogdb`.`original_csv` (
  `id` string, 
  `date` string, 
  `element` string, 
  `datavalue` bigint, 
  `mflag` string, 
  `qflag` string, 
  `sflag` string, 
  `obstime` bigint)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'

Use CTAS to partition data and convert into parquet format with snappy compression

Now, convert the data to Parquet format with Snappy compression and partition the data on a yearly basis. All these actions are performed using the CTAS statement. For the purpose of this blog, the initial table only includes data from 2015 to 2019. You can add new data to this table using the INSERT INTO command.

The table created in Step 1 has a date field with the date formatted as YYYYMMDD (e.g. 20100104). The new table is partitioned on year. Extract the year value from the date field using the Presto function substr(“date”,1,4).

CREATE table new_parquet
WITH (format='PARQUET', 
parquet_compression='SNAPPY', 
partitioned_by=array['year'], 
external_location = 's3://your-bucket/optimized-data/') 
AS
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) >= 2015
        AND cast(substr("date",1,4) AS bigint) <= 2019

Once the query is successful, check the Amazon S3 location specified in the CTAS statement above. You should be able to see partitions and parquet files in each of these partitions, as shown in the following examples:

  1. Partitions:
    $ aws s3 ls s3://your-bucket/optimized-data/
                               PRE year=2015/
                               PRE year=2016/
                               PRE year=2017/
                               PRE year=2018/
                               PRE year=2019/

  2. Parquet files:
    $ aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable | head -5
    
    2019-10-31 14:51:05    7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d
    2019-10-31 14:51:05    7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a
    2019-10-31 14:51:05    9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799
    2019-10-31 14:51:05    7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d
    2019-10-31 14:51:05    6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1

Add more data into table using INSERT INTO statement

Now, add more data and partitions into the new table created above. The original dataset has data from 2010 to 2019. Since you added 2015 to 2019 using CTAS, add the rest of the data now using an INSERT INTO statement:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) < 2015

List the Amazon S3 location of the new table:

 $ aws s3 ls s3://your-bucket/optimized-data/
                           PRE year=2010/
                           PRE year=2011/
                           PRE year=2012/
                           PRE year=2013/
                           PRE year=2014/
                           PRE year=2015/
                           PRE year=2016/
                           PRE year=2017/
                           PRE year=2018/
                           PRE year=2019/ 

You can see that INSERT INTO is able to determine that “year” is a partition column and writes the data to Amazon S3 accordingly. There is also a significant reduction in the total size of the dataset thanks to compression and columnar storage in the Parquet format:

Size of dataset after parquet with snappy compression - 1.2 GB

You can also run INSERT INTO statements if more CSV data is added to original table. Assume you have new data for the year 2020 added to the original Amazon S3 dataset. In that case, you can run the following INSERT INTO statement to add this data and the relevant partition(s) to the new_parquet table:

INSERT INTO new_parquet
SELECT id,
         date,
         element,
         datavalue,
         mflag,
         qflag,
         sflag,
         obstime,
         substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) = 2020

Query the results

Now that you have transformed data, run some queries to see what you gained in terms of performance and cost optimization:

First, find the number of distinct IDs for every value of the year:

    1. Query on the original table:
      SELECT substr("date",1,4) as year, 
             COUNT(DISTINCT id) 
      FROM original_csv 
      GROUP BY 1 ORDER BY 1 DESC

    2. Query on the new table:
      SELECT year, 
        COUNT(DISTINCT id) 
      FROM new_parquet 
      GROUP BY  1 ORDER BY 1 DESC

                       Run time        Data scanned    Cost
      Original table   16.88 seconds   11.35 GB        $0.0567
      New table        3.79 seconds    428.05 MB       $0.002145
      Savings          77.5% faster and 96.2% cheaper

       

Next, calculate the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) for the Earth in 2018:

      1. Query on the original table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM original_csv
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018'
        GROUP BY  1

      2. Query on the new table:
        SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value
        FROM new_parquet 
        WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018'
        GROUP BY  1

                         Run time        Data scanned    Cost
        Original table   18.65 seconds   11.35 GB        $0.0567
        New table        1.92 seconds    68.08 MB        $0.000345
        Savings          90% faster and 99.4% cheaper

         

Conclusion

This post showed you how to perform ETL operations using CTAS and INSERT INTO statements in Athena. You can perform the first set of transformations using a CTAS statement. When new data arrives, use an INSERT INTO statement to transform and load data to the table created by the CTAS statement. Using this approach, you converted data to the Parquet format with Snappy compression, converted a non-partitioned dataset to a partitioned dataset, reduced the overall size of the dataset and lowered the costs of running queries in Athena.

 


About the Author

 Pathik Shah is a big data architect for Amazon EMR at AWS.

Connect Amazon Athena to your Apache Hive Metastore and use user-defined functions

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/connect-amazon-athena-to-your-apache-hive-metastore-and-use-user-defined-functions/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. This post details the two new preview features that you can start using today: connecting to Apache Hive Metastore and using user-defined functions.

Connecting Athena to your Apache Hive Metastore

Several customers use the Hive Metastore as a common metadata catalog for their big data environments. Such customers run Apache Spark, Presto, and Apache Hive on Amazon EC2 and Amazon EMR clusters with a self-hosted Hive Metastore as the common catalog. AWS also offers the AWS Glue Data Catalog, which is a fully managed catalog and drop-in replacement for the Hive Metastore. With this release, you can now use the Hive Metastore in addition to the Data Catalog with Athena. Athena now allows you to connect to multiple Hive Metastores along with the existing Data Catalog.

To connect to a self-hosted Hive Metastore, you need a metastore connector. You can download a reference implementation of this connector, which runs as a Lambda function in your account. The current version of the implementation supports only SELECT queries; DDL support is limited to basic metadata syntax. For more information, see the Considerations and Limitations of this feature. You can also write your own Hive Metastore connector, using the reference implementation as an example. You can deploy your implementation as a Lambda function and subsequently use it with Athena. For more information about the feature, see the Using Athena Data Connector for External Hive Metastore (Preview) documentation.

Using user-defined functions in Athena

Athena also offers preview support for scalar user-defined functions (UDFs). UDFs enable you to write functions and invoke them in SQL queries. A scalar UDF is applied one row at a time and returns a single column value. Athena invokes your scalar UDF with batches of rows to limit the performance impact associated with making a remote call for the UDF itself.

With the latest release as of this writing, you can use the Athena Query Federation SDK to define your functions and invoke them inline in SQL queries. You can now compress and decompress row values, scrub personally identifiable information (PII) from columns, transform dates to a different format, read image metadata, and execute proprietary custom code in your queries. You can also execute UDFs in both the SELECT and FILTER phase of the query and invoke multiple UDFs in the same query.
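For illustration, a scalar UDF backed by a Lambda function can be declared and invoked inline roughly as in the following sketch; the function name, Lambda function name, and table are hypothetical, and the full syntax is in the UDF documentation.

USING FUNCTION mask_ssn(value VARCHAR) RETURNS VARCHAR TYPE LAMBDA_INVOKE
WITH (lambda_name = 'my-udf-lambda')
SELECT mask_ssn(ssn) AS masked_ssn,
       customer_id
FROM my_database.customers
LIMIT 10;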

For more information about UDFs, see our documentation. For common UDF example implementations, see the GitHub repo. For more information about writing functions using the Athena Query Federation SDK, please visit this link.

Testing the preview features

All Athena queries originating from the workgroup AmazonAthenaPreviewFunctionality are considered Preview test queries.

Create a new workgroup AmazonAthenaPreviewFunctionality using Athena APIs or the Athena console. For more information, see Create a Workgroup.

The following considerations are important when using preview features.

Do not edit the workgroup name. You can edit other workgroup properties, such as enabling Amazon CloudWatch metrics and requester pays. You can use the Athena console, JDBC/ODBC drivers, or APIs to submit your test queries. Specify the workgroup AmazonAthenaPreviewFunctionality when you submit test queries.

Preview functionality is available only in the us-east-1 Region. If you use Athena in any other Region and submit queries using AmazonAthenaPreviewFunctionality, your query fails. Cross-Region calls are not supported in preview mode.

During the preview, you do not incur charges for the data scanned from federated data sources. However, you are charged standard Athena rates for data scanned from S3. Additionally, you are charged standard rates for the AWS services that you use with Athena, such as S3, AWS Lambda, AWS Glue, Amazon SageMaker, and AWS SAM. For example, you are charged S3 rates for storage, requests, and inter-Region data transfer. By default, query results are stored in an S3 bucket of your choice and are billed at standard S3 rates. If you use Lambda, you are charged based on the number of requests for your functions and the duration (the time it takes for your code to execute).

We do not recommend onboarding your production workloads to AmazonAthenaPreviewFunctionality.

Query performance may vary between the preview workgroup and the other workgroups in your account. Additionally, new features and bug fixes may not be backwards compatible.

Summary

In summary, we introduced Athena’s two new features that were released today in preview.

Customers who use the Apache Hive Metastore for metadata management, and were previously unable to use Athena, can now connect their Hive Metastore to Athena to run queries. Also, customers can now use Athena’s Query Federation SDK to define and invoke their own functions in their SQL queries in Athena.

Both these features are available in Preview in the AWS us-east-1 region. Begin your Preview now by following these steps in the Athena FAQ. We welcome your feedback at [email protected]

 


About the Author

Janak Agarwal is a product manager for Athena at AWS.

Prepare data for model-training and invoke machine learning models with Amazon Athena

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/prepare-data-for-model-training-and-invoke-machine-learning-models-with-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon Athena has announced a public preview of a new feature that provides an easy way to run inference using machine learning (ML) models deployed on Amazon SageMaker directly from SQL queries. The ability to use ML models in SQL queries makes complex tasks such as anomaly detection, customer cohort analysis, and sales predictions as simple as writing a SQL query.

Overview

Users now have the flexibility to use ML models trained on proprietary datasets or to use out-of-the-box, pre-trained ML models deployed on Amazon SageMaker. You can now easily invoke a variety of ML algorithms across text analysis, statistical tools, and any algorithm deployed on Amazon SageMaker. There is no additional setup required. Users can invoke ML models in SQL queries from the Athena console, Athena APIs, and through use of Athena’s JDBC and ODBC drivers in tools such as Amazon QuickSight. Within seconds, analysts can run inferences to forecast sales, detect suspicious logins to an application, or categorize all users into customer cohorts.

During the preview, you are not charged for the data scanned from federated data sources. However, you are charged standard Athena rates for data scanned from Amazon S3. Additionally, you are charged standard rates for the AWS services that you use with Athena, such as Amazon S3, AWS Lambda, AWS Glue, Amazon SageMaker, and AWS Serverless Application Repository. For example, you are charged S3 rates for storage, requests, and inter-region data transfer. By default, query results are stored in an S3 bucket of your choice and are also billed at standard Amazon S3 rates. If you use AWS Lambda, you are charged based on the number of requests for your functions and the duration, the time it takes for your code to execute.

Athena has also added support for federated queries and user-defined functions (UDFs). This blog demonstrates how to:

  1. Ingest, clean, and transform a dataset using Athena SQL queries to ready it for model training purposes
  2. Use Amazon SageMaker to train the model
  3. Use Athena’s ML inference capabilities to run prediction queries in SQL using the model created in Step 2

For illustration purposes, consider an imaginary video game company whose goal is to predict which games will be successful.

Assume that the video game company has a raw dataset in Amazon S3 with the schema represented by the following SQL statement.

CREATE EXTERNAL TABLE `video_game_data`.`video_games`(
  `gameid` int COMMENT '', 
  `name` string COMMENT '', 
  `platform` string COMMENT '',
  `year_of_release` int COMMENT '',
  `genre` string COMMENT '',
  `publisher` string COMMENT '',
  `sales` int COMMENT '',
  `critic_score` int COMMENT '',
  `critic_count` int COMMENT '',
  `user_score` int COMMENT '',
  `user_count` int COMMENT '',
  `developer` string COMMENT '',
  `rating` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ( 
  'separatorChar'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://video-game-demo/'
TBLPROPERTIES (
  'has_encrypted_data'='false')

A screenshot of the sample dataset in Athena follows.

Diagram 1: Screenshot of the sample dataset in our example.

Data Analysis and Cleaning

To prepare the dataset for training the model using Amazon SageMaker, you need to select the relevant data required to train the ML model. You don’t need columns that are not relevant for training, such as game_id, name, year_of_release, sales, critic_count, user_count, and developer.

Additionally, this example defines success as the scenario in which sales of a particular game exceed 1,000,000. You can create a success column that is either 0 (denoting no success) or 1 (denoting success) in the dataset to reflect success.

A sample query and screenshot showing the success column follow:

SELECT platform,
         genre,
         critic_score,
         user_score,
         rating,
         (sales > 1000000) AS success
FROM "video_game_data"."video_games";

Diagram 2: Screenshot of our sample dataset with the columns that are irrelevant for training our ML model deleted.

ML models typically do not work well with String Enums. To optimize performance and improve model accuracy, convert the platform, genre, publisher, and rating to integer Enum constants. You can use Athena’s UDF functionality to accomplish this.

The following sample code creates an AWS Lambda function that you can invoke in the Athena SQL query as UDF.

// Import paths assume the Athena Query Federation SDK and Guava libraries
import com.amazonaws.athena.connector.lambda.handlers.UserDefinedFunctionHandler;
import com.google.common.collect.ImmutableMap;

public class AthenaUDFHandler
        extends UserDefinedFunctionHandler
{
    private static final String SOURCE_TYPE = "athena_common_udfs";
    private final ImmutableMap<String, Integer> genreMap;

    public AthenaUDFHandler()
    {
        super(SOURCE_TYPE);
        genreMap = ImmutableMap.<String, Integer>builder()
                .put("Action", 1)
                .put("Adventure", 2)
                .put("Fighting", 3)
                .put("Misc", 4)
                .put("Platform", 5)
                .put("Puzzle", 6)
                .put("Racing", 7)
                .put("Role-Playing", 8)
                .put("Shooter", 9)
                .put("Simulation", 10)
                .put("Sports", 11)
                .put("Strategy", 12)
                .build();
    }

    // Map the genre string to an integer constant; unknown genres map to 0
    public Integer normalize_genre(String genre)
    {
        return genreMap.getOrDefault(genre, 0);
    }

    // Use the string hash code as an integer representation of the rating
    public Integer normalize_rating(String rating)
    {
        return rating.hashCode();
    }

    // Use the string hash code as an integer representation of the platform
    public Integer normalize_platform(String platform)
    {
        return platform.hashCode();
    }
}

A sample Athena Query that normalizes the dataset using the functions created above follows:

USING 
FUNCTION normalize_genre(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_platform(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_rating(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization')

SELECT CAST((sales > 1000000) AS integer) AS success,
         normalize_platform(platform) AS platform, 
         normalize_genre(genre) AS genre, 
         critic_score, 
         user_score, 
         normalize_rating(rating) AS rating
FROM video_game_data.video_games;

The following screenshot shows the normalized dataset:

Diagram 3: Screenshot after defining, and invoking our UDFs that help to normalize our dataset.

Creating the machine learning model

Next, create the ML model in Amazon SageMaker.

Open an Amazon SageMaker notebook instance with permissions to execute Athena queries and run the following sample code. The code first imports the required Amazon SageMaker libraries and PyAthena into your Amazon SageMaker notebook, executes an Athena query to retrieve the training dataset, invokes the training algorithm on this dataset, and deploys the resulting model on the selected Amazon SageMaker instance. PyAthena allows you to invoke Athena SQL queries from within your Amazon SageMaker notebook.

Note that you can also train your model using a different algorithm and evaluate your model. For more details on using Amazon SageMaker, please visit the SageMaker documentation.

import sys
!{sys.executable} -m pip install PyAthena
import boto3, os
import sagemaker
import pandas as pd 

from sagemaker import get_execution_role
from pyathena import connect 

# Create training dataset for inference
athena_output_bucket = 'athena-results'
region = 'us-east-1'
connection = connect(s3_staging_dir='s3://{}/'.format(athena_output_bucket),
                     region_name=region,
                     work_group='AmazonAthenaPreviewFunctionality')

# Retrieve the normalized training dataset with a single Athena query that invokes the UDFs
results = pd.read_sql("""USING FUNCTION normalize_genre(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization'),
FUNCTION normalize_platform(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization'),
FUNCTION normalize_rating(value VARCHAR) RETURNS INTEGER TYPE LAMBDA_INVOKE WITH (lambda_name='VideoNormalization')
SELECT CAST((sales > 1000000) AS integer) AS success, normalize_platform(platform) AS platform, normalize_genre(genre) AS genre, critic_score, user_score, normalize_rating(rating) AS rating FROM video_game_data.video_games""", connection)

training_data_output_location = 'video-games-sales-prediction'

# Write the training data as a headerless CSV and upload it to the training prefix used below
results.to_csv('training_data.csv', index=False, header=False)
boto3.Session().resource('s3').Bucket(training_data_output_location).Object('train/train.csv').upload_file('training_data.csv')
s3_input = sagemaker.s3_input(s3_data='s3://{}/train'.format(training_data_output_location), content_type='csv')

# Model training
role = get_execution_role()
container = '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'

sess = sagemaker.Session()
xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/model'.format(training_data_output_location),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        num_round=100)

xgb.fit({'train': s3_input})

# Model Deployment
xgb_predictor = xgb.deploy(initial_instance_count=1,instance_type='ml.m4.xlarge')
xgb_predictor.endpoint_name

Using the model in SQL queries

Having trained the ML model and deployed it on Amazon SageMaker, your next task is to use this model in your Athena SQL queries to predict whether a given video game will be a success.

A sample prediction query follows. You can run this query in your Amazon SageMaker notebook after loading PyAthena, or using the Athena Console. You can also submit this query using Athena’s APIs or using the Athena Preview JDBC driver.

#Prediction

USING FUNCTION predict(platform int, genre int, critic_score int, user_score int, rating int) returns DOUBLE type SAGEMAKER_INVOKE_ENDPOINT
WITH (sagemaker_endpoint='xgboost-2019-11-22-00-52-22-742'),

FUNCTION normalize_genre(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_platform(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization'),

FUNCTION normalize_rating(value VARCHAR) RETURNS int TYPE LAMBDA_INVOKE
WITH (lambda_name='VideoNormalization')

SELECT predict(platform,
         genre,
         critic_score,
         user_score,
         rating),
         name
FROM 
    (SELECT name,
         normalize_platform(platform) AS platform, 
         normalize_genre(genre) AS genre, 
         critic_score, 
         user_score, 
         normalize_rating(rating) AS rating
FROM video_game_data.video_games);

Conclusion

In this blog, we introduced Athena’s new functionality that enables you to invoke your machine learning models in SQL queries to run inference. Using an example, we saw how to use Athena’s UDF functionality. We defined and invoked our functions to ready our dataset for machine learning model training. To train our model, we used PyAthena to invoke Athena SQL queries in Amazon SageMaker and, finally, invoked our ML model in a SQL query to run inference.

Amazon Athena’s ML functionality is available today in preview in the us-east-1 (N. Virginia) Region. Begin your preview now by following the steps in the Athena FAQ.
To learn more about the feature, please visit our querying ML model documentation.

 


About the Authors

Janak Agarwal is a product manager for Athena at AWS.

Ronak Shah is a software development engineer for Athena at AWS.

Query any data source with Amazon Athena’s new federated query

Post Syndicated from Janak Agarwal original https://aws.amazon.com/blogs/big-data/query-any-data-source-with-amazon-athenas-new-federated-query/

Organizations today use data stores that are the best fit for the applications they build. For example, for an organization building a social network, a graph database such as Amazon Neptune is likely the best fit when compared to a relational database. Similarly, for workloads that require flexible schema for fast iterations, Amazon DocumentDB (with MongoDB compatibility) is likely a better fit. As Werner Vogels, CTO and VP of Amazon.com, said: “Seldom can one database fit the needs of multiple distinct use cases.” Developers today build highly distributed applications using a multitude of purpose-built databases. In a sense, developers are doing what they do best: dividing complex applications into smaller pieces, which allows them to choose the right tool for the right job. As the number of data stores and applications increase, running analytics across multiple data sources can become challenging.

Today, we are happy to announce a Public Preview of Amazon Athena support for federated queries.

Federated Query in Amazon Athena

Federated query is a new Amazon Athena feature that enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources. With Athena federated query, customers can submit a single SQL query and analyze data from multiple sources running on-premises or hosted on the cloud. Athena executes federated queries using Data Source Connectors that run on AWS Lambda. AWS has open-sourced Athena Data Source connectors for Amazon DynamoDB, Apache HBase, Amazon DocumentDB, Amazon Redshift, Amazon CloudWatch Logs, Amazon CloudWatch Metrics, and JDBC-compliant relational data sources such as MySQL and PostgreSQL under the Apache 2.0 license. Customers can use these connectors to run federated SQL queries in Athena across these data sources. Additionally, using the Query Federation SDK, customers can build connectors to any proprietary data source and enable Athena to run SQL queries against the data source. Since connectors run on Lambda, customers continue to benefit from Athena’s serverless architecture and do not have to manage infrastructure or scale for peak demands.

Running analytics on data spread across applications can be complex and time consuming. Application developers pick a data store based on the application’s primary function. As a result, data required for analytics is often spread across relational, key-value, document, in-memory, search, graph, time-series, and ledger databases. Event and application logs are often stored in object stores such as Amazon S3. To analyze data across these sources, analysts have to learn new programming languages and data access constructs, and build complex pipelines to extract, transform and load into a data warehouse before they can easily query the data. Data pipelines introduce delays and require custom processes to validate data accuracy and consistency across systems. Moreover, when source applications are modified, data pipelines have to be updated and data has to be re-stated for corrections. Federated queries in Athena eliminate this complexity by allowing customers to query data in-place wherever it resides. Analysts can use familiar SQL constructs to JOIN data across multiple data sources for quick analysis or use scheduled SQL queries to extract and store results in Amazon S3 for subsequent analysis.

The Athena Query Federation SDK extends the benefits of federated querying beyond AWS provided connectors. With fewer than 100 lines of code, customers can build connectors to proprietary data sources and share them across the organization. Connectors are deployed as Lambda functions and registered for use in Athena as data sources. Once registered, Athena invokes the connector to retrieve databases, tables, and columns available from the data source. A single Athena query can span multiple data sources. When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Since connectors are executed in Lambda, they can be used to access data from any data source on the cloud or on-premises that is accessible from Lambda.

Data Source Connectors

You can run SQL queries against new data sources by registering the data source with Athena. When registering the data source, you associate an Athena Data Connector specific to the data source. You can use AWS-provided open-source connectors, build your own, contribute to existing connectors, or use community or marketplace-built connectors. Depending on the type of data source, a connector manages metadata information; identifies specific parts of the tables that need to be scanned, read or filtered; and manages parallelism. Athena Data Source Connectors run as Lambda functions in your account.

Each data connector is composed of two Lambda functions, each specific to a data source: one for metadata and one for record reading. The connector code is open source and is deployed as Lambda functions. You can also deploy the Lambda functions from the AWS Serverless Application Repository and use them with Athena. Once the Lambda functions are deployed, each has a unique Amazon Resource Name (ARN). You must register these ARNs with Athena; registration tells Athena which Lambda function to invoke during query execution. Once both ARNs are registered, you can query the registered data source.

When a query runs on a federated data source, Athena fans out the Lambda invocations reading metadata and data in parallel. The number of parallel invocations depends on the Lambda concurrency limits enforced in your account. For example, if you have a limit of 300 concurrent Lambda invocations, Athena can invoke 300 parallel Lambda functions for record reading. For two queries running in parallel, Athena invokes twice the number of concurrent executions.

Diagram 1 shows how Athena Federated Queries work. When you submit a federated query to Athena, Athena will invoke the right Lambda-based connector to connect with your Data Source. Athena will fan out Lambda invocations to read metadata and data in parallel.

Diagram 1: Athena Federated Query Architecture

Example

This blog post demonstrates how data analysts can query data in multiple databases for faster analysis in one SQL query. For illustration purposes, consider an imaginary e-commerce company whose architecture leverages the following purpose-built data sources:

  1. Payment transaction records stored in Apache HBase running on AWS.
  2. Active orders, defined as customer orders not yet delivered, stored in Redis so that the processing engine can retrieve such orders quickly.
  3. Customer data such as email addresses, shipping information, etc., stored in DocumentDB.
  4. Product Catalog stored in Aurora.
  5. Order Processor’s log events housed in Amazon CloudWatch Logs.
  6. Historical orders and analytics in Redshift.
  7. Shipment tracking data in DynamoDB.
  8. A fleet of drivers performing last-mile delivery while using IoT-enabled tablets.

Customers of this imaginary e-commerce company have a problem. They have complained that their orders are stuck in a weird state. Some orders show as pending even though they have actually been delivered, while other orders show as delivered but have not actually been shipped.

The company management has tasked the customer service analysts to determine the true state of all orders.

Using Athena federated queries

Using Athena’s query federation, the analysts can quickly analyze records from different sources. Additionally, they can set up a pipeline that extracts data from these sources, stores it in Amazon S3, and uses Athena to query it.

Diagram 2 shows Athena invoking Lambda-based connectors to connect with data sources that are on premises and in the cloud in the same query. In this diagram, Athena is scanning data from S3 and executing the Lambda-based connectors to read data from HBase on EMR, DynamoDB, MySQL, Amazon Redshift, ElastiCache (Redis), and Amazon Aurora.

Diagram 2: Federated Query Example.

 

Analysts can register and use the following connectors found in this repository and run a query that:

  1. Grabs all active orders from Redis. (see athena-redis)
  2. Joins against any orders with ‘WARN’ or ‘ERROR’ events in CloudWatch Logs by using regex matching and extraction. (see athena-cloudwatch)
  3. Joins against our EC2 inventory to get the hostname(s) and status of the Order Processor(s) that logged the ‘WARN’ or ‘ERROR’. (see athena-cmdb)
  4. Joins against DocumentDB to obtain customer contact details for the affected orders. (see athena-docdb)
  5. Joins against a scatter-gather query sent to the Driver Fleet via Android Push notification. (see athena-android)
  6. Joins against DynamoDB to get shipping status and tracking details. (see athena-dynamodb)
  7. Joins against HBase to get payment status for the affected orders. (see athena-hbase)
  8. Joins against the advertising conversion data in BigQuery to see which promotions need to be applied if a re-order is needed. (see athena-bigquery)

Data Source Connector Registration

Analysts can register a data source connector using the Connect data source Flow in the Athena Query Editor.

  1. Choose Connect data source or Data sources on the Query Editor.
  2. Select the data source to which you want to connect, as shown in the following screenshot. You can also choose to write your own data source connector using the Query Federation SDK.
  3. Follow the rest of the steps in the UX to complete the registration. They involve configuring the connector function for your data source (as shown in the following screenshot), selecting a name to use as the catalog name in your query, and providing a description. If you prefer to script this step, see the sketch after these steps.
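
Current versions of the AWS SDK also expose an Athena CreateDataCatalog API that can script this registration. The following boto3 sketch assumes that API is available in your SDK version and region; the catalog name and Lambda ARN are placeholders.

import boto3

# Minimal sketch: register a Lambda-based connector as an Athena data catalog.
# The catalog name and Lambda ARN below are placeholders.
athena = boto3.client('athena', region_name='us-east-1')

athena.create_data_catalog(
    Name='dynamodb_catalog',          # the catalog name you reference in SQL
    Type='LAMBDA',
    Description='DynamoDB connector for federated queries',
    Parameters={
        # A single composite connector Lambda; separate metadata-function and
        # record-function keys can be supplied instead if you deployed two functions.
        'function': 'arn:aws:lambda:us-east-1:<account-id>:function:<connector-function>'
    })

# The registered catalog shows up in list_data_catalogs and can then be
# queried as "dynamodb_catalog".<database>.<table>.
print(athena.list_data_catalogs()['DataCatalogsSummary'])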

Sample Analyst Query

Once the registration of the data source connectors is complete, the customer service analyst can write the following sample query to identify the affected orders in one SQL query, thus increasing the organization’s business velocity.

Below you’ll find a video demonstration of a sample federated query:

WITH logs 
     AS (SELECT log_stream, 
                message                                          AS 
                order_processor_log, 
                Regexp_extract(message, '.*orderId=(\d+) .*', 1) AS orderId, 
                Regexp_extract(message, '(.*):.*', 1)            AS log_level 
         FROM 
     "lambda:cloudwatch"."/var/ecommerce-engine/order-processor".all_log_streams 
         WHERE  Regexp_extract(message, '(.*):.*', 1) != 'WARN'), 
     active_orders 
     AS (SELECT * 
         FROM   redis.redis_db.redis_customer_orders), 
     order_processors 
     AS (SELECT instanceid, 
                publicipaddress, 
                state.NAME 
         FROM   awscmdb.ec2.ec2_instances), 
     customer 
     AS (SELECT id, 
                email 
         FROM   docdb.customers.customer_info), 
     addresses 
     AS (SELECT id, 
                is_residential, 
                address.street AS street 
         FROM   docdb.customers.customer_addresses),
     drivers
     AS ( SELECT name as driver_name, 
                 result_field as driver_order, 
                 device_id as truck_id, 
                 last_updated 
         FROM android.android.live_query where query_timeout = 5000 and query_min_results=5),
     impressions 
     AS ( SELECT path as advertisement, 
                 conversion
         FROM bigquery.click_impressions.click_conversions),
     shipments 
     AS ( SELECT order_id, 
                 shipment_id, 
                 from_unixtime(cast(shipped_date as double)) as shipment_time,
                 carrier
        FROM lambda_ddb.default.order_shipments),
     payments
     AS ( SELECT "summary:order_id", 
                 "summary:status", 
                 "summary:cc_id", 
                 "details:network" 
        FROM "hbase".hbase_payments.transactions)

SELECT _key_            AS redis_order_id, 
       customer_id, 
       customer.email   AS cust_email, 
       "summary:cc_id"  AS credit_card,
       "details:network" AS CC_type,
       "summary:status" AS payment_status,
       impressions.advertisement as advertisement,
       status           AS redis_status, 
       addresses.street AS street_address, 
       shipments.shipment_time as shipment_time,
       shipments.carrier as shipment_carrier,
       driver_name     AS driver_name,
       truck_id       AS truck_id,
       last_updated AS driver_updated,
       publicipaddress  AS ec2_order_processor, 
       NAME             AS ec2_state, 
       log_level, 
       order_processor_log 
FROM   active_orders 
       LEFT JOIN logs 
              ON logs.orderid = active_orders._key_ 
       LEFT JOIN order_processors 
              ON logs.log_stream = order_processors.instanceid 
       LEFT JOIN customer 
              ON customer.id = customer_id 
       LEFT JOIN addresses 
              ON addresses.id = address_id 
       LEFT JOIN drivers 
              ON drivers.driver_order = active_orders._key_ 
       LEFT JOIN impressions
              ON impressions.conversion = active_orders._key_
       LEFT JOIN shipments
              ON shipments.order_id = active_orders._key_
       LEFT JOIN payments
              ON payments."summary:order_id" = active_orders._key_

Additionally, Athena writes all query results to an S3 bucket that you specify in your query. If your use case requires you to ingest data into S3, you can use Athena’s query federation capabilities to register your data source, and then use CTAS or INSERT INTO statements to ingest the data into S3, create partitions and metadata in the AWS Glue Data Catalog, and convert the data to a supported format.
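
As a rough sketch of automating that pattern, the following submits a CTAS statement through the Athena API with boto3 and waits for it to finish. The database, table, and bucket names are placeholders, and the federated DynamoDB catalog is assumed to be registered as lambda_ddb, as in the sample query above.

import time
import boto3

# Minimal sketch: run a CTAS that reads from a federated source and writes
# Parquet to S3. All names below are placeholders.
athena = boto3.client('athena', region_name='us-east-1')

ctas = """
CREATE TABLE my_datalake_db.order_shipments_snapshot
WITH (external_location = 's3://<my-datalake-bucket>/order_shipments_snapshot/',
      format = 'PARQUET')
AS SELECT * FROM lambda_ddb.default.order_shipments
"""

query_id = athena.start_query_execution(
    QueryString=ctas,
    QueryExecutionContext={'Database': 'my_datalake_db'},
    ResultConfiguration={'OutputLocation': 's3://<my-athena-results-bucket>/'}
)['QueryExecutionId']

# Poll until the CTAS completes.
while True:
    state = athena.get_query_execution(
        QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(5)
print(query_id, state)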

Conclusion

In this blog, we introduced Athena’s new federated query feature. Using an example, we saw how to register and use Athena data source connectors to write federated queries that connect Athena to any data source accessible by AWS Lambda from your account. Finally, we learned that we can use federated queries not only to enable faster analytics, but also to extract, transform, and load data into your data lake in S3.

Athena federated query is available in Preview in the us-east-1 (N. Virginia) region. Begin your Preview now by following these steps in the Athena FAQ.
To learn more about the feature, please see the Connect to a Data Source documentation.
To get started with an existing connector, please follow the Connect to a Data Source guide.
To learn how to build your own data source connector using the Athena Query Federation SDK, please visit the Athena example on GitHub.

 


About the Author

Janak Agarwal is a product manager for Athena at AWS.

Simplify ETL data pipelines using Amazon Athena’s federated queries and user-defined functions

Post Syndicated from Manny Wald original https://aws.amazon.com/blogs/big-data/simplify-etl-data-pipelines-using-amazon-athenas-federated-queries-and-user-defined-functions/

Amazon Athena recently added support for federated queries and user-defined functions (UDFs), both in Preview. See Query any data source with Amazon Athena’s new federated query for more details. Jornaya helps marketers intelligently connect consumers who are in the market for major life purchases such as homes, mortgages, cars, insurance, and education.

Jornaya collects data from a variety of data sources. Our main challenge is to clean and ingest this data into Amazon S3 to enable access for data analysts and data scientists.

Legacy ETL and analytics solution

In 2012, Jornaya moved from MySQL to Amazon DynamoDB for our real-time database. DynamoDB allowed a company of our size to receive the benefits of create, read, update, and delete (CRUD) operations with predictable low latency, high availability, and excellent data durability without the administrative burden of having to manage the database. This allowed our technology team to focus on solving business problems and rapidly building new products that we could bring to market.

Running analytical queries on NoSQL databases can be tough. We decided to extract data from DynamoDB and run queries on it. This was not simple.

Here are a few methods we use at Jornaya to get data from DynamoDB:

  • Leveraging EMR: We temporarily provision additional read capacity with DynamoDB tables and create transient EMR clusters to read data from DynamoDB and write to Amazon S3.
    • Our Jenkins jobs trigger pipelines that spin up a cluster, extract data using EMR, and use the Amazon Redshift COPY command to load data into Amazon Redshift. This is an expensive process because we use excess read capacity. To lower EMR costs, we use Spot Instances.
  • Enabling DynamoDB Streams: We use a homegrown Python AWS Lambda function named Dynahose to consume data from the stream and write it to an Amazon Kinesis Data Firehose delivery stream. We then configure the Kinesis Data Firehose delivery stream to write the data to an Amazon S3 location. Finally, we use another homegrown Python Lambda function named Partition to ensure that the partitions corresponding to the locations of the data written to Amazon S3 are added to the AWS Glue Data Catalog so that it can be read using tools like AWS Glue, Amazon Redshift Spectrum, EMR, etc. An illustrative sketch of such a stream-to-Firehose function follows this list.
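
The following sketch is illustrative only and is not Jornaya’s actual code; the delivery stream name and record layout are assumptions.

import json
import boto3

firehose = boto3.client('firehose')
DELIVERY_STREAM = '<your-delivery-stream-name>'  # placeholder

def handler(event, context):
    # Forward each new or updated item from the DynamoDB stream to Firehose.
    records = []
    for record in event.get('Records', []):
        new_image = record.get('dynamodb', {}).get('NewImage')
        if new_image:
            records.append({'Data': (json.dumps(new_image) + '\n').encode('utf-8')})
    # PutRecordBatch accepts up to 500 records; chunk larger batches in practice.
    if records:
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records)
    return {'forwarded': len(records)}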

The process is shown in the following diagram.

We go through such pipelines because we want to ask questions about our operational data in a natural way, using SQL.

Using Amazon Athena to simplify ETL workflows and enable quicker analytics

Athena, a fully managed serverless interactive service for querying data in Amazon S3 using SQL, has been rapidly adopted by multiple departments across our organization. For our use case, we did not require an always-on EMR cluster waiting for an analytics query. Athena’s serverless nature is perfect for our use case. Along the way we discovered that we could use Athena to run extract, transform, and load (ETL) jobs.

However, Athena is a lot more than an interactive service for querying data in Amazon S3. We also found Athena to be a robust, powerful, reliable, scalable, and cost-effective ETL tool. The ability to schedule SQL statements, along with support for Create Table As Select (CTAS) and INSERT INTO statements, helped us accelerate our ETL workloads.

Before Athena, business users in our organization had to rely on engineering resources to build pipelines. The release of Athena changed that in a big way. Athena enabled software engineers and data scientists to work with data that would have otherwise been inaccessible or required help from data engineers.

With the addition of query federation and UDFs to Athena, Jornaya has been able to replace many of our unstable data pipelines with Athena to extract and transform data from DynamoDB and write it to Amazon S3. The product and engineering teams at Jornaya noticed our reduced ETL failure rates. The finance department took note of lower EMR and DynamoDB costs. And the members of our on-call rotation (as well as their spouses) have been able to enjoy uninterrupted sleep.

For instance, the build history of one ETL pipeline using EMR looked like this (the history of ETL pipeline executions is shown in this chart with the job execution id on the x-axis and the execution time in minutes on the y-axis):

After migrating this pipeline to Athena and using federated queries to query DynamoDB, we were able to access data sources with ease that we simply could not previously with queries like the following:

CREATE TABLE "__TABLE_NAME__"
WITH (
  external_location = '__S3_LOCATION__'
, format = 'PARQUET'
, parquet_compression = 'SNAPPY'
, partitioned_by = ARRAY['create_day']
) AS
SELECT DISTINCT
  d.key.s AS device_id
, CAST(d.created.n AS DECIMAL(14, 4)) AS created
, d.token.s AS token
, c.industry AS industry_code
, CAST(CAST(FROM_UNIXTIME(CAST(d.created.n AS DECIMAL(14, 4))) AS DATE) AS VARCHAR) AS create_day
FROM "rdl"."device_frequency_stream" d
  LEFT OUTER JOIN "lambda::dynamodb"."default"."campaigns" c ON c.key = d.campaign_key
WHERE d.insert_ts BETWEEN TIMESTAMP '__PARTITION_START_DATE__' AND TIMESTAMP '__PARTITION_END_DATE__'
  AND d.created.n >= CAST(CAST(TO_UNIXTIME(DATE '__START_DATE__') AS DECIMAL(14, 4)) AS VARCHAR)
  AND d.created.n < CAST(CAST(TO_UNIXTIME(DATE '__END_DATE__') AS DECIMAL(14, 4)) AS VARCHAR);

We achieved a much more performant process with a build history shown in the following diagram:

Conclusion

Using one SQL query, we were able to process data from DynamoDB, convert that data to Parquet, apply Snappy compression, create the correct partitions in our AWS Glue Data Catalog, and ingest data to Amazon S3. Our ETL process execution time was reduced from hours to minutes, the cost was significantly lowered, and the new process is simpler and much more reliable. The new process using Athena for ETL is also future-proof due to extensibility. In case we need to import a dataset from another purpose-built data store that does not have a ready data source connector, we can simply use the data source connector SDK to write our own connector and deploy it in Production—a one-time effort that will cost us barely one day.

Additionally, Athena federated queries have empowered Jornaya to run queries that connect data from not just different data sources, but from different data paradigms! We can run a single query that seamlessly links data from a NoSQL datastore, an RDS RDBMS, and an Amazon S3 data lake.

 


About the Authors

Manny Wald is the technical co-founder at Jornaya. He holds multiple patents and is passionate about the power of the cloud, big data and AI to accelerate the rate at which companies can bring products to market to solve real-world problems. He has a background in BI, application development, data warehousing, web services, and building tools to manage transactional and analytical information. Additionally, Manny created the internet’s first weekly hip hop turntablism mix show, is admitted to practice law at the state and federal levels, and plays basketball whenever he gets the chance.

Janak Agarwal is a product manager for Athena at AWS.

Highlight Critical Insights with Conditional Formatting in Amazon QuickSight

Post Syndicated from Susan Fang original https://aws.amazon.com/blogs/big-data/highlight-critical-insights-with-conditional-formatting-in-amazon-quicksight/

Amazon QuickSight now makes it easier for you to spot the highlights or low-lights in data through conditional formatting. With conditional formatting, you can specify customized text or background colors based on field values in the dataset, using solid or gradient colors. You can also display data values with the supported icons. Using color coding and icons, you can visually explore and analyze data, quickly detect issues, and identify trends, as seen in the following screenshot. These dynamic visual cues also help your dashboard viewers cut through the densely populated data values and get to the attention-worthy insights much faster.

With this release, you can apply conditional formatting to tables, pivot tables and KPI charts. This blog post walks you through how to leverage conditional formatting in each supported chart type in Amazon QuickSight.

Tables

You can apply conditional formatting to tables through the following methods.

Conditionally formatting columns based on data fields

When applying conditional formatting to tables, there are two ways to access the configuration pane. One option is to select Table in an analysis, and choose the caret in the upper right corner of the Table visual. Select Conditional formatting from the dropdown menu, as shown in the following screenshots, and the Conditional formatting configuration pane pops up to the left of the analysis canvas. Choose the + sign to select the target column to which you’d like to apply the formatting; it can be any column present in the table.

For example, select Sales from the dropdown menu to add a condition for the Sales target column.

Alternatively, from the selected table, you can expand the field well on the top of the analysis canvas and choose the Group by or Value target column directly from the columns used in the table. For example, select Sales from the Value well and select Conditional formatting from the dropdown, as seen in the following screenshots. The Conditional formatting configuration pane pops up to the left of the analysis canvas, with a condition already added for the Sales target column.

Next, choose the formatting style from the three available options.

  • Add background color
  • Add text color
  • Add icon

For each target column, you can add any formatting style or any combination of the styles.

Background and text color

Choose Add background color to expand the configuration pane for New background color. For the Sales target column, there are two Fill type options, Solid or Gradient, with Solid as the default.

You are prompted to decide the base column of the conditions, which can be any field in the dataset, regardless of whether it is present in the Table or not.

Next, define the Aggregation of the base column used in the conditions, which can be any aggregation currently supported in QuickSight, such as sum, average, min, max, or standard deviation. Then you can create the conditions that drive the color coding of the cell background.

This example highlights cells based on the following rules:

  • Cells with sum of sales greater than $100, green
  • Cells with sum of sales between $10 and $100 (inclusive), yellow
  • Cells with sum of sales less than $10, red

Since the conditions are discrete, apply solid color based on Sales in the Sum aggregation.

In Conditional #1:

  1. Choose the Greater than comparison method.
  2. Enter 100 as the threshold Value.
  3. Use the color picker to pick green.
  4. Choose Add condition and configure the other two conditions accordingly.
  5. Choose Apply, and the Sales column is conditionally formatted, showing green, yellow and red colors when conditions are met.

If you’d like to reorder the conditions, choose a condition and move it up or down. You can also delete a condition. To reset all of the conditions, choose Clear to clear the applied formatting from the visual and remove the conditions from the configuration pane.

The following screenshots demonstrate the configuration step by step:

If you are happy with the highlighting rules, choose Close to return to the summary page.

You see a brief summary of the conditions applied to background color of the Sales column. You can Edit or Delete the applied conditions, Add other formatting styles such as text color or icon to the same Sales column, Delete the target column altogether, or Add another target column up top.

Icons

This example adds icons to the same Sales column based on data in the Profits column so that a thumbs-up icon shows when profits are greater than 0.

  1. Start with Add icon. In the New icon configuration page, choose the base column and Aggregation.
  2. Under Icon set, QuickSight offers a list of canned color icon sets for quick application.
  3. For icons beyond this list, collapse the dropdown and check the Custom conditions box. Choose the pre-populated + icon for more icon selections and the option to Use custom Unicode icon. This example uses the thumbs-up and sets the color to orange.
  4. Choose Apply to see icons show up in the Sales column accordingly.

The following screenshots show the configuration step by step:

Please note that you have the option to Show icon only. Once checked, the text values of the target column are hidden, showing icons only.

Conditionally formatting entire rows based on calculated fields

Next, here is a slightly more complex example that involves calculated fields. This example highlights rows in which Ship charges exceeds 20% of the total sales amount. This condition needs to be based on a flag that differentiates orders with high shipping charges from the rest.

Create a calculated field called high_shipping_flag using this function:

ifelse(sum({Ship charges}) > 0.2 * sum(Sales),1,0)

As a result, orders with shipping charges over 20% of sales amount take a value of 1, otherwise, they take a value of 0.

With the base field ready, proceed with conditional formatting configuration. Since the target property for formatting is the entire row instead of a specific column, access the configuration pane from the visual dropdown menu instead of a particular column from the field well.

Scroll down to the bottom of the dropdown list and select the option to conditionally format the [Entire row] in addition to individual columns.

Note the options to Add background color or Add text color, without the Add icon option, because it is not applicable to entire rows. Also note the Apply on top checkbox, which allows the conditional formatting for the entire row to paint over conditional formatting applied to individual columns. For this example, color the entire row teal if the shipping charge flag is 1.

The following screenshots demonstrate the configuration step by step:

Pivot tables

Conditional formatting is enabled for pivot tables as well, and the configuration workflow is very similar to that of tables, with the following differences:

  • Conditional formatting can be applied to measures only, or fields in the Values well, but not the other data types.
  • Conditional formatting in solid colors can be applied to various levels of your choice: values, subtotals, and totals.
  • Conditional formatting can only be applied to individual columns, not entire rows.

Applying gradient colors

Next, explore the conditional formatting functionalities unique to pivot tables. As mentioned above, only the two measures, or numeric fields, in the Values well can be the target fields for conditional formatting.

The following screenshot shows a conditionally formatted pivot table:

As with tables, you can access Conditional formatting configuration pane from the dropdown of either the visual or the field itself in the Values well. Note that only the two measures show up in the dropdown list.

Applying gradient colors to Percent Tested is fairly straightforward. When you select Gradient, you are prompted to define a base field, which can be any field from the dataset as well as Aggregation. Amazon QuickSight offers three pre-set gradient colors for easy application: blue, red-green, and red-yellow-green.

Choose the diverging color option. You have the option to further customize the three colors as well as the corresponding min, mid, and max values by overriding the default, as shown in the following screenshots.

Applying solid colors to column values, subtotals and totals

For Average Score, apply solid color rules to the text.

Add Average Score as the target column for conditional formatting and select Add text color. As mentioned above, only solid colors can be applied to subtotals and totals.

Note the extra text Values next to Condition, right above the comparison method dropdown box; this differs from tables. By default, Amazon QuickSight applies the solid color conditions to the most fine-grained values of the target field, with the dimension fields at all levels expanded.

This example examines how the average scores at the Borough level compare with each other, which requires applying conditions to both Values and Subtotals. To do so, click the icon to expose the options. This is a multi-select dropdown list, in which you can check one option or multiple options. After checking the desired boxes, Values and Subtotals show up next to Condition. You can Add condition after you have completed the first one.

Note that the next condition added inherits the level of granularity from the previous condition. In this example, Condition #2 automatically applies to Values and Subtotals, just like the previous one. You can override this from the check boxes of the multi-select dropdown.

The following screenshot shows the configuration and the effect.

For illustration purposes, add icons as well and enable Show subtotals for rows from the visual dropdown menu. With some rows collapsed at the Borough level, the solid color rules are applied to the average score for the collapsed boroughs, along with the school-level average scores on the expanded rows. The solid color rules also show up in the subtotals enabled for rows.

The following screenshots show conditional formatting applied to subtotals:

KPI Chart

This release also makes conditional formatting available for KPI charts so that you can easily tell whether the KPI is over or under the business-critical thresholds, using text color and icons as quick indicators. In cases in which progress bars are shown, you also have the option to conditionally format the foreground color of the progress bar. The configuration workflow is similar to that of a table or pivot table. The difference is that only the primary value field can be conditionally formatted, not the Target nor the Trend group. This example uses a simple KPI chart showing Primary value only and adds an icon if the Average SAT Score is above 1200.

The following screenshots show the configuration and the effect:

Summary

This post illustrated how you can leverage conditional formatting to create richer data visualizations and thus more appealing dashboards in Amazon QuickSight. As of this writing, conditional formatting is enabled for tables, pivot tables, and KPI charts. You can expect to see enhanced capability to dynamically customize data colors based on numeric fields in other chart types as well. Stay tuned.

If you have any questions or feedback, please leave a comment.

 


About the Authors

Susan Fang is a senior product manager for QuickSight with AWS.

Evolve your analytics with Amazon QuickSight’s new APIs and theming capabilities

Post Syndicated from Jose Kunnackal original https://aws.amazon.com/blogs/big-data/evolve-your-analytics-with-amazon-quicksights-new-apis-and-theming-capabilities/

The Amazon QuickSight team is excited to announce the availability of Themes and more APIs! Themes for dashboards let you align the look and feel of Amazon QuickSight dashboards with your application’s branding or corporate themes. The new APIs added allow you to manage your Amazon QuickSight deployments programmatically, with support for dashboards, datasets, data sources, SPICE ingestion, and fine-grained access control over AWS resources. Together, they allow you to creatively tailor Amazon QuickSight to your audiences, whether you are using Amazon QuickSight to provide your users with an embedded analytics experience or for your corporate Business Intelligence (BI) needs. This post provides an overview of these new capabilities and details on getting started.

Theming your dashboards

Amazon QuickSight themes allow you to control the look and feel of your dashboards. Specifically, you will be able to affect the following items through themes, and then share with other authors in the account.

  • Margins
  • Borders around visuals
  • Gutters (spacing between visuals)
  • Data colors (used within the visuals)
  • Background colors (for visuals within the dashboard and the dashboard itself)
  • Foreground colors (for text in the visuals and on the dashboard)
  • Highlight colors on visuals

The following screenshot of the dashboard highlights aspects of the dashboards you can modify via the theming options currently available:

With the available options, you can also make your QuickSight dashboards denser (e.g. no margins or gutters), as shown in the view below that uses QuickSight’s new “Seaside” theme.

You also have a dark dashboard option with QuickSight’s new “Midnight” theme as shown below.

Themes are accessible via the left hand panel in the dashboard authoring interface.

You can create your own theme by starting with one of the built-in themes and customizing it. The interactive theme editing experience makes this easy – you can edit as little or as much as you want, and get to the perfect theme for your dashboards.

Using APIs for dashboards, datasets, data sources, SPICE ingestion, and permissions

The new APIs cover dashboards, datasets, data sources, SPICE ingestion, and fine-grained access control over S3/Athena. We’ve introduced templates in QuickSight with this release: templates allow you to store the visual configuration and data schema associated with a dashboard and then easily replicate the dashboard for different sets of users or different data, within or across accounts.

The new template APIs allow you to create templates from QuickSight analyses. You can then use the dashboard APIs to create a dashboard from the template in the same or different account, version control dashboards, and connect them to different datasets as needed. The dataset APIs allow you to create custom datasets, with filters applied at the dataset level (for example, data for a specific customer only). Dataset APIs can be used to create both SPICE and direct query datasets. SPICE APIs let you refresh datasets programmatically and view SPICE ingestion status and history. Data source APIs programmatically define data source connections. Finally, fine-grained access permissions APIs let you secure access to data in Amazon S3, Amazon Athena, and other AWS resources.
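
For example, the SPICE ingestion APIs can be called from the AWS SDK for Python to trigger a refresh and check its status. The account ID and dataset ID below are placeholders.

import uuid
import boto3

# Minimal sketch: start a SPICE refresh for a dataset and read its status.
quicksight = boto3.client('quicksight', region_name='us-east-1')
account_id = '<your-aws-account-id>'
data_set_id = '<your-dataset-id>'
ingestion_id = str(uuid.uuid4())

quicksight.create_ingestion(
    AwsAccountId=account_id,
    DataSetId=data_set_id,
    IngestionId=ingestion_id)

status = quicksight.describe_ingestion(
    AwsAccountId=account_id,
    DataSetId=data_set_id,
    IngestionId=ingestion_id)['Ingestion']['IngestionStatus']
print(ingestion_id, status)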

As a developer using Amazon QuickSight’s embedded capabilities, or author using Amazon QuickSight to publish dashboards on different subject areas, the new APIs allow you to automate your workflows and programmatically perform repetitive activities such as rolling out identical versions of dashboards to different teams or customers. QuickSight assets can now be migrated from dev to QA to production via API, overcoming the need to manually access each environment. With versioning, you also have your previous dashboards backed up, providing you flexibility to roll back dashboard deployments if the need arises.

As an Amazon QuickSight administrator, you can use the new APIs to audit and manage access to dashboards, data sources, and datasets across your account. You can also pre-populate dashboards and datasets into individual author accounts for easier and faster onboarding. You no longer need to distribute individual credentials to data sources: you can provision centrally-managed data sources shared with all your users. You can also manage individual author access to S3, Athena, or other AWS resources on-the-fly using IAM policies through the fine-grained access control APIs. The APIs open up a lot of possibilities with Amazon QuickSight – we’ll cover a few scenarios in this blog post and then follow up with additional posts that cover more.
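
To illustrate the audit scenario, the following boto3 sketch lists the dashboards in an account and prints which principals have access to each. The account ID is a placeholder, and larger accounts would need to page through the results.

import boto3

# Minimal sketch: audit dashboard permissions across an account.
quicksight = boto3.client('quicksight', region_name='us-east-1')
account_id = '<your-aws-account-id>'

dashboards = quicksight.list_dashboards(AwsAccountId=account_id)['DashboardSummaryList']
for dashboard in dashboards:
    permissions = quicksight.describe_dashboard_permissions(
        AwsAccountId=account_id,
        DashboardId=dashboard['DashboardId'])['Permissions']
    for permission in permissions:
        print(dashboard['Name'], '->', permission['Principal'], permission['Actions'])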

The Amazon QuickSight APIs can be invoked via the AWS Software Development Kit (SDK) or the AWS Command Line Interface (CLI). For our discussion, we will use the AWS CLI. To capture all the details needed for each API, this post uses the generate-cli-skeleton option available in the AWS CLI and walks you through the example of creating a Redshift data source, creating data sets, and dashboards associated with it. This option, when invoked, provides a skeleton file with all the required API details, which you can edit to the correct inputs and use for your API calls. The code below invokes the generate-cli-skeleton option and writes the details to the create-data-source-cli-input.json file; we would then modify this file and use it as the input to the create-data-source API when invoking it.

aws quicksight create-data-source --generate-cli-skeleton > create-data-source-cli-input.json

Let’s say that you want to help Alice, a new hire in your company, create an Amazon QuickSight dashboard from some customer revenue data in Redshift. To set things up for Alice, you want to use the new APIs to connect to Redshift and create a QuickSight data set within her QuickSight user account.

Creating a data source

To connect to your Redshift database, you’ll need to create a data source in QuickSight. First, you describe it with the following configuration, which covers the data source connection details, credentials, and permissions around who should be able to see the data sources.

cat create-data-source-cli-input.json

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DataSourceId": "SampleRedshiftDatasouce",
    "Name": "Sample Redshift Datasouce",
    "Type": "REDSHIFT",
    "DataSourceParameters": {
        "RedshiftParameters": {
            "Host": "redshift-cluster-1.xxxxxxxxxx.us-east-1.redshift.amazonaws.com",
            "Port": 5439,
            "Database": "dev",
            "ClusterId": "redshift-cluster-1"
        }
    },
    "Credentials": {
        "CredentialPair": {
            "Username": "xxxxxxxxx",
            "Password": "xxxxxxxxxxx"
        }
    },
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/alice",
            "Actions": [
                "quicksight:UpdateDataSourcePermissions",
                "quicksight:DescribeDataSource",
                "quicksight:DescribeDataSourcePermissions",
                "quicksight:PassDataSource",
                "quicksight:UpdateDataSource",
                "quicksight:DeleteDataSource"
                
            ]
        }
    ],
    "VpcConnectionProperties": {
        "VpcConnectionArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:vpcConnection/VPC to Redshift"
    },
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-RedShiftDemoDataSource"
        }
    ]
}

You can then invoke create-data-source API with this input to start the creation of your data source as indicated below.

aws quicksight create-data-source --cli-input-json file://./create-data-source-cli-input.json

{
    "Status": 202,
    "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
    "DataSourceId": "SampleRedshiftDatasouce",
    "CreationStatus": "CREATION_IN_PROGRESS",
    "RequestId": "ac9fb8fe-71d8-4005-a7c9-d66d814e224e"
}

You can validate that the data source was successfully created by calling the describe-data-source API as shown below and checking the response’s Status:

aws quicksight describe-data-source --aws-account-id $AAI --data-source-id SampleRedshiftDatasouce

{
    "Status": 200,
    "DataSource": {
        "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
        "DataSourceId": "SampleRedshiftDatasouce",
        "Name": "Sample Redshift Datasouce",
        "Type": "REDSHIFT",
        "Status": "CREATION_SUCCESSFUL",
        "CreatedTime": 1574053515.41,
        "LastUpdatedTime": 1574053516.368,
        "DataSourceParameters": {
            "RedshiftParameters": {
                "Host": "redshift-cluster-1.c6qmmwnqxxxx.us-east-1.redshift.amazonaws.com",
                "Port": 5439,
                "Database": "dev",
                "ClusterId": "redshift-cluster-1"
            }
        },
        "VpcConnectionProperties": {
            "VpcConnectionArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:vpcConnection/VPC to Redshift"
        },
        "SslProperties": {
            "DisableSsl": false
        }
    },
    "RequestId": "57d2d6d6-ebbf-4e8c-82ab-c38935cae8aa"
}

Because you granted permissions to Alice as part of the call to create-data-source, this data source will now be visible to her when she logs in to QuickSight. If you didn’t grant user permissions, this data source is only visible to programmatic users with appropriate permissions.

Creating a dataset

Before Alice can build her dashboard, you’ll need to create a dataset, which identifies the specific data in a data source you want to use. To create a dataset from this data source, you can call the create-data-set API. The create-data-set API allows you to join two tables to create a desired view of data for Alice. Below, PhysicalTableMap points to the tables in your Redshift data source you want to join, and LogicalTableMap defines the join between them:

A sample dataset definition with joins included is shown below.

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DataSetId": "DemoDataSet1",
    "Name": "Demo Data Set 1",
    "PhysicalTableMap": {
        "DemoDataSet1-LineOrder": {
            "RelationalTable": {
                "DataSourceArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
                "Schema": "public",
                "Name": "lineorder",
                "InputColumns": [
                        {
                            "Name": "lo_custkey",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_revenue",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_quantity",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_orderkey",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_shipmode",
                            "Type": "STRING"
                        },
                        {
                            "Name": "lo_orderdate",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_ordertotalprice",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_tax",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "lo_discount",
                            "Type": "INTEGER"
                        }
                ]
	    }
            },
        "DemoDataSet1-Customer": {
            "RelationalTable": {
                "DataSourceArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
                "Schema": "public",
                "Name": "customer",
                "InputColumns": [
                        {
                            "Name": "c_custkey",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "c_phone",
                            "Type": "STRING"
                        },
                        {
                            "Name": "c_address",
                            "Type": "STRING"
                        },
                        {
                            "Name": "c_name",
                            "Type": "STRING"
                        },
                        {
                            "Name": "c_city",
                            "Type": "STRING"
                        }
                ]
            }
        }
    },
    "LogicalTableMap": {
        "DemoDataSet1-LineOrder-Logical": {
            "Alias": "LineOrder",
            "Source": {
                "PhysicalTableId": "DemoDataSet1-LineOrder"
            }
        },
        "DemoDataSet1-Customer-Logical": { 
            "Alias": "Customer",
            "Source": {
                "PhysicalTableId": "DemoDataSet1-Customer"
            }
        },
        "DemoDataSet1-Intermediate": { 
            "Alias": "Intermediate",
            "DataTransforms": [
                {   
                    "ProjectOperation": {
                        "ProjectedColumns": [
                                "lo_revenue",
                                "c_name",
                                "c_city"
                        ]
                    }
                }
            ],
            "Source": {
                "JoinInstruction": {
                    "LeftOperand": "DemoDataSet1-LineOrder-Logical",
                    "RightOperand": "DemoDataSet1-Customer-Logical",
                    "Type": "INNER",
                    "OnClause": "{lo_custkey} = {c_custkey}"
                }
            }
        }

    },
    "ImportMode": "DIRECT_QUERY",
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/alice",
            "Actions": [
                "quicksight:UpdateDataSetPermissions",
                "quicksight:DescribeDataSet",
                "quicksight:DescribeDataSetPermissions",
                "quicksight:PassDataSet",
                "quicksight:DescribeIngestion",
                "quicksight:ListIngestions",
                "quicksight:UpdateDataSet",
                "quicksight:DeleteDataSet",
                "quicksight:CreateIngestion",
                "quicksight:CancelIngestion"
            ]
        }
    ],
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-DemoDataSet1"
        }
    ]
}

If you wanted to create a similar dataset with a SQL statement instead of specifying all the columns, call the create-data-set API specifying your SQL statement in PhysicalTableMap as shown below:

aws quicksight create-data-set --generate-cli-skeleton >create-data-set-cli-input-sql.json

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DataSetId": "DemoDataSet2",
    "Name": "Demo Data Set 2",
    "PhysicalTableMap": {
        "DemoDataSet2-CustomerSales": {
            "CustomSql": {
                "DataSourceArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
                "Name": "Customer Sales",
                "SqlQuery":"SELECT lineorder.lo_revenue, customer.c_name, customer.c_city FROM public.lineorder INNER JOIN public.customer ON lineorder.lo_custkey = customer.c_custkey",
                    "Columns": [
                        {
                            "Name": "lo_revenue",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "c_name",
                            "Type": "STRING"
                        },
                        {
                            "Name": "c_city",
                            "Type": "STRING"
                        }
                    ]
 
	    }
        }
    },
    "LogicalTableMap": {
        "DemoDataSet2-CustomerSales-Logical": {
            "Alias": "Customer Sales",
            "Source": {
                "PhysicalTableId": "DemoDataSet2-CustomerSales"
            }
        }

    },
    "ImportMode": "DIRECT_QUERY",
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/alice",
            "Actions": [
                "quicksight:UpdateDataSetPermissions",
                "quicksight:DescribeDataSet",
                "quicksight:DescribeDataSetPermissions",
                "quicksight:PassDataSet",
                "quicksight:DescribeIngestion",
                "quicksight:ListIngestions",
                "quicksight:UpdateDataSet",
                "quicksight:DeleteDataSet",
                "quicksight:CreateIngestion",
                "quicksight:CancelIngestion"
            ]
        }
    ],
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-DemoDataSet2"
        }
    ]
}

Because you granted permissions to Alice, these data sets will be visible to her under Your Data Sets when she logs into QuickSight.

This dataset can be used like any other dataset in Amazon QuickSight. Alice can log in to the QuickSight UI and create an analysis that utilizes this dataset, as shown below:

Creating a template

Once Alice creates her dashboard and it gets popular with her team, the next ask might be to provide the same view to 100 other teams within the company, using a different dataset for each team. Without APIs, this would be a challenge. However, with the APIs, you can now define a template from Alice’s analysis and then create dashboards as needed. A template is created by pointing to a source analysis in QuickSight, which creates references for all datasets used, so that new datasets can be referenced when using the template.

The following configuration describes a template created from Alice’s analysis:

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "TemplateId": "DemoDashboardTemplate",
    "Name": "Demo Dashboard Template",
    "SourceEntity": {
        "SourceAnalysis": {
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:analysis/7975f7aa-261c-4e7c-b430-305d71e07a8e",
            "DataSetReferences": [
                {
                    "DataSetPlaceholder": "DS1",
                    "DataSetArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet1"
                }
            ]
        }
    },
    "VersionDescription": "1"
}

Templates are not visible within the QuickSight UI and are a developer/admin managed asset that is only accessible via the APIs at this point. You can save the template configuration by calling the create-template API to start creation of your template:

aws quicksight create-template --cli-input-json file://./create-template-cli-input.json

{
    "Status": 202,
    "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate",
    "VersionArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate/version/1",
    "TemplateId": "DemoDashboardTemplate",
    "CreationStatus": "CREATION_IN_PROGRESS",
    "RequestId": "aa189aee-8ab5-4358-9bba-8f4e32450aef"
}

To confirm the template was created, call the describe-template API as shown below:

aws quicksight describe-template --aws-account-id $AAI --template-id DemoDashboardTemplate

{
    "Status": 200,
    "Template": {
        "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate",
        "Name": "Demo Dashboard Template",
        "Version": {
            "CreatedTime": 1574137165.136,
            "VersionNumber": 1,
            "Status": "CREATION_SUCCESSFUL",
            "DataSetConfigurations": [
                {
                    "Placeholder": "DS1",
                    "DataSetSchema": {
                        "ColumnSchemaList": [
                            {
                                "Name": "c_city",
                                "DataType": "STRING"
                            },
                            {
                                "Name": "lo_revenue",
                                "DataType": "INTEGER"
                            }
                        ]
                    },
                    "ColumnGroupSchemaList": []
                }
            ],
            "Description": "1",
            "SourceEntityArn": "arn:aws:quicksight:us-east-1: xxxxxxxxxxxx:analysis/7975f7aa-261c-4e7c-b430-305d71e07a8e"
        },
        "TemplateId": "DemoDashboardTemplate",
        "LastUpdatedTime": 1574137165.104,
        "CreatedTime": 1574137165.104
    }
}

Creating dashboards

You can then use this template to create a dashboard that is connected to the second, similar dataset you created earlier. When calling the create-dashboard API, you define a SourceEntity (in this case, the template you just created), permissions, and any dashboard publish options, as shown in the following configuration:

create-dashboard-cli-input.json

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DashboardId": "DemoDashboard1",
    "Name": "Demo Dashboard 1",
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/alice",
            "Actions": [
                "quicksight:DescribeDashboard",
                "quicksight:ListDashboardVersions",
                "quicksight:UpdateDashboardPermissions",
                "quicksight:QueryDashboard",
                "quicksight:UpdateDashboard",
                "quicksight:DeleteDashboard",
                "quicksight:DescribeDashboardPermissions",
                "quicksight:UpdateDashboardPublishedVersion"
            ]
        }
    ],
    "SourceEntity": {
        "SourceTemplate": {
            "DataSetReferences": [
                {
                    "DataSetPlaceholder": "DS1",
                    "DataSetArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet2"
                }
            ],
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate"
        }
    },
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-DemoDashboard"
        }
    ],
    "VersionDescription": "1",
    "DashboardPublishOptions": {
        "AdHocFilteringOption": {
            "AvailabilityStatus": "ENABLED"
        },
        "ExportToCSVOption": {
            "AvailabilityStatus": "ENABLED"
        },
        "SheetControlsOption": {
            "VisibilityState": "EXPANDED"
        }
    }
}

You can confirm you created the dashboard correctly with the describe-dashboard API. See the following code:

aws quicksight describe-dashboard --aws-account-id $AAI --dashboard-id DemoDashboard1

{
    "Status": 200,
    "Dashboard": {
        "DashboardId": "DemoDashboard1",
        "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dashboard/DemoDashboard1",
        "Name": "Demo Dashboard 1",
        "Version": {
            "CreatedTime": 1574138252.449,
            "Errors": [],
            "VersionNumber": 1,
            "Status": "CREATION_SUCCESSFUL",
            "SourceEntityArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate/version/1",
            "Description": "1"
        },
        "CreatedTime": 1574138252.395,
        "LastPublishedTime": 1574138252.395,
        "LastUpdatedTime": 1574138252.449
    },
    "RequestId": "62945539-bb63-4faf-b897-8e84ea5644ae"
}

Because you granted permissions to Alice during creation, this dashboard will be visible to her under All dashboards when she logs into QuickSight as shown below:

Configuring SPICE datasets        

Alice’s datasets are direct query datasets and may cause higher load on your Redshift cluster during peak times, which might also cause Alice and her customers to wait longer for their dashboards to load. With SPICE, Amazon QuickSight’s in-memory data store, you can offload user traffic from your database while also achieving high concurrency limits and fast, responsive performance for your end users. The following configuration redefines the second dataset and sets ImportMode to SPICE:

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DataSetId": "DemoDataSet3",
    "Name": "Demo Data Set 3",
    "PhysicalTableMap": {
        "DemoDataSet2-CustomerSales": {
            "CustomSql": {
                "DataSourceArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:datasource/SampleRedshiftDatasouce",
                "Name": "Customer Sales",
                "SqlQuery":"SELECT lineorder.lo_revenue, customer.c_name, customer.c_city FROM public.lineorder INNER JOIN public.customer ON lineorder.lo_custkey = customer.c_custkey",
                    "Columns": [
                        {
                            "Name": "lo_revenue",
                            "Type": "INTEGER"
                        },
                        {
                            "Name": "c_name",
                            "Type": "STRING"
                        },
                        {
                            "Name": "c_city",
                            "Type": "STRING"
                        }
                    ]
            }
        }
    },
    "LogicalTableMap": {
        "DemoDataSet2-CustomerSales-Logical": {
            "Alias": "Customer Sales",
            "Source": {
                "PhysicalTableId": "DemoDataSet2-CustomerSales"
            }
        }

    },
    "ImportMode": "SPICE",
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/alice",
            "Actions": [
                "quicksight:UpdateDataSetPermissions",
                "quicksight:DescribeDataSet",
                "quicksight:DescribeDataSetPermissions",
                "quicksight:PassDataSet",
                "quicksight:DescribeIngestion",
                "quicksight:ListIngestions",
                "quicksight:UpdateDataSet",
                "quicksight:DeleteDataSet",
                "quicksight:CreateIngestion",
                "quicksight:CancelIngestion"
            ]
        }
    ],
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-DemoDataSet2"
        }
    ]
}

You can call the create-data-set API with that configuration to create the dataset in SPICE mode:

aws quicksight create-data-set --aws-account-id $AAI --cli-input-json file://./create-data-set-cli-input-sql-spice.json

{
    "Status": 201,
    "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3",
    "DataSetId": "DemoDataSet3",
    "IngestionArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3/ingestion/589eaf9d-0e21-4572-9ebe-f61b129886d9",
    "IngestionId": "589eaf9d-0e21-4572-9ebe-f61b129886d9",
    "RequestId": "7a442521-ab8f-448f-8668-1f0d6823f987"
}

When you create datasets in SPICE mode, you consume SPICE capacity. The describe-data-set API provides details on the SPICE capacity consumed. The following snippet shows the relevant portion of the describe-data-set response:

aws quicksight describe-data-set --aws-account-id $AAI --data-set-id DemoDataSet3|tail -5

        "ImportMode": "SPICE",
        "ConsumedSpiceCapacityInBytes": 107027200
    },
    "RequestId": "b0175568-94ed-40e4-85cc-b382979ca377"
}

For SPICE datasets, the APIs also provide the option of setting up ingestions or listing past ingestions. Listing past ingestions allows you to identify ingestion status and when the data in the dataset was last updated. To list past ingestions, call the list-ingestions API for your dataset:

aws quicksight list-ingestions --aws-account-id $AAI --data-set-id DemoDataSet3

{
    "Status": 200,
    "Ingestions": [
        {
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3/ingestion/589eaf9d-0e21-4572-9ebe-f61b129886d9",
            "IngestionId": "589eaf9d-0e21-4572-9ebe-f61b129886d9",
            "IngestionStatus": "COMPLETED",
            "ErrorInfo": {},
            "RowInfo": {
                "RowsIngested": 1126608,
                "RowsDropped": 0
            },
            "CreatedTime": 1574138852.564,
            "IngestionTimeInSeconds": 24,
            "IngestionSizeInBytes": 107027200,
            "RequestSource": "MANUAL",
            "RequestType": "INITIAL_INGESTION"
        }
    ],
    "RequestId": "ee1d9a6f-a290-418a-a526-8906f4689776"
}

Set up ingestions to refresh the data in your dataset from your data source. When new data arrives in your Redshift cluster, you can use the create-ingestion API to trigger a SPICE refresh for your dataset:

aws quicksight create-ingestion --aws-account-id $AAI --data-set-id DemoDataSet3 --ingestion-id DemoDataSet3-Ingestion2

{
    "Status": 201,
    "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3/ingestion/DemoDataSet3-Ingestion2",
    "IngestionId": "DemoDataSet3-Ingestion2",
    "IngestionStatus": "INITIALIZED",
    "RequestId": "fc1f7eea-1327-41d6-9af7-c12f097ed343"
}

Listing ingestion history again shows the new ingestion in the state RUNNING. See the following code:

aws quicksight list-ingestions --aws-account-id $AAI --data-set-id DemoDataSet3

{
    "Status": 200,
    "Ingestions": [
        {
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3/ingestion/DemoDataSet3-Ingestion2",
            "IngestionId": "DemoDataSet3-Ingestion2",
            "IngestionStatus": "RUNNING",
            "RowInfo": {
                "RowsIngested": 0
            },
            "CreatedTime": 1574139332.876,
            "RequestSource": "MANUAL",
            "RequestType": "FULL_REFRESH"
        },
        {
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet3/ingestion/589eaf9d-0e21-4572-9ebe-f61b129886d9",
            "IngestionId": "589eaf9d-0e21-4572-9ebe-f61b129886d9",
            "IngestionStatus": "COMPLETED",
            "ErrorInfo": {},
            "RowInfo": {
                "RowsIngested": 1126608,
                "RowsDropped": 0
            },
            "CreatedTime": 1574138852.564,
            "IngestionTimeInSeconds": 24,
            "IngestionSizeInBytes": 107027200,
            "RequestSource": "MANUAL",
            "RequestType": "INITIAL_INGESTION"
        }
    ],
    "RequestId": "322c165e-fb90-45bf-8fa5-246d3034a08e"
}
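
If you schedule refreshes programmatically, for example from a scheduled Lambda function, a minimal boto3 sketch of the same create-ingestion and polling pattern might look like the following; the account, dataset, and ingestion IDs are placeholders:

import time

import boto3

ACCOUNT_ID = "xxxxxxxxxxxx"
DATA_SET_ID = "DemoDataSet3"
INGESTION_ID = "DemoDataSet3-Ingestion3"

qs = boto3.client("quicksight", region_name="us-east-1")

# Trigger a SPICE refresh, then poll until it reaches a terminal state.
qs.create_ingestion(
    AwsAccountId=ACCOUNT_ID, DataSetId=DATA_SET_ID, IngestionId=INGESTION_ID
)

while True:
    status = qs.describe_ingestion(
        AwsAccountId=ACCOUNT_ID, DataSetId=DATA_SET_ID, IngestionId=INGESTION_ID
    )["Ingestion"]["IngestionStatus"]
    if status in ("COMPLETED", "FAILED", "CANCELLED"):
        print(status)
        break
    time.sleep(10)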

Versioning and aliasing

As you may have noticed, the template and dashboard APIs support versioning. When you create a template or dashboard, version 1 of that entity is created, and each update of a template or dashboard results in a new version of that entity with an incremented version number. Versioning can be useful when you want to republish an old version of a dashboard or ensure you’re building off a known version of the template. For example, if you published your dashboard multiple times, but want to republish the first version, you can call the update-dashboard-published-version API as follows:

aws quicksight update-dashboard-published-version --aws-account-id $AAI --dashboard-id DemoDashboard1 --dashboard-version-number 1

{
   "DashboardArn": "arn:aws:quicksight:us-east-
1:xxxxxxxxxxxx:dashboard/DemoDashboard1",
   "DashboardId": "DemoDashboard1",
   "RequestId": "c97cdbff-d954-4f5b-a887-28659ad9cc9a"
}

Versioning becomes even more powerful with aliases. A template or dashboard alias is a pointer to a specific template or dashboard version. Aliases are helpful when you want to point to a logical version of a resource rather than an explicit version number that needs to be manually updated. This feature can be helpful for development workflows where you want to make changes to a template and only want production code to use an approved template version. For example, you can create a ‘PROD’ template alias for your approved production template and point it to version 1, as shown below:

aws quicksight create-template-alias --aws-account-id $AAI --template-id DemoDashboardTemplate --template-version-number 1 --alias-name PROD

{
   "RequestId": "c112a48c-14da-439e-b4d8-538b60c1f188",
   "TemplateAlias": { 
      "AliasName": "PROD",
      "Arn": "arn:aws:quicksight:us-east-
1:xxxxxxxxxxxx:template/DemoDashboardTemplate/alias/PROD",
      "TemplateVersionNumber": 1
   }
}

When you create a dashboard from this template, you can specify the SourceEntity as the ‘PROD’ template alias so the dashboard is always published from the approved production template version. You can generate a skeleton input file for the create-dashboard API and then populate it as shown below; note how the template ARN in the SourceEntity contains the template alias name:

aws quicksight create-dashboard --generate-cli-skeleton > create-dashboard-cli-input.json

{
    "AwsAccountId": "xxxxxxxxxxxx",
    "DashboardId": "DemoDashboard2",
    "Name": "Demo Dashboard 2",
    "Permissions": [
        {
            "Principal": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:user/default/user/Alice",
            "Actions": [
                "quicksight:DescribeDashboard",
                "quicksight:ListDashboardVersions",
                "quicksight:UpdateDashboardPermissions",
                "quicksight:QueryDashboard",
                "quicksight:UpdateDashboard",
                "quicksight:DeleteDashboard",
                "quicksight:DescribeDashboardPermissions",
                "quicksight:UpdateDashboardPublishedVersion"
            ]
        }
    ],
    "SourceEntity": {
        "SourceTemplate": {
            "DataSetReferences": [
                {
                    "DataSetPlaceholder": "DS1",
                    "DataSetArn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:dataset/DemoDataSet2"
                }
            ],
            "Arn": "arn:aws:quicksight:us-east-1:xxxxxxxxxxxx:template/DemoDashboardTemplate/alias/PROD"
        }
    },
    "Tags": [
        {
            "Key": "Name",
            "Value": "API-DemoDashboard"
        }
    ],
    "VersionDescription": "1",
    "DashboardPublishOptions": {
        "AdHocFilteringOption": {
            "AvailabilityStatus": "ENABLED"
        },
        "ExportToCSVOption": {
            "AvailabilityStatus": "ENABLED"
        },
        "SheetControlsOption": {
            "VisibilityState": "EXPANDED"
        }
    }
}

If you were to create new versions of the template, you could then update the ‘PROD’ template alias to point to the new template version, without touching your dashboard creation code from above. For example, if you wanted to set template version 3 to be your approved production version, you can call the update-template-alias API as shown below:

aws quicksight update-template-alias --aws-account-id $AAI --template-id DemoDashboardTemplate --template-version-number 3 --alias-name PROD

{
   "RequestId": "5ac9ff9e-28c1-4b61-aeab-5949986f5b2b",
   "TemplateAlias": { 
      "AliasName": "PROD",
      "Arn": "arn:aws:quicksight:us-east-
1:xxxxxxxxxxxx:template/DemoDashboardTemplate/alias/PROD",
      "TemplateVersionNumber": 3
   }
}

Keep an eye out for a future blog post that will detail more examples of using versioning and aliases for large-scale deployments!

Amazon QuickSight’s data source, dataset, dashboard, and SPICE APIs are available in both Standard and Enterprise editions, with edition-specific support for functionality. Fine-grained access control APIs and template support for dashboard APIs are available in Amazon QuickSight Enterprise edition. APIs for Amazon QuickSight are part of the AWS SDK and are available in a variety of programming languages. For more information, see API Reference and Tools to Build on AWS.

Conclusion

With QuickSight’s new themes, APIs, and additional customization options underway, there are many creative ways to author the next chapter in your analytics story. To be a part of future API previews and connect with us, sign up for the embedding early access program.

 


About the Author

Jose Kunnackal John is a principal product manager for Amazon QuickSight.

 

 

 

Provisioning the Intuit Data Lake with Amazon EMR, Amazon SageMaker, and AWS Service Catalog

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/provisioning-the-intuit-data-lake-with-amazon-emr-amazon-sagemaker-and-aws-service-catalog/

This post shares Intuit’s learnings and recommendations for running a data lake on AWS. The Intuit Data Lake is built and operated by numerous teams in Intuit Data Platform. Thanks to Tristan Baker (Chief Architect), Neil Lamka (Principal Product Manager), Achal Kumar (Development Manager), Nicholas Audo, and Jimmy Armitage for their feedback and support.

A data lake is a centralized repository for storing structured and unstructured data at any scale. At Intuit, creating such a pile of raw data is easy. However, more interesting challenges present themselves:

  1. How should AWS accounts be organized?
  2. What ingestion methods will be used? How will analysts find the data they need?
  3. Where should data be stored? How should access be managed?
  4. What security measures are needed to protect Intuit’s sensitive data?
  5. Which parts of this ecosystem can be automated?

This post outlines the approach taken by Intuit, though it is important to remember that there are many ways to build a data lake (for example, AWS Lake Formation).

We’ll cover the technologies and processes involved in creating the Intuit Data Lake at a high level, including the overall structure and the automation used in provisioning accounts and resources. Watch this space in the future for more detailed blog posts on specific aspects of the system, from the other teams and engineers who worked together to build the Intuit Data Lake.

Architecture

Account Structure

Data lakes typically follow a hub-and-spoke model, with the hub account containing shared services that control access to data sources. For the purposes of this post, we’ll refer to the hub account as Central Data Lake.

In this pattern, access to Central Data Lake is apportioned to spoke accounts called Processing Accounts. This model maintains separation between end users and allows for division of billing among distinct business units.

 

 

It is common to maintain two ecosystems: pre-production (Pre-Prod) and production (Prod). This allows data lake administrators to silo access to data by preventing connectivity between Pre-Prod and Prod.

To enable experimentation and testing, it may also be advisable to maintain separate VPC-based environments within Pre-Prod accounts, such as dev, qa, and e2e. Processing Account VPCs would then be connected to the corresponding VPC in Central Data Lake.

Note that at first, we connected accounts via VPC Peering. However, as we scaled we quickly approached the hard limit of 125 VPC peering connections, requiring us to migrate to AWS Transit Gateway. As of this writing, we connect multiple new Processing Accounts weekly.

 

 

Central Data Lake

There may be numerous services running in a hub account, but we’ll focus on the aspects that are most relevant to this blog: ingestion, sanitization, storage, and a data catalog.

 

 

Ingestion, Sanitization, and Storage

A key component to Central Data Lake is a uniform ingestion pattern for streaming data. One example is an Apache Kafka cluster running on Amazon EC2. (You can read about how Intuit engineers do this in another AWS blog.) As we deal with hundreds of data sources, we’ve enabled access to ingestion mechanisms via AWS PrivateLink.

Note: Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an alternative for running Apache Kafka on Amazon EC2, but was not available at the start of Intuit’s migration.

In addition to stream processing, another method of ingestion is batch processing, such as jobs running on Amazon EMR. After data is ingested by one of these methods, it can be stored in Amazon S3 for further processing and analysis.

Intuit deals with a large volume of customer data, and each field is carefully considered and classified with a sensitivity level. All sensitive data that enters the lake is encrypted at the source. The ingestion systems retrieve the encrypted data and move it into the lake. Before it is written to S3, the data is sanitized by a proprietary RESTful service. Analysts and engineers operating within the data lake consume this masked data.

Data Catalog

A data catalog is a common way to give end users information about the data and where it lives. One example is a Hive Metastore backed by Amazon Aurora. Another alternative is the AWS Glue Data Catalog.

Processing Accounts

When Processing Accounts are delivered to end users, they include an identical set of resources. We’ll discuss the automation of Processing Accounts below, but the primary components are as follows:

 

 

Processing Account structure upon delivery to the customer

 

Data Storage Mechanisms

One reasonable question is whether all data should reside in Central Data Lake, or if it’s acceptable to distribute data across multiple accounts. A data lake might employ a combination of the two approaches, and classify data locations as primary or secondary.

The primary location for data is Central Data Lake, and it arrives there via the ingestion pipelines discussed previously. Processing Accounts can read from the primary source, either directly from the ingestion pipelines or from S3. Processing Accounts can contribute their transformed data back into Central Data Lake (primary), or store it in their own accounts (secondary). The proper storage location depends on the type of data, and who needs to consume it.

One rule worth enforcing is that no cross-account writes should be permitted. In other words, the IAM principal (in most cases, an IAM role assumed by EC2 via an instance profile) must be in the same account as the destination S3 bucket. This is because cross-account delegation is not supported—specifically, S3 bucket policies in Central Data Lake cannot grant Processing Account A access to objects written by a role in Processing Account B.

Another possibility is for EMR to assume different IAM roles via a custom credentials provider (see this AWS blog), but we chose not to go down this path at Intuit because it would have required many EMR jobs to be rewritten.

 

 

Data Access Patterns

The majority of end users are interested in the data that resides in S3. In Central Data Lake and some Processing Accounts, there may be a set of read-only S3 buckets: any account in the data lake ecosystem can read data from this type of bucket.

To facilitate management of S3 access for read-only buckets, we built a mechanism to control S3 bucket policies, administered entirely via code. Our deployment pipelines use account metadata to dynamically generate the correct S3 bucket policy based on the type of account (Pre-Prod or Prod). These policies are committed back into our code repository for auditability and ease of management.

We employ the same method for managing KMS key policies, as we use KMS with customer managed customer master keys (CMKs) for at-rest encryption in S3.

Here’s an example of a generated S3 bucket policy for a read-only bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ProcessingAccountReadOnly",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::111111111111:root",
                    "arn:aws:iam::222222222222:root",
                    "arn:aws:iam::333333333333:root",
                    "arn:aws:iam::444444444444:root",
                    "arn:aws:iam::555555555555:root",
                    ...
                    ...
                    ...
                    "arn:aws:iam::999999999999:root",
                ]
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::intuit-data-lake-example/*",
                "arn:aws:s3:::intuit-data-lake-example"
            ]
        }
    ]
}

Note that we grant access at the account level, rather than using explicit IAM principal ARNs. Because the reads are cross-account, permissions are also required on the IAM principals in Processing Accounts. Maintaining these policies—with automation, at that level of granularity—is untenable at scale. Furthermore, using specific IAM principal ARNs would create an external dependency on foreign accounts. For example, if a Processing Account deletes an IAM role that is referenced in an S3 bucket policy in Central Data Lake, the bucket policy can no longer be saved, causing interruptions to deployment pipelines.
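
As an illustration only, a simplified sketch of how a read-only bucket policy might be generated from account metadata could look like the following; the account list, bucket name, and output handling are assumptions, and Intuit’s actual pipeline is more involved:

import json

# Hypothetical inputs; in practice these come from account metadata in the deployment pipeline.
processing_accounts = ["111111111111", "222222222222", "333333333333"]
bucket = "intuit-data-lake-example"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ProcessingAccountReadOnly",
            "Effect": "Allow",
            # Grant at the account level rather than to specific IAM principals.
            "Principal": {"AWS": [f"arn:aws:iam::{a}:root" for a in processing_accounts]},
            "Action": ["s3:ListBucket", "s3:GetObject"],
            "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
        }
    ],
}

# The generated policy is committed back to the repository and applied by the pipeline.
print(json.dumps(policy, indent=4))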

Security

Security is mission critical for any data lake. We’ll mention a subset of the controls we use, but not dive deep.

Encryption

Encryption can be enforced both in transit and at rest, using multiple methods:

  1. Traffic within the lake should use the latest version of TLS (1.2 as of this writing)
  2. Data can be encrypted with application-level (client-side) encryption
  3. KMS keys can be used for at-rest encryption of S3, EBS, and RDS

Ingress and Egress

There’s nothing out of the ordinary in our approach to ingress and egress, but it’s worth mentioning the standard pattern we’ve found most important: policies restricting ingress and egress are the primary points at which a data lake can guarantee quality (ingress) and prevent loss (egress).

Authorization

Access to the Intuit Data Lake is controlled via IAM roles, meaning no IAM users (with long-term credentials) are created. End users are granted access via an internal service that manages role-based, federated access to AWS accounts. Regular reviews are conducted to remove nonessential users.

Configuration Management

We use an internal fork of Cloud Custodian, which is a suite of preventative, detective, and responsive controls consisting of Amazon CloudWatch Events and AWS Config rules. Some of the violations it reports and (optionally) mitigates include:

  • Unauthorized CIDRs in inbound security group rules
  • Public S3 bucket policies and ACLs
  • IAM user console access
  • Unencrypted S3 buckets, EBS volumes, and RDS instances

Lastly, Amazon GuardDuty is enabled in all Intuit Data Lake accounts and is monitored by Intuit Security.

Automation

If there is one thing we’ve learned building the Intuit Data Lake, it is to automate everything.

There are four areas of automation we’ll discuss in this blog:

  1. Creation of Processing Accounts
  2. Processing Account Orchestration Pipeline
  3. Processing Account Terraform Pipeline
  4. EMR and SageMaker deployment via Service Catalog

Creation of Processing Accounts

The first step in creating a Processing Account is to make a request through an internal tool. This triggers automation that provisions an Intuit-stamped AWS account under the correct business unit.

 

Note: AWS Control Tower’s Account Factory was not available at the start of our journey, but it can be leveraged to provision new AWS accounts in a secured, best practice, self-service way.

Account setup also includes automated VPC creation (with optional VPN), fully automated using Service Catalog. End users simply specify subnet sizes.

It’s worth noting that Intuit leverages Service Catalog for self-service deployment of other common patterns, including ingress security groups, VPC endpoints, and VPC peering. Here’s an example portfolio:

Processing Account Orchestration Pipeline

After account creation and VPC provisioning, the Processing Account Orchestration Pipeline runs. This pipeline executes one-time tasks required for Processing Accounts. These tasks include:

  • Bootstrapping an IAM role for use in further configuration management
  • Creation of KMS keys for S3, EBS, and RDS encryption
  • Creation of variable files for the new account
  • Updating the master configuration file with account metadata
  • Generation of scripts to orchestrate the Terraform pipeline discussed below
  • Sharing Transit Gateways via Resource Access Manager

Processing Account Terraform Pipeline

This pipeline manages the lifecycle of dynamic, frequently-updated resources, including IAM roles, S3 buckets and bucket policies, KMS key policies, security groups, NACLs, and bastion hosts.

There is one pipeline for every Processing Account, and each pipeline deploys a series of layers into the account, using a set of parameterized deployment jobs. A layer is a logical grouping of Terraform modules and AWS resources, providing a way to shrink Terraform state files and reduce blast radius if redeployment of specific resources is required.

EMR and SageMaker Deployment via Service Catalog

AWS Service Catalog facilitates the provisioning of Amazon EMR and Amazon SageMaker, allowing end users to launch EMR clusters and SageMaker notebook instances that work out of the box, with embedded security.

Service Catalog allows data scientists and data engineers to launch EMR clusters in a self-service fashion with user-friendly parameters, and provides them with the following:

  • Bootstrap action to enable connectivity to services in Central Data Lake
  • EC2 instance profile to control S3, KMS, and other granular permissions
  • Security configuration that enables at-rest and in-transit encryption
  • Configuration classifications for optimal EMR performance
  • Encrypted AMI with monitoring and logging enabled
  • Custom Kerberos connection to LDAP

For SageMaker, we use Service Catalog to launch notebook instances with custom lifecycle configurations that set up connections or initialize the following: Hive Metastore, Kerberos, security, Splunk logging, and OpenDNS. You can read more about lifecycle configurations in this AWS blog. Launching a SageMaker notebook instance with best-practice configuration is as easy as follows:

 

 

Conclusion

This post illustrates the building blocks we used in creating the Intuit Data Lake. Our solution isn’t wholly unique, but is composed of common-sense approaches we’ve gleaned from dozens of engineers across Intuit, representing decades of experience. These practices have enabled us to push petabytes of data into the lake, and serve hundreds of Processing Accounts with varying needs. We are still building, but we hope our story helps you in your data lake journey.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Ben Covi is a staff software engineer at Intuit. At any given moment, he’s probably losing a game of Catan.

 

 

 

Amazon EMR introduces EMR runtime for Apache Spark

Post Syndicated from Joseph Marques original https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/

Amazon EMR is happy to announce Amazon EMR runtime for Apache Spark, a performance-optimized runtime environment for Apache Spark that is active by default on Amazon EMR clusters. EMR runtime for Spark is up to 32 times faster than EMR 5.16, with 100% API compatibility with open-source Spark. This means that your workloads run faster, saving you compute costs without making any changes to your applications.

Amazon EMR has been adding Spark runtime improvements since EMR 5.24, and discussed them in Optimizing Spark Performance. EMR 5.28 features several new improvements.

To measure these improvements, we compared EMR 5.16 (with open-source Apache Spark version 2.4) with EMR 5.28 (with EMR runtime for Apache Spark, compatible with Apache Spark version 2.4). We used TPC-DS benchmark queries at 3 TB scale and ran them on a six-node c4.8xlarge EMR cluster with data in Amazon S3. We measured performance improvements as the geometric mean of the improvement in query execution time, and as the total query execution time across all queries. The results showed considerable improvement: the geometric mean was 2.4 times faster and the total query runtime was 3.2 times faster.

The following graph shows performance improvements measured as total runtime for 104 TPC-DS queries. EMR 5.28 has the better (lower) runtime.

The following graph shows performance improvements measured as the geometric mean for 104 TPC-DS queries. EMR 5.28 has the better (lower) geomean.

In breaking down the per-query improvements, you can observe the highest performance gains in long-running queries.

The following graph shows performance improvements in EMR 5.28 compared to EMR 5.16 for long-running queries (running for more than 130 seconds in EMR 5.16). In this comparison, the higher numbers are better.

The following graph shows performance improvements in EMR 5.28 compared to EMR 5.16 for short-running queries (running for less than 130 seconds). Again, the higher numbers are better.

Queries running for more than 130 seconds are up to 32 times faster as seen in query 72. Queries running for less than 130 seconds are up to 6 times faster, with an average improvement of 2 times faster across the board.

Customers use Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. They choose to run Spark on EMR because EMR provides the latest, stable, open-source community innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Auto Scaling. It also provides ease of use with managed EMR Notebooks, notebook-scoped libraries, Git integration, and easy debugging and monitoring with off-cluster Spark History Services. Combined with the runtime improvements, and fine-grained access control using AWS Lake Formation, Amazon EMR presents an excellent choice for customers running Apache Spark.

With each of these performance optimizations to Apache Spark, you benefit from better query performance. Stay tuned for additional updates that improve Apache Spark performance in Amazon EMR. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more Apache Spark optimizations, configuration best practices, and tuning advice.

 


About the Authors

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

 

 

 

 

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

 

 

 

 

Simplify management of Amazon Redshift clusters with the Redshift console

Post Syndicated from Debu Panda original https://aws.amazon.com/blogs/big-data/simplify-management-of-amazon-redshift-clusters-with-the-redshift-console/

Amazon Redshift is the most popular and the fastest cloud data warehouse. It includes a console for administrators to create, configure, and manage Amazon Redshift clusters. The new Amazon Redshift console modernizes the user interface and adds several features to improve managing your clusters and workloads running on clusters.

The new Amazon Redshift console provides the following benefits:

  • Visibility to the health and performance of clusters from a unified dashboard.
  • Simplified management of clusters by streamlining several screens and flows, and reducing the number of clicks for several everyday operations.
  • Improved mean-time-to-diagnose query performance issues by adding capabilities to monitor user queries and correlate with cluster performance metrics.

This post discusses how you can use the new console to create your first cluster, and manage and monitor your clusters in your AWS account.

Prerequisites

For the best experience in using the new Amazon Redshift console, make sure you meet the following prerequisites.

If you are using the AmazonRedshiftFullAccess managed policy, you don’t need to make any changes to your permissions. AmazonRedshiftFullAccess grants full access to all Amazon Redshift resources for the AWS account.

If you are using custom policies, either attach AmazonRedshiftFullAccess or add the following code to your IAM user’s policy:

{  
   "Version":"2012-10-17",
   "Statement":[  
      {  
         "Action":[  
            "cloudwatch:ListMetrics",
            "cloudwatch:GetMetricWidgetImage",
            "cloudwatch:GetMetricData",
            "tag:GetResources",
            "tag:UntagResources",
            "tag:GetTagValues",
            "tag:GetTagKeys",
            "tag:TagResources",
            "iam:ListRoles"
         ],
         "Effect":"Allow",
         "Resource":"*"
      }
   ]
}

You also have to upgrade your clusters to the latest maintenance patch.

New console launch page

When you log in to the new console and you don’t have a cluster, you see the launch page. You can navigate to your dashboard, clusters, queries, Query Editor, configuration changes, and advisor using the left navigation menu.

You are redirected to the Dashboard page if you have at least one cluster.

The following screenshot shows your Amazon Redshift console and navigation menu.

To expand the menu, choose the expand icon at the top of the menu. See the following screenshot.

Setting up a new Amazon Redshift data warehouse

To use Amazon Redshift, you first have to create an Amazon Redshift cluster. You can launch a cluster by providing a few parameters, such as the node type, number of nodes, and master user password; you can use default values for the rest. You can also get an estimated cost for your cluster, and calculate the best configuration based on the size of your data.
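
The same launch can also be performed programmatically. The following is a minimal boto3 sketch with placeholder values (the cluster identifier, node settings, and credentials are assumptions, not recommendations):

import boto3

redshift = boto3.client("redshift")

# Placeholder values; everything else falls back to service defaults.
redshift.create_cluster(
    ClusterIdentifier="demo-cluster",
    NodeType="dc2.large",
    ClusterType="multi-node",
    NumberOfNodes=2,
    MasterUsername="awsuser",
    MasterUserPassword="ChangeMe1234",  # manage real credentials in a secrets store
    PubliclyAccessible=False,
)

# Wait until the cluster is available before connecting.
redshift.get_waiter("cluster_available").wait(ClusterIdentifier="demo-cluster")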

Now that you created an Amazon Redshift cluster, you’re ready to create some tables and load data using the Query Editor.

Query Editor

The Query Editor allows you to author and execute queries in your cluster. To connect to your cluster, from Editor, choose Connect to database.

You have to provide your cluster URL and database credentials to connect to your database. See the following screenshot.

You can run SQL commands to create tables, load data, run queries, and perform visual analysis in the Query Editor.

You can switch to a new database connection by choosing the "Change connection" button, as shown below.

Monitoring dashboard

More than 15,000 customers use Amazon Redshift to power analytical workloads for modern analytics use cases such as business intelligence, predictive analytics, and real-time streaming analytics. As an Amazon Redshift administrator or developer, you want your users, such as data analysts and BI professionals, to get optimal performance for their workloads. To be proactive, you might set up Amazon CloudWatch alarms on different Amazon Redshift metrics.
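
For example, a hedged boto3 sketch of such an alarm on the DatabaseConnections metric might look like the following; the cluster identifier, threshold, and SNS topic are placeholders:

import boto3

cloudwatch = boto3.client("cloudwatch")

# Placeholder cluster identifier, threshold, and notification topic.
cloudwatch.put_metric_alarm(
    AlarmName="demo-cluster-high-db-connections",
    Namespace="AWS/Redshift",
    MetricName="DatabaseConnections",
    Dimensions=[{"Name": "ClusterIdentifier", "Value": "demo-cluster"}],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=3,
    Threshold=400,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:redshift-alerts"],
)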

With the existing console, you can navigate to different pages to get a glance of the health and performance of Amazon Redshift clusters. The new dashboard provides unified visibility to the health and performance of your clusters.

The dashboard enables Amazon Redshift administrators and operators to gain visibility into the following:

  • The number of clusters, number of nodes, availability, and cluster health status.
  • CloudWatch alarms for your clusters.
  • Critical performance metrics for the top five clusters, such as the number of queries, database connections, and CPU utilization.
  • Performance workloads such as query throughput, average query length, average run time, and average wait time for your clusters.
  • Consolidated view of all events for your clusters.

You can filter data in each widget by a specific filter or modify the range to perform a trend analysis for a metric.

The dashboard also allows you to isolate issues in your clusters so you can focus on the problem. For example, if you have a CloudWatch alarm for an Amazon Redshift metric such as DB Connections, as the preceding screenshot shows, you can drill down to view the alarm details, or drill down to the cluster to see details such as running queries and loads, and cluster and database metrics.

Cluster management

The Clusters page allows you to view node details and key performance metrics for the clusters, tags, and notifications. You can customize this page to include your preferred metric and areas of focus. In this post, the page includes a maintenance track and release status. Through the Actions menu, you can perform frequent operations such as modify, resize, manage tags, reboot, create a snapshot or configure a cross-region snapshot, or delete your cluster.

The following screenshot shows the Clusters page and options on the Actions menu.

Cluster details

If you are diagnosing a performance problem, you can drill down to view cluster details. The cluster details page shows the details for a specific cluster, organized in the following categories:

  • Query monitoring
  • Cluster performance
  • Maintenance
  • Backup
  • Properties

The Actions menu allows you to perform everyday operations such as resize, reboot, or change configuration. You can also modify the password for your master user for the database, create a snapshot, or restore a table in your database.

The following screenshot shows the details page for a cluster and the available menus and options.

Query and Load monitoring

You can monitor the current workload for your WLM queues and view critical metrics such as active queries and query throughput.

The following screenshot shows the Query monitoring section and the time-series view of the breakdown of the workload.

Cluster and database monitoring

You can view cluster and database performance metrics for your cluster, such as CPU utilization, percentage of disk space used, and database connections, using the Cluster performance tab. You can also change the time range and the granularity of the data.

The following screenshot shows the Cluster performance section.

You can change the default metric view to the metrics of your choice by using the config button. You can display 6, 9, or 20 metrics on this page; keeping the default six metrics is recommended for optimal page performance.

Cluster maintenance

The Maintenance and monitoring tab allows you to view your CloudWatch alarms, events, and maintenance level. You can also enable auditing, create CloudWatch alarms, and create maintenance windows for your cluster.

The following screenshot shows the Maintenance and monitoring section.

Backup and restore

Amazon Redshift automatically takes incremental snapshots (backups) of your data every eight hours or after every 5 GB per node of data changes, whichever comes first. Alternatively, you can create a snapshot schedule to control when snapshots are taken. You can create a new cluster by restoring from an existing snapshot.

You can view, search your snapshots, enable automatic snapshots, take a manual snapshot, and configure cross-region snapshots from this page.

You can also create a new cluster from an existing snapshot. With this release, Amazon Redshift allows you to restore snapshots to create a cluster with different node types than the original version. You can either migrate your Amazon Redshift data warehouse to a more powerful node type or clone to a smaller node type for development and testing.

You can view snapshots for your clusters by navigating to the Snapshots submenu from the cluster menu. You can search, view, and delete snapshots generated by existing and deleted clusters. You can restore a cluster from a snapshot, including a snapshot generated from a deleted cluster.
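
If you automate backups outside the console, a minimal boto3 sketch of taking a manual snapshot and restoring it to a different node type might look like the following; the identifiers and node settings are placeholders:

import boto3

redshift = boto3.client("redshift")

# Take a manual snapshot of an existing cluster.
redshift.create_cluster_snapshot(
    SnapshotIdentifier="demo-cluster-manual-2019-11-20",
    ClusterIdentifier="demo-cluster",
)
redshift.get_waiter("snapshot_available").wait(
    SnapshotIdentifier="demo-cluster-manual-2019-11-20"
)

# Restore into a new cluster, optionally changing the node type for dev/test.
redshift.restore_from_cluster_snapshot(
    ClusterIdentifier="demo-cluster-devtest",
    SnapshotIdentifier="demo-cluster-manual-2019-11-20",
    NodeType="dc2.large",
    NumberOfNodes=2,
)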

The following screenshot shows the Backup section and available actions in the Snapshots section.

Properties

Amazon Redshift clusters provide several customization options. The properties tab allows you to view and edit standard configurations such as IAM roles, cluster configuration details, database configurations, and network configurations. You can also manage tags for your cluster.

You need the JDBC and ODBC URLs to connect to a cluster from different ETL, business intelligence, and developer tools. This tab also provides connection details such as the JDBC and ODBC URLs and the IP addresses of the nodes in the cluster.

The following screenshot shows the Properties section.

Conclusion

This post showed you the new features in the Amazon Redshift management console, such as the monitoring dashboard and the updated flows to create, manage, and monitor Amazon Redshift clusters. The Clusters page logically organizes information for better navigation and simplifies everyday operations. Switch today to the new Amazon Redshift console and simplify management of your Amazon Redshift clusters.

In a later post, you can learn how the Amazon Redshift console improves query monitoring and helps you diagnose query performance.

 


About the Authors

Debu Panda, a senior product manager at AWS, is an industry leader in analytics, application platform, and database technologies. He has more than 20 years of experience in the IT industry and has published numerous articles on analytics, enterprise Java, and databases and has presented at multiple conferences. He is lead author of the EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

 

 

 

 

Raja Bhogi is an engineering manager at AWS. He is responsible for building delightful and easy-to-use web experiences for analytics and blockchain products. His work includes launching web experiences for new analytics products, and working on new feature launches for existing products. He is passionate about web technologies, performance insights, and tuning. He is a thrill seeker and enjoys everything from roller coasters to bungee jumping.

 

 

 

 

Publish and update data products dynamically with AWS Data Exchange

Post Syndicated from Akram Chetibi original https://aws.amazon.com/blogs/big-data/publish-and-update-data-products-dynamically-with-aws-data-exchange/

Data is revolutionizing the way organizations of all sizes conduct their business. Companies are increasingly using third-party data to complement their internal data and deliver value for their customers. Third-party data is used across a wide variety of use cases, such as building applications for customers, running analytics workloads to improve business operations and marketing activities, or building predictive models using machine learning (ML) techniques.

However, as data becomes the center of how companies operate, the way data providers deliver to data subscribers has not changed in years. As data providers, you spend time and effort on undifferentiated heavy lifting to build data delivery and entitlement management mechanisms to serve your customers. Many data providers also rely on traditional sales and delivery channels and are often unable to reach many customers interested in their data, which leads to slower adoption of their data products.

Enter AWS Data Exchange.

AWS Data Exchange makes it easy to exchange data in the cloud efficiently. In a few minutes, customers can find and subscribe to hundreds of data products from more than 80 qualified data providers across industries, such as Financial Services, Healthcare and Life Sciences, and Consumer and Retail. After subscribing, customers can download a dataset or copy it to Amazon S3 and analyze it with a wide variety of AWS analytics and ML services. AWS Data Exchange gives data providers a secure, transparent, and reliable channel to reach millions of AWS customers. AWS Data Exchange also helps you service your existing customer subscriptions more efficiently and at a lower cost by eliminating the need to build and maintain data delivery, licensing, or billing infrastructure.

Many data providers publish data products that are updated regularly. For example, a stock market data provider may want to publish daily closing prices every day, or a weather forecast data provider may want to provide an updated forecast every week. This post walks through the process of publishing and updating products dynamically on AWS Data Exchange. The post first shows how to publish a new product and make it available to subscribers, which can be done in minutes using the AWS Data Exchange console. The post also reviews a workflow using a Lambda function to automatically update the product by publishing new revisions to its underlying data sets.

Prerequisites

Before you begin, complete the following prerequisites:

  1. You must be a registered provider on AWS Data Exchange. Only eligible and registered providers can publish data products on AWS Data Exchange. Eligible providers must agree to the Terms and Conditions for AWS Marketplace under a valid legal entity domiciled in the United States or a member state of the EU, supply valid banking and taxation identification, and be qualified by the AWS Data Exchange business operations team. For more information, see Providing Data Products on AWS Data Exchange.
  2. The data that you publish must be compliant with the AWS Marketplace Terms and Conditions and the AWS Data Exchange Publishing Guidelines.
  3. You must have the appropriate IAM permissions to use AWS Data Exchange as a provider. For example, you can use the AWSDataExchangeProviderFullAccess managed IAM policy.
  4. You need an S3 bucket for your ready-to-publish data files. For more information, see Create a Bucket and What is Amazon S3?

AWS Data Exchange concepts

Products are the unit of exchange in AWS Data Exchange. A product is a package of data sets that a provider publishes and others subscribe to. The AWS Data Exchange product catalog and AWS Marketplace website both list products. A product can contain one or more data sets, as well as product details, including the product’s name and description, categories, and contact details. The product also contains information related to the product’s offer terms, which are the terms that subscribers agree to when subscribing to a product. These terms include the available pricing and duration options, the data subscription agreement, and the refund policy.

A data set is a dynamic collection of file-based data content that is versioned using revisions. A revision is a specific version of a data set. Each revision can contain multiple files called assets, which you import into a revision using an asynchronous workflow called a job. After creating a revision and importing assets into it, you need to finalize the revision to mark it as ready for publishing before you publish it into the data set’s product. For more information, see Working with Data Sets.

The following diagram summarizes the concepts described above and the hierarchy of the different resources.

Publishing a new product to AWS Data Exchange

Before reviewing how to automatically update an existing product, let’s start by setting up and creating a new product. If you already have a published product, skip this section and move on to “Publishing new data files to the product automatically.”

Creating a dataset

To publish a product, first create a dataset. Complete the following steps:

  1. On the AWS Data Exchange console’s, under Data sets, choose Create data set.
  2. Enter a Name and Description for the dataset and choose Create.

The name of the data set is visible as part of the product details in the catalog; consider using a concise name that enables customers to understand the content of the data set easily. The description is visible to subscribers who have an active subscription to the product; consider including coverage information as well as the features and benefits of the dataset.

The following screenshot shows the Create data set section with name and description. This post entered the name Exchange-A End of Day Prices, and the description, End-of-day pricing of all equities listed on Exchange-A. Covers all industries and all equities traded on the exchange (2,000+). This data set contains full history from 1985, and is updated daily with a new file every day around 5pm EST

Creating a revision

After creating the dataset, but before publishing it into a product, you need to create its first initial revision. Complete the following steps:

  1. On your data set’s page, choose the Revisions tab.
  2. Choose Create revision.
  3. For Revision settings, enter a brief comment about the data in this revision.
  4. Choose Create. The revision comment is visible to subscribers after they subscribe to your product. The following screenshot shows that this post entered the comment Historical data from January 1st, 1985 to November 13th, 2019. You can choose to import files (assets) to this revision from either an S3 bucket or your computer. This post imports a file from an S3 bucket. It is important to note that by default, AWS Data Exchange uses the source S3 object’s key as the asset name. The following screenshot shows the example file this post uses.
  5. When the import status is complete, choose Finalize.

Marking a revision as finalized means that it is staged for publishing. You can only publish finalized revisions to subscribers; you can’t modify a revision after publishing it.

Publishing a new product

You are now ready to publish a new product using this data set. Complete the following steps:

  1. On the AWS Data Exchange console, under Publish data, choose Products.
  2. Choose Publish new product.
  3. In Product overview, enter the product details that subscribers can use to identify the product. For information about best practices when populating your product’s details, see Publishing Products. In particular, you may want to consider including links to a Data due diligence questionnaire (DDQ), information about the data set file types and schemas, and any other fact sheets. Note that you can use markdown to include links and format your product description.
  4. Choose Next to proceed to the Add data page. You can then add the dataset that you created above.
  5. Choose Next to proceed to the Configure the public offer page. This is the page where you configure the offer details for your product, including the available pricing options, the Data Subscription Agreement, and the refund policy. You can also choose whether you would like to enable subscription verification. If you enable subscription verification, prospective subscribers will have to fill in information such as their name, company name, email address, and use case before being able to subscribe. The subscription request will then appear on your Product Dashboard page, and you will have up to 45 days to approve or decline the request. For information about subscription verification, see Subscription Verification for Providers.
  6. Choose Next to review your product. You can preview the product as it will appear on the AWS Data Exchange product catalog. When you are satisfied with your product and offer details, choose Publish the product. Important: Choosing Publish the product will publish your product to the AWS Data Exchange catalog and make it publicly available to subscribers.

You have now created a new data set, added your first revision to this data set with historical data, finalized the revision, and published a product using this finalized revision. This product is available for subscribers to purchase within a few hours after publishing.

Publishing new data files to the product automatically

Now that the product is available to customers, you need to update the product and continuously publish new revisions to it. In our example, you need to publish new equity prices every day. To do so, set up the following architecture, which automatically picks up any file uploaded to your S3 bucket and publishes it to the product’s dataset as part of a new revision. The workflow creates and publishes a new revision for each file uploaded to the S3 bucket.

The workflow is as follows:

  1. You upload a ready-to-publish data file to the S3 bucket to update your data set.
  2. S3 invokes an AWS Lambda function with the S3 API event that contains details about the object. For more information, see Using AWS Lambda with Amazon S3.
  3. The AWS Lambda function creates a new revision under the pre-existing data set and starts a job to import the file.
  4. The AWS Lambda function modifies the pre-existing product to include the new dataset revision.
  5. Subscribers can now consume the new revision, which appears as part of their entitled data set.

Building a Lambda function

Now that you published a product with a data set, you have the foundational pieces in place to build a Lambda function that picks up a new data file uploaded to S3 and publishes it as part of that product.

To configure your Lambda function correctly, you first need to record the data set ID and product ID that you created earlier. You can retrieve them from the AWS Data Exchange console. The product ID is available on the product page, which you can access from your Product Dashboard. The data set ID is available on the data set’s page, which you can access from the Data sets page.

Data set page

Product page
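
If you prefer to look up the data set ID programmatically rather than copying it from the console, a minimal boto3 sketch could look like the following; it assumes the data set created earlier in this post and does not handle pagination:

import boto3

dx = boto3.client("dataexchange", region_name="us-east-1")

# Find the ID of an owned data set by its name.
for data_set in dx.list_data_sets(Origin="OWNED")["DataSets"]:
    if data_set["Name"] == "Exchange-A End of Day Prices":
        print(data_set["Id"])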

Creating an IAM role

To give the Lambda function permission to read from the source S3 bucket, create a revision, upload files to it, and publish it to a product, you need to create an IAM role with the appropriate permissions.

To do so, create an IAM role and attach the following policy to it. Be sure to replace {INSERT-BUCKET-NAME} and {INSERT-ACCOUNTID} with your S3 bucket’s name and your account ID respectively.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3PermissionforGettingDataSet",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::{INSERT-BUCKET-NAME}/*"
        },
        {
            "Sid": "S3DataExchangeServicePermissions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::*aws-data-exchange*"
        },
        {
            "Sid": "DataExchangeAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "dataexchange:CreateRevision",
                "dataexchange:UpdateRevision",
                "dataexchange:CreateJob",
                "dataexchange:StartJob",
                "dataexchange:GetJob"
            ],
            "Resource": "*"
        },
        {
            "Sid": "MarketplaceAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "aws-marketplace:DescribeEntity",
                "aws-marketplace:StartChangeSet",
                "aws-marketplace:DescribeChangeSet"
            ],
            "Resource": "*"
        },
        {
            "Sid": "CreateCloudwatchLogGroup",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:*"
        },
        {
            "Sid": "CloudwatchLogsPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:log-group:*"
        }
    ]
}

For more information, see Creating IAM Roles.
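
If you prefer to create the role programmatically, the following minimal sketch creates an equivalent role with boto3 and attaches the policy above as an inline policy. The role and policy names are placeholders, and the sketch assumes you saved the policy document above (with the bucket name and account ID substituted) to a local file named lambda-policy.json.

import json
import boto3

iam = boto3.client('iam')

# Trust policy that allows the Lambda service to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

role = iam.create_role(
    RoleName='DataExchangePublishingRole',              # placeholder role name
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Attach the permissions policy shown above as an inline policy.
with open('lambda-policy.json') as policy_file:
    iam.put_role_policy(
        RoleName='DataExchangePublishingRole',
        PolicyName='DataExchangePublishingPolicy',      # placeholder policy name
        PolicyDocument=policy_file.read()
    )

print(role['Role']['Arn'])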

Deploying a Lambda layer

This post uses a Lambda layer that extends the AWS SDK for Python (Boto3) built into the Lambda Python runtime by adding the AWS Data Exchange and AWS Marketplace Catalog API SDKs available as of November 13, 2019. You can deploy the sample layer published for this post, but you should use the version of the AWS SDK that matches your needs.
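
If you build your own layer instead of deploying the sample, you typically pip-install a current version of boto3 into a python/ directory, zip that directory, and publish it as a layer version. The following minimal sketch publishes such a zip file with boto3; the layer name, file name, and Region are placeholders.

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# boto3-layer.zip is assumed to contain a python/ directory with a version of
# boto3 that includes the dataexchange and marketplace-catalog clients.
with open('boto3-layer.zip', 'rb') as layer_zip:
    layer = lambda_client.publish_layer_version(
        LayerName='updated-boto3-sdk',                  # placeholder layer name
        Description='boto3 with AWS Data Exchange and AWS Marketplace Catalog API support',
        Content={'ZipFile': layer_zip.read()},
        CompatibleRuntimes=['python3.7']
    )

print(layer['LayerVersionArn'])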

Creating a Lambda function

Now that you have created the IAM role and deployed the Lambda layer with your latest SDK, you can create a Lambda function using the following steps (a programmatic sketch follows the list):

  1. On the Lambda console, choose Create a function.
  2. In the Create function section, choose Author from scratch.
  3. In the Basic information section, configure your Lambda function with the following information:
    1. For Function name, enter a name of your choice.
    2. For Runtime, choose Python 3.7.
    3. For Permissions, select Use an existing role.
    4. From the Existing role dropdown, select the Lambda role you created earlier.
  4. Choose Create function.
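
If you prefer to create the function programmatically, the following minimal sketch is a boto3 equivalent of these console steps. It assumes function.zip contains a lambda_function.py file with a lambda_handler entry point; the function name, role ARN, and Region are placeholders.

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# function.zip is assumed to contain lambda_function.py with a lambda_handler function.
with open('function.zip', 'rb') as function_zip:
    response = lambda_client.create_function(
        FunctionName='publish-to-data-exchange',        # placeholder function name
        Runtime='python3.7',
        Role='arn:aws:iam::123456789012:role/DataExchangePublishingRole',  # placeholder role ARN
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': function_zip.read()}
    )

print(response['FunctionArn'])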

Configuring your Lambda function

You can now configure your Lambda function. You first need to configure the function to be triggered when new files are uploaded to the S3 bucket. Complete the following steps (a programmatic sketch follows the list):

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, choose Add trigger.
  4. Under Trigger Configuration, choose S3.
  5. From the drop-down, select the bucket you created as a part of the prerequisites.
  6. Under Event type, choose All Object Create Events.
  7. Optionally, choose a Prefix or a Suffix if you want to only publish specific files to your AWS Data Exchange product.
  8. Choose Add.
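
Behind the scenes, this trigger consists of a resource-based permission on the function and a notification configuration on the bucket. The following minimal sketch shows the equivalent boto3 calls; the bucket name, function name, and ARNs are placeholders.

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')
s3 = boto3.client('s3')

bucket_name = 'my-example-bucket'           # placeholder bucket name
function_name = 'publish-to-data-exchange'  # placeholder function name
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:publish-to-data-exchange'  # placeholder ARN

# Allow S3 to invoke the function for events from this bucket.
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='s3-object-created-invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::' + bucket_name
)

# Send all object-created events from the bucket to the function.
s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*']
        }]
    }
)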

To confirm your code runs with the appropriate SDK, associate the Lambda layer that you deployed earlier with your Lambda function by completing the following steps (a programmatic sketch follows the list). As noted previously, this post published a sample layer, but you should use the version of the AWS SDK that matches your needs.

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, under the function name, choose Layers.
  4. Choose Add a layer.
  5. Under Layer Selection, deselect Select from list of runtime compatible layers.
  6. From the drop-down, choose the layer you deployed earlier.
  7. Choose Add.
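
The equivalent boto3 call attaches the layer to the function by its version ARN. The following minimal sketch assumes the layer version ARN returned when you deployed the layer; the function name and the ARN are placeholders. Note that this call replaces the function's entire layer list.

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Attach the updated-SDK layer; this sets the function's full list of layers.
lambda_client.update_function_configuration(
    FunctionName='publish-to-data-exchange',  # placeholder function name
    Layers=['arn:aws:lambda:us-east-1:123456789012:layer:updated-boto3-sdk:1']  # placeholder layer version ARN
)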

You now need to configure the Lambda function’s code. You can copy the following code for the Lambda function. This code programmatically calls the following APIs, which are the same APIs that you performed earlier using the console:

  • CreateRevision creates a new revision.
  • CreateJob and StartJob start importing the file to the revision.
  • GetJob checks the status of the import.
  • UpdateRevision marks the revision as finalized.

To publish an update to the product, the Lambda function uses the AWS Marketplace Catalog API service with the following APIs. To learn more, see the AWS Marketplace Catalog API Reference.

  • DescribeEntity gets the product details.
  • StartChangeSet starts an update.
  • DescribeChangeSet checks the status of the product update.

Complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Select your newly created function.
  3. Scroll down to the Function code section.
  4. Enter the following code:
    import os
    
    #Include the Lambda layer extracted location
    os.environ['AWS_DATA_PATH'] = '/opt/' 
    
    import boto3
    import time
    import datetime
    import json
    
    region = os.environ['AWS_REGION']
    
    try:
        data_set_id = os.environ['DATA_SET_ID']
    except KeyError:
        raise Exception("DATA_SET_ID environment variable must be defined!") 
    
    try:
        product_id = os.environ['PRODUCT_ID']
    except KeyError:
        raise Exception("PRODUCT_ID environment variable must be defined!")
    
    def lambda_handler(event, context):
        # Setup the boto3 clients needed
        dataexchange = boto3.client(
            service_name='dataexchange',
            region_name=region
    
        )
        marketplace_catalog = boto3.client(
            service_name='marketplace-catalog',
            region_name=region
        )
    
        # parse the s3 details from the triggered event
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        object_key = event['Records'][0]['s3']['object']['key']
    
        # CREATE REVISION under the dataset provided as an environment variable
        current_time_for_creating_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        create_revision_response = dataexchange.create_revision(DataSetId=data_set_id,
                                                         Comment='Revision created programmatically on ' + current_time_for_creating_revision)
        revision_id = create_revision_response['Id']
    
        # CREATE JOB under the revision to import file from S3 to DataExchange
        create_job_s3_import = dataexchange.create_job(
            Type='IMPORT_ASSETS_FROM_S3',
            Details={
                'ImportAssetsFromS3': {
                    'DataSetId': data_set_id,
                    'RevisionId': revision_id,
                    'AssetSources': [
                        {
                            'Bucket': bucket_name,
                            'Key': object_key
                        }
    
                    ]
                }
            }
        )
    
        # Filter the ID of the Job from the response
        job_id = create_job_s3_import['Id']
    
        # invoke START JOB on the created job to change it from Waiting to Completed state
        start_created_job = dataexchange.start_job(JobId=job_id)
    
        # GET JOB details to track the state of the job and wait until it reaches COMPLETED state
        job_status = ''
    
        while job_status != 'COMPLETED':
            get_job_status = dataexchange.get_job(JobId=job_id)
            job_status = get_job_status['State']
            print('Job Status ' + job_status)
            
            if job_status=='ERROR' :
                job_errors = get_job_status['Errors']
                raise Exception('JobId: {} failed with error:{}'.format(job_id, job_errors))
            
            time.sleep(.5)
            
        # Finalize revision by invoking UPDATE REVISION
        current_time_for_finalize_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        print(current_time_for_finalize_revision)
        finalize_revision = dataexchange.update_revision(DataSetId=data_set_id, RevisionId=revision_id, Finalized=True,
                                                  Comment='Revision finalized programmatically on ' + current_time_for_finalize_revision)
    
        # New dataset version created and finalized, now let's add it to an existing product specified as an env variable
    
        # Describe Product details to get the metadata about the product
        describe_entity = marketplace_catalog.describe_entity(Catalog='AWSMarketplace', EntityId=product_id)
    
        # Use the output to pull out producttype, productid and datasetarn for startchangeset call
        entity_type = describe_entity['EntityType']
        entity_id = describe_entity['EntityIdentifier']
        dataset_arn = ((json.loads(describe_entity['Details']))['DataSets'][0]['DataSetArn'])
        revision_arn = create_revision_response['Arn']
     
    
        # StartChangeSet to add the newly finalized revision to an existing product
        start_change_set = marketplace_catalog.start_change_set(
            Catalog='AWSMarketplace',
            ChangeSetName="Adding revision to my Product",
            ChangeSet=[
                {
                    "ChangeType": "AddRevisions",
                    "Entity": {
                        "Identifier": entity_id,
                        "Type": entity_type
                    },
                    "Details": json.dumps({
                        "DataSetArn": dataset_arn,
                        "RevisionArns": [revision_arn]
                    })
                }
            ]
        )
        
        #Filter the changeset id from the response
        changeset_id = start_change_set['ChangeSetId']
    
        # DESCRIBE CHANGESET to get the status of the changeset and wait until it reaches SUCCEEDED state
        change_set_status = ''
    
        while change_set_status != 'SUCCEEDED':
            describe_change_set = marketplace_catalog.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=changeset_id
                )
            change_set_status = describe_change_set['Status']
            print('Change Set Status ' + change_set_status)
    
            if change_set_status=='FAILED' :
                print(describe_change_set)
                failurereason = describe_change_set['FailureDescription']
                raise Exception('ChangeSetID: {} failed with error:\n{}'.format(changeset_id, failurereason))
            time.sleep(1)
            
        return ('Your data has been published successfully')

  5. Scroll down to the Environment variables section.
  6. Set the DATA_SET_ID and PRODUCT_ID variables to the values you retrieved from the console.
  7. Scroll further down to Basic settings and set the Timeout value to 1 minute (a programmatic sketch of these settings follows this list).
  8. Choose Save.
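
If you prefer to set these values programmatically, a single update_function_configuration call covers both the environment variables and the timeout. The following minimal sketch uses placeholder values; substitute the data set ID and product ID you recorded earlier.

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Placeholder IDs; replace them with the data set ID and product ID from the console.
lambda_client.update_function_configuration(
    FunctionName='publish-to-data-exchange',  # placeholder function name
    Timeout=60,
    Environment={
        'Variables': {
            'DATA_SET_ID': 'example-data-set-id',
            'PRODUCT_ID': 'example-product-id'
        }
    }
)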

When you upload a file to your S3 bucket, the S3 event now triggers the Lambda function, which updates the data set automatically and publishes the new file to your subscribers. Subscribers also receive an Amazon CloudWatch event from AWS Data Exchange, which they can use to automate exporting the data to their own S3 buckets.
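
To verify the pipeline end to end, upload a new file to the bucket and watch the function's Amazon CloudWatch Logs for the job and change set status messages that the code prints. The following minimal sketch performs such an upload; the file, bucket, and key names are placeholders.

import boto3

s3 = boto3.client('s3')

# Uploading an object to the configured bucket (and prefix, if you set one)
# triggers the Lambda function, which creates, finalizes, and publishes a new revision.
s3.upload_file(
    Filename='EQUITY_PRICES_20191114.csv',  # placeholder local file
    Bucket='my-example-bucket',             # placeholder bucket name
    Key='2019-11-14/EQUITY_PRICES.csv'      # placeholder object key
)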

Conclusion

AWS Data Exchange provides an easy and convenient way for data providers to exchange data with their customers in a cloud-native, secure, and efficient way. This post showed you how to publish a new product based on a newly created data set and revision in the AWS Data Exchange console. You also learned how to automatically publish files uploaded to your S3 bucket as new revisions. To learn more, visit AWS Data Exchange.


About the Authors

Akram Chetibi is a senior product manager of AWS Data Exchange. Akram joined AWS more than two years ago, and has launched multiple services including AWS Data Exchange and AWS Fargate.

Keerti Shah is a global solutions architect with Amazon Web Services. She enjoys working with Financial Services customers to drive innovation, digitization, and modernization of legacy applications.

Harsha W. Sharma is a global account solutions architect with AWS New York. Harsha joined AWS more than three years ago and works with Global Financial Services customers to design and develop architectures on AWS and support their journey on the cloud.