Tag Archives: Analytics

Building an event-driven application with AWS Lambda and the Amazon Redshift Data API

Post Syndicated from Manash Deb original https://aws.amazon.com/blogs/big-data/building-an-event-driven-application-with-aws-lambda-and-the-amazon-redshift-data-api/

Event-driven applications, where application logic runs in response to events, are becoming popular with many customers. A primary benefit of this architecture is the decoupling of producer and consumer processes, which allows greater flexibility in application design.

An example of an event-driven application is an automated workflow that is triggered by an event and runs a series of transformations in the data warehouse. At the end of this workflow, another event is emitted to notify end-users that those transformations are complete and that they can start analyzing the transformed dataset.

In this post, we explain how you can easily design a similar event-driven application with Amazon Redshift, AWS Lambda, and Amazon EventBridge. In response to a scheduled event defined in EventBridge, this application automatically triggers a Lambda function to run a stored procedure performing extract, load, and transform (ELT) operations in an Amazon Redshift data warehouse, using its out-of-the-box integration with the Amazon Redshift Data API. This stored procedure copies the source data from Amazon Simple Storage Service (Amazon S3) to Amazon Redshift and aggregates the results. When complete, it sends an event to EventBridge, which triggers a Lambda function to send notification to end-users through Amazon Simple Notification Service (Amazon SNS) to inform them about the availability of updated data in Amazon Redshift.

This event-driven serverless architecture offers greater extensibility and simplicity, making it easier to maintain, faster to release new features, and less exposed to the impact of changes. It also simplifies adding other components or third-party products to the application without many changes.

Prerequisites

As a prerequisite for creating the application in this post, you need to set up an Amazon Redshift cluster and associate it with an AWS Identity and Access Management (IAM) role. For more information, see Getting Started with Amazon Redshift.

Solution overview

The following architecture diagram highlights the end-to-end solution, which you can provision automatically with an AWS CloudFormation template.

The workflow includes the following steps:

  1. The EventBridge rule EventBridgeScheduledEventRule is initiated based on a cron schedule.
  2. The rule triggers the Lambda function LambdaRedshiftDataApiETL, with the action run_sql as an input parameter. The Python code for the Lambda function is available in the GitHub repo.
  3. The function performs an asynchronous call to the stored procedure run_elt_process in Amazon Redshift, performing ELT operations using the Amazon Redshift Data API.
  4. The stored procedure uses the Amazon S3 location event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/ as the data source for the ELT process. We have pre-populated this with the NYC Yellow Taxi public dataset for the year 2015 to test this solution.
  5. When the stored procedure is complete, the EventBridge rule EventBridgeRedshiftEventRule is triggered automatically to capture the event based on the source parameter redshift-data from the Amazon Redshift Data API.
  6. The rule triggers the Lambda function LambdaRedshiftDataApiETL, with the action notify as an input parameter.
  7. The function uses the SNS topic RedshiftNotificationTopicSNS to send an automated email notification to end-users that the ELT process is complete.

The Amazon Redshift database objects required for this solution are provisioned automatically by the Lambda function LambdaSetupRedshiftObjects during CloudFormation stack creation. This function invokes LambdaRedshiftDataApiETL, which creates the following objects in Amazon Redshift:

  • Table nyc_yellow_taxi, which we use to copy the New York taxi dataset from Amazon S3
  • Materialized view nyc_yellow_taxi_volume_analysis, providing an aggregated view of the nyc_yellow_taxi table
  • Stored procedure run_elt_process to take care of data transformations

The Python code for this function is available in the GitHub repo.

We also use the IAM role LambdaRedshiftDataApiETLRole for the Lambda function LambdaRedshiftDataApiETL to allow the following permissions:

  • Federate to the Amazon Redshift cluster through the getClusterCredentials permission, avoiding password credentials
  • Initiate queries in the Amazon Redshift cluster through redshift-data API calls
  • Log to Amazon CloudWatch for troubleshooting purposes
  • Send notifications through Amazon SNS

A sample IAM role for this function is available in the GitHub repo.

Lambda is a key service in this solution because it initiates queries in Amazon Redshift using the redshift-data client. Based on the input parameter action, this function can asynchronously initiate Structured Query Language (SQL) statements in Amazon Redshift, thereby avoiding the chance of timing out in case of long-running SQL statements. It can also publish custom notifications through Amazon SNS. The function uses the Amazon Redshift Data API temporary credentials functionality, which allows it to communicate with Amazon Redshift using IAM permissions without any password-based authentication. With the Data API, you also don’t need to configure drivers and connections for your Amazon Redshift cluster, because it’s handled automatically.
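To make this concrete, the following is a minimal Python sketch of what such a handler could look like. It is not the actual code from the GitHub repo; the environment variable names and event fields are assumptions for illustration only.

import os
import boto3

redshift_data = boto3.client("redshift-data")
sns = boto3.client("sns")

def handler(event, context):
    """Dispatch on the 'action' input parameter (field names are illustrative)."""
    action = event.get("action")
    if action == "run_sql":
        # Run the ELT stored procedure asynchronously; WithEvent=True asks the
        # Data API to emit an EventBridge event when the statement finishes.
        response = redshift_data.execute_statement(
            ClusterIdentifier=os.environ["REDSHIFT_CLUSTER_ID"],  # assumed environment variable
            Database=os.environ["REDSHIFT_DATABASE"],             # assumed environment variable
            DbUser=os.environ["REDSHIFT_DB_USER"],                # temporary credentials through IAM
            Sql=event.get("sql_text", "call run_elt_process();"),
            StatementName="run_elt_process",
            WithEvent=True,
        )
        return {"statement_id": response["Id"]}
    if action == "notify":
        # Notify end-users that the ELT process is complete.
        sns.publish(
            TopicArn=os.environ["SNS_TOPIC_ARN"],                 # assumed environment variable
            Subject="ELT process complete",
            Message="The transformed data is now available in Amazon Redshift.",
        )
        return {"status": "notification sent"}
    return {"status": f"unknown action: {action}"}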

Deploying the CloudFormation template

When your Amazon Redshift cluster is set up, use the provided CloudFormation template to automatically create all required resources for this solution in your AWS account. For more information, see Getting started with AWS CloudFormation.

The template requires you to provide the following parameters:

  • RedshiftClusterIdentifier – Cluster identifier for your Amazon Redshift cluster.
  • DbUsername – Amazon Redshift database user name that has access to run the SQL script.
  • DatabaseName – Name of the Amazon Redshift database where the SQL script runs.
  • RedshiftIAMRoleARN – ARN of the IAM role associated with the Amazon Redshift cluster.
  • NotificationEmailId – Email to send event notifications through Amazon SNS.
  • ExecutionSchedule – Cron expression to schedule the ELT process through an EventBridge rule.
  • SqlText – SQL text to run as part of the ELT process. Don’t change the default value (call run_elt_process();) if you want to test this solution with the test dataset provided for this post.
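If you prefer to launch the stack programmatically instead of through the console, a call along the following lines passes the same parameters. This is a minimal boto3 sketch; the stack name, template URL, and parameter values are placeholders you would replace with your own.

import boto3

cloudformation = boto3.client("cloudformation")

cloudformation.create_stack(
    StackName="event-driven-redshift-elt",  # any name you like
    TemplateURL="https://s3.amazonaws.com/<your-bucket>/template.yaml",  # placeholder location
    Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],  # the template creates IAM resources
    Parameters=[
        {"ParameterKey": "RedshiftClusterIdentifier", "ParameterValue": "my-redshift-cluster"},
        {"ParameterKey": "DbUsername", "ParameterValue": "awsuser"},
        {"ParameterKey": "DatabaseName", "ParameterValue": "dev"},
        {"ParameterKey": "RedshiftIAMRoleARN", "ParameterValue": "arn:aws:iam::123456789012:role/MyRedshiftRole"},
        {"ParameterKey": "NotificationEmailId", "ParameterValue": "you@example.com"},
        {"ParameterKey": "ExecutionSchedule", "ParameterValue": "cron(0 11 * * ? *)"},
        {"ParameterKey": "SqlText", "ParameterValue": "call run_elt_process();"},
    ],
)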

The following screenshot shows the stack details on the AWS CloudFormation console.

Testing the pipeline

After setting up the architecture, you should have an automated pipeline that triggers based on the schedule you defined in the EventBridge rule’s cron expression. You can view the CloudWatch logs and troubleshoot issues in the Lambda function. The following screenshot shows the logs for our setup.

You can also view the query status on the Amazon Redshift console, which lets you view detailed execution plans for the queries you ran. Although the stored procedure may take around 6 minutes to complete, the Lambda function finishes in seconds, primarily because the call from Lambda to Amazon Redshift is asynchronous. The function completes after initiating the process in Amazon Redshift and doesn’t wait for the query to finish.
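Because the call is asynchronous, you can check on the statement later using the Data API itself. The following minimal sketch assumes you kept the statement ID returned by execute_statement; the helper function name is illustrative.

import boto3

redshift_data = boto3.client("redshift-data")

def check_statement(statement_id):
    """Return the current status of an asynchronously submitted statement."""
    response = redshift_data.describe_statement(Id=statement_id)
    # Status is one of SUBMITTED, PICKED, STARTED, FINISHED, ABORTED, or FAILED
    return response["Status"], response.get("Error")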

When this process is complete, you receive the email notification that the ELT process is complete.

You can then view the updated data in your business intelligence tool, like Amazon QuickSight, or query the data directly in the Amazon Redshift query editor (see the following screenshot) to view the most recent data processed by this event-driven architecture.

Conclusion

The Amazon Redshift Data API makes it easy to interact with Amazon Redshift and enables you to build event-driven and cloud-native applications. We demonstrated how to build an event-driven application with Amazon Redshift, Lambda, and EventBridge. For more information about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters and Using the Amazon Redshift Data API.


About the Authors

Manash Deb is a Senior Analytics Specialist Solutions Architect. He has worked in different database and data warehousing technologies for more than 15 years.

 

 

 

Debu Panda, a senior product manager at AWS, is an industry leader in analytics, application platform, and database technologies. He has more than 20 years of experience in the IT industry and has published numerous articles on analytics, enterprise Java, and databases and has presented at multiple conferences. He is lead author of the EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

 

Fei Peng is a Software Dev Engineer working in the Amazon Redshift team.

Announcing AWS Glue DataBrew – A Visual Data Preparation Tool That Helps You Clean and Normalize Data Faster

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/announcing-aws-glue-databrew-a-visual-data-preparation-tool-that-helps-you-clean-and-normalize-data-faster/

To be able to run analytics, build reports, or apply machine learning, you need to be sure the data you’re using is clean and in the right format. That’s the data preparation step that requires data analysts and data scientists to write custom code and do many manual activities. First, you need to look at the data, understand which possible values are present, and build some simple visualizations to understand if there are correlations between the columns. Then, you need to check for strange values outside of what you’re expecting, such as weather temperature above 200℉ (93℃) or speed of a truck above 200 mph (322 km/h), or for data that is missing. Many algorithms need values to be rescaled to a specific range, for example between 0 and 1, or normalized around the mean. Text fields need to be set to a standard format, and may require advanced transformations such as stemming.

That’s a lot of work. For this reason, I am happy to announce that AWS Glue DataBrew is available today: a visual data preparation tool that helps you clean and normalize data up to 80% faster so you can focus on getting business value from it.

DataBrew provides a visual interface that quickly connects to your data stored in Amazon Simple Storage Service (S3), Amazon Redshift, Amazon Relational Database Service (RDS), any JDBC accessible data store, or data indexed by the AWS Glue Data Catalog. You can then explore the data, look for patterns, and apply transformations. For example, you can apply joins and pivots, merge different data sets, or use functions to manipulate data.

Once your data is ready, you can immediately use it with AWS and third-party services to gain further insights, such as Amazon SageMaker for machine learning, Amazon Redshift and Amazon Athena for analytics, and Amazon QuickSight and Tableau for business intelligence.

How AWS Glue DataBrew Works
To prepare your data with DataBrew, you follow these steps:

  • Connect one or more datasets from S3 or the AWS Glue Data Catalog (S3, Redshift, RDS). You can also upload a local file to S3 from the DataBrew console. CSV, JSON, Parquet, and XLSX formats are supported.
  • Create a project to visually explore, understand, combine, clean, and normalize data in a dataset. You can merge or join multiple datasets. From the console, you can quickly spot anomalies in your data with value distributions, histograms, box plots, and other visualizations.
  • Generate a rich data profile for your dataset with over 40 statistics by running a job in the profile view.
  • When you select a column, you get recommendations on how to improve data quality.
  • You can clean and normalize data using more than 250 built-in transformations. For example, you can remove or replace null values, or create encodings. Each transformation is automatically added as a step to build a recipe.
  • You can then save, publish, and version recipes, and automate the data preparation tasks by applying recipes on all incoming data. To apply recipes to or generate profiles for large datasets, you can run jobs.
  • At any point in time, you can visually track and explore how datasets are linked to projects, recipes, and job runs. In this way, you can understand how data flows and what changes are made. This information is called data lineage and can help you find the root cause in case of errors in your output.

Let’s see how this works with a quick demo!

Preparing a Sample Dataset with AWS Glue DataBrew
In the DataBrew console, I select the Projects tab and then Create project. I name the new project Comments. A new recipe is also created and will be automatically updated with the data transformations that I will apply next.

I choose to work on a New dataset and name it Comments.

Here, I select Upload file and in the next dialog I upload a comments.csv file I prepared for this demo. In a production use case, you would probably connect to an existing source on S3 or in the AWS Glue Data Catalog. For this demo, I specify the S3 destination for storing the uploaded file. I leave Encryption disabled.

The comments.csv file is very small, but will help show some common data preparation needs and how to complete them quickly with DataBrew. The format of the file is comma-separated values (CSV). The first line contains the name of the columns. Then, each line contains a text comment and a numerical rating made by a customer (customer_id) about an item (item_id). Each item is part of a category. For each text comment, there is an indication of the overall sentiment (comment_sentiment). Optionally, when giving the comment, customers can enable a flag to ask to be contacted for further support (support_needed).

Here’s the content of the comments.csv file:

customer_id,item_id,category,rating,comment,comment_sentiment,support_needed
234,2345,"Electronics;Computer", 5,"I love this!",Positive,False
321,5432,"Home;Furniture",1,"I can't make this work... Help, please!!!",negative,true
123,3245,"Electronics;Photography",3,"It works. But I'd like to do more",,True
543,2345,"Electronics;Computer",4,"Very nice, it's going well",Positive,False
786,4536,"Home;Kitchen",5,"I really love it!",positive,false
567,5432,"Home;Furniture",1,"I doesn't work :-(",negative,true
897,4536,"Home;Kitchen",3,"It seems OK...",,True
476,3245,"Electronics;Photography",4,"Let me say this is nice!",positive,false

In the Access permissions section, I select an AWS Identity and Access Management (IAM) role that provides DataBrew read permissions to my input S3 bucket. Only roles where DataBrew is the service principal for the trust policy are shown in the DataBrew console. To create one in the IAM console, select DataBrew as the trusted entity.

If the dataset is big, you can use Sampling to limit the number of rows to use in the project. These rows can be selected at the beginning, at the end, or randomly through the data. You are going to use projects to create recipes, and then jobs to apply recipes to all the data. Depending on your dataset, you may not need access to all the rows to define the data preparation recipe.

Optionally, you can use Tagging to manage, search, or filter resources you create with AWS Glue DataBrew.

The project is now being prepared and in a few minutes I can start exploring my dataset.

In the Grid view, the default when I create a new project, I see the data as it has been imported. For each column, there is a summary of the range of values that have been found. For numerical columns, the statistical distribution is given.

In the Schema view, I can drill down on the schema that has been inferred, and optionally hide some of the columns.

In the Profile view, I can run a data profile job to examine and collect statistical summaries about the data. This is an assessment in terms of structure, content, relationships, and derivation. For a large dataset, this is very useful to understand the data. For this small example the benefits are limited, but I run it nonetheless, sending the output of the profile job to a different folder in the same S3 bucket I use to store the source data.

When the profile job has succeeded, I can see a summary of the rows and columns in my dataset, how many columns and rows are valid, and correlations between columns.

Here, if I select a column, for example rating, I can drill down into specific statistical information and correlations for that column.

Now, let’s do some actual data preparation. In the Grid view, I look at the columns. The category column contains two pieces of information, separated by a semicolon. For example, the category of the first row is “Electronics;Computer.” I select the category column, then click the column actions (the three small dots to the right of the column name), which give me access to many transformations that I can apply to the column. In this case, I choose to split the column on a single delimiter. Before applying the changes, I quickly preview them in the console.

I use the semicolon as the delimiter, and now I have two columns, category_1 and category_2. I use the column actions again to rename them to category and subcategory. Now, for the first row, category contains Electronics and subcategory contains Computer. All these changes are added as steps to the project recipe, so that I’ll be able to apply them to similar data.

The rating column contains values between 1 and 5. For many algorithms, I prefer to have these kinds of values normalized. In the column actions, I use min-max normalization to rescale the values between 0 and 1. More advanced techniques are available, such as mean or Z-score normalization. A new rating_normalized column is added.
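To make the transformation concrete, min-max normalization rescales each value x to (x - min) / (max - min). The following short pandas sketch shows the equivalent calculation on the sample file; it only illustrates the math, not how DataBrew implements it internally.

import pandas as pd

df = pd.read_csv("comments.csv")

# Rescale rating from its 1-5 range to 0-1: (x - min) / (max - min)
rating_min, rating_max = df["rating"].min(), df["rating"].max()
df["rating_normalized"] = (df["rating"] - rating_min) / (rating_max - rating_min)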

I look into the recommendations that DataBrew gives for the comment column. Since it’s text, the suggestion is to use a standard case format, such as lowercase, capital case, or sentence case. I select lowercase.

The comments contain free text written by customers. To simplify further analytics, I use word tokenization on the column to remove stop words (such as “a,” “an,” “the”), expand contractions (so that “don’t” becomes “do not”), and apply stemming. The destination for these changes is a new column, comment_tokenized.

I still have some special characters in the comment_tokenized column, such as an emoticon :-). In the column actions, I select to clean and remove special characters.

I look into the recommendations for the comment_sentiment column. There are some missing values. I decide to fill the missing values with a neutral sentiment. Now, I still have values written with a different case, so I follow the recommendation to use lowercase for this column.

The comment_sentiment column now contains three different values (positive, negative, or neutral), but many algorithms prefer to have one-hot encoding, where there is a column for each of the possible values, and these columns contain 1 if that is the original value, or 0 otherwise. I select the Encode icon in the menu bar and then One-hot encode column. I leave the defaults and apply. Three new columns for the three possible values are added.
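For comparison, the following pandas sketch shows roughly what this one-hot encoding produces on the sample file, after the earlier fill-missing and lowercase steps. It is illustrative only, not DataBrew’s implementation.

import pandas as pd

df = pd.read_csv("comments.csv")
df["comment_sentiment"] = df["comment_sentiment"].fillna("neutral").str.lower()

# One indicator column per sentiment value
one_hot = pd.get_dummies(df["comment_sentiment"], prefix="comment_sentiment")
df = pd.concat([df, one_hot], axis=1)  # adds comment_sentiment_negative, _neutral, _positive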

The support_needed column is recognized as boolean, and its values are automatically formatted to a standard format. I don’t have to do anything here.

The recipe for my dataset is now ready to be published and can be used in a recurring job processing similar data. I didn’t have a lot of data, but the recipe can be used with much larger datasets.

In the recipe, you can find a list of all the transformations that I just applied. When running a recipe job, output data is available in S3 and ready to be used with analytics and machine learning platforms, or to build reports and visualization with BI tools. The output can be written in a different format than the input, for example using a columnar storage format like Apache Parquet.
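If you’d rather create and run the recipe job programmatically than from the console, a boto3 sketch like the following could do it. The dataset, recipe, role, and bucket names here are assumptions for illustration, and the recipe is assumed to have a published version 1.0.

import boto3

databrew = boto3.client("databrew")

# Create a job that applies the published recipe to the full dataset
# and writes Parquet output to S3. All names are illustrative.
databrew.create_recipe_job(
    Name="comments-recipe-job",
    DatasetName="Comments",
    RecipeReference={"Name": "Comments-recipe", "RecipeVersion": "1.0"},
    RoleArn="arn:aws:iam::123456789012:role/DataBrewServiceRole",  # placeholder role
    Outputs=[{
        "Location": {"Bucket": "my-databrew-output-bucket", "Key": "comments/"},
        "Format": "PARQUET",
    }],
)

# Run the job on demand (you could also schedule it with create_schedule)
databrew.start_job_run(Name="comments-recipe-job")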

Available Now

AWS Glue DataBrew is available today in US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), Europe (Frankfurt), Asia Pacific (Tokyo), and Asia Pacific (Sydney).

It’s never been easier to prepare your data for analytics, machine learning, or BI. In this way, you can really focus on getting the right insights for your business instead of writing custom code that you then have to maintain and update.

To practice with DataBrew, you can create a new project and select one of the sample datasets that are provided. That’s a great way to understand all the available features and how you can apply them to your data.

Learn more and get started with AWS Glue DataBrew today.

Danilo

The Satellite Ear Tag that is Changing Cattle Management

Post Syndicated from Karen Hildebrand original https://aws.amazon.com/blogs/architecture/the-satellite-ear-tag-that-is-changing-cattle-management/

Most cattle are not raised in cities—they live on cattle stations, large open plains, and tracts of land largely unpopulated by humans. It’s hard to stay connected with the herd. Cattle don’t often carry their own mobile phones, and they don’t pay a mobile phone bill. Naturally, the areas in which cattle live often do not have cellular connectivity or reception. But they now have one way to stay connected: a world-first satellite ear tag.

Ceres Tag co-founders Melita Smith and David Smith recognized the problem given their own farming background. David explained that they needed to know simple things to begin with, such as:

  • Where are they?
  • How many are out there?
  • What are they doing?
  • What condition are they in?
  • Are they OK?

Later, the questions advanced to:

  • Which are the higher performing animals that I want to keep?
  • Where do I start when rounding them up?
  • As assets, can I get better financing and insurance if I can prove their location, existence, and condition?

To answer these questions, Ceres Tag first had to solve its biggest challenge, and it was not getting cattle to carry mobile phones and pay mobile phone bills to generate the revenue needed for greater coverage. David and Melita knew they needed help developing a new method of tracking, but in a way that aligned with current livestock practices. Their idea of a satellite-connected ear tag came to life through close partnership and collaboration with CSIRO, Australia’s national science agency. CSIRO brought expertise to the problem and rallied together teams of experts across public and private partnerships, never accepting “that’s not been done before” as a reason to curtail their innovation.

 

Figure 1: How Ceres Tag works in practice

Thinking Big: Ceres Tag Protocol

Melita and David constructed their idea and brought the physical hardware to reality. This meant finding strategic partners to build hardware, connectivity partners that provided global coverage at a cost that was tenable to cattle operators, integrations with existing herd management platforms, and a global infrastructure backbone that allowed their solution to scale. They showed the resilience, tenacity, and persistence often attributed to startup founders and lifelong agricultural advocates. Explaining the purpose of the product often requires some unique approaches to defining the value proposition while fundamentally breaking down existing ways of thinking about things. As David explained, “We have an internal saying, ‘As per Ceres Tag protocol …..’ to help people to see the problem through a new lens.” This persistence led to the creation of an easy-to-use ear tagging applicator and a two-prong smart ear tag. The ear tag connects via satellite for data transmission, providing connectivity to more than 120 countries in the world and 80% of the earth’s surface.

The Ceres Tag applicator, smart tag, and global satellite connectivity

Figure 2: The Ceres Tag applicator, smart tag, and global satellite connectivity

Unlocking the blocker: data-driven insights

With the hardware and connectivity challenges solved, Ceres Tag turned to how the data-driven insights would be delivered. The company needed to select a technology partner that understood its global customer base and what it means to deliver a low-latency solution for web, mobile, and API-driven applications. David once again knew the power of leveraging the team around him to find the best solution. The evaluation of cloud providers was led by Lewis Frost, COO, and Heidi Perrett, Data Platform Manager. Ceres Tag ultimately chose to partner with AWS and use the AWS Cloud as the backbone for the Ceres Tag Management System.

Ceres Tag conceptual diagram

Figure 3: Ceres Tag conceptual diagram

The Ceres Tag Management System houses the data and metadata about each tag, enabling the traceability of that tag throughout each animal’s life cycle. This includes verification of who should have access to its health records and history. Based on the nature of the data being stored and transmitted, security of the application is critical. As a startup, it was important for Ceres Tag to keep costs low, but also to be able to scale based on growth and usage as it expands globally.

Ceres Tag is able to quickly respond to customers regardless of geography, routing traffic to the appropriate endpoint. It accomplishes this by leveraging Amazon CloudFront as the content delivery network (CDN) for traffic distribution of front-end requests and Amazon Route 53 for DNS routing. A multi-Availability Zone deployment and an Application Load Balancer distribute incoming traffic across multiple targets, increasing the availability of the application.

Ceres Tag is using AWS Fargate to provide a serverless compute environment that matches the pay-as-you-go, usage-based model. AWS also provides many advanced security features and architecture guidance that have helped Ceres Tag implement and evaluate a best-practice security posture across all of its environments. Authentication is handled by Amazon Cognito, which allows Ceres Tag to scale easily by supporting millions of users. It leverages easy-to-use features like sign-in with social identity providers, such as Facebook, Google, and Amazon, and enterprise identity providers via SAML 2.0.

The data captured from the ear tag on the cattle will be ingested via AWS PrivateLink. By providing a private endpoint to access your services, AWS PrivateLink ensures your traffic is not exposed to the public internet. It also makes it easy to connect services across different accounts and VPCs to significantly simplify your network architecture. By using a satellite connectivity provider that runs on AWS, Ceres Tag will benefit from the AWS Ground Station infrastructure used by the provider, in addition to the streaming IoT database.

 

New – Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required

Post Syndicated from Alex Casalboni original https://aws.amazon.com/blogs/aws/new-export-amazon-dynamodb-table-data-to-data-lake-amazon-s3/

Hundreds of thousands of AWS customers have chosen Amazon DynamoDB for mission-critical workloads since its launch in 2012. DynamoDB is a nonrelational managed database that allows you to store a virtually infinite amount of data and retrieve it with single-digit-millisecond performance at any scale. To get the most value out of this data, customers had […]

Handling data erasure requests in your data lake with Amazon S3 Find and Forget

Post Syndicated from Chris Deigan original https://aws.amazon.com/blogs/big-data/handling-data-erasure-requests-in-your-data-lake-with-amazon-s3-find-and-forget/

Data lakes are a popular choice for organizations to store data around their business activities. Best practices for data lake design dictate that data is immutable once stored, but new regulations such as the European General Data Protection Regulation (GDPR), the California Consumer Privacy Act (CCPA), and others have created new obligations, so operators now need to be able to erase private data from their data lake when requested.

When asked to erase an individual’s private data, as a data lake operator you have to find all the objects in your Amazon Simple Storage Service (Amazon S3) buckets that contain data relating to that individual. This can be complex because data lakes contain many S3 objects (each of which may contain multiple rows), as shown in the following diagram. You often can’t predict which objects contain data relating to an individual, so you need to check each object. For example, if the user mary34 asks to be removed, you need to check each object to determine if it contains data relating to mary34. This is the first challenge operators face: identifying which objects contain data of interest.

After you identify objects containing data of interest, you face a second challenge: you need to retrieve the object from the S3 bucket, remove relevant rows from the file, put a new version of the object into S3, and make sure you delete any older versions.

Locating and removing data manually can be time-consuming and prone to mistakes, considering the large number of objects typically in data lakes.

Amazon S3 Find and Forget solves these challenges with ready-to-use automations. It allows you to remove records from data lakes of any size that are cataloged in the AWS Glue Data Catalog. The solution includes a web user interface and an API that you can use to integrate with your own applications.

Solution overview

Amazon S3 Find and Forget enables you to find and delete records automatically in data lakes on Amazon S3. Using the solution, you can:

  • Define which tables from your AWS Glue Data Catalog contain data you want to erase
  • Manage a queue of identifiers (such as unique customer identifiers) to erase
  • Erase rows from your data lake matching the queued record identifiers
  • Access a log of all actions taken by the solution

You can use Amazon S3 Find and Forget to work with data lakes stored on Amazon S3 in a supported file format.

The solution is developed and distributed as open-source software that you deploy and run inside your own AWS account. When deploying this solution, you only pay for the AWS services consumed to run it. We recommend reviewing the Cost Estimate guide and creating Amazon CloudWatch Billing Alarms to monitor charges before deploying the solution in your own account.

When you handle requests to remove data, you add the identifiers through the web interface or API to a Deletion Queue. The identifiers remain in the queue until you start a Deletion Job. The Deletion Job processes the queue and removes matching rows from objects in your data lake.

Where your requirements allow it, batching deletions can provide significant cost savings by minimizing the number of times the data lake needs to be re-scanned and processed. For example, you could start a Deletion Job once a week to process all requests received in the preceding week.

Solution demonstration

This section provides a demonstration of using Amazon S3 Find and Forget’s main features. To deploy the solution in your own account, refer to the User Guide.

For this demonstration, I have prepared a few resources in advance.

The first step is to deploy the solution using AWS CloudFormation by following the instructions in the User Guide. The CloudFormation stack can take 20-30 minutes to deploy depending on the options chosen when deploying.

Once deployed, I visit the web user interface by going to the address in the WebUIUrl CloudFormation stack output. Using a temporary password emailed to the address I provided in my CloudFormation parameters, I log in and set a password for future use. I then see a dashboard with some base metrics for my Amazon S3 Find and Forget deployment:

I now need to create a Data Mapper so that Amazon S3 Find and Forget can find my data lake. To do this, I select Data Mappers, then Create Data Mapper:

On this screen, I give my Data Mapper a name, choose the AWS Glue database and table in my account that I want to operate on, and the columns that I want my deletions to match. In this demonstration, I’m using a copy of the Amazon Customer Reviews Dataset that I copied to my own S3 bucket. I’ll be using the customer_id column to remove data. In the dataset, this field contains a unique identifier for each customer who has created a product review.

I then specify the IAM role to be used when modifying the objects in S3. I also choose whether I want the old S3 object versions to be deleted for me. I can turn this off if I want to implement my own strategy to manage deleting old object versions, such as by using S3 lifecycle policies.

After I choose Create Data Mapper, the Data Mapper is created, and I am prompted to grant permissions for S3 Find and Forget to operate in my bucket. In the Data Mapper list, I select my new Data Mapper, then choose Generate Access Policies. The interface displays a sample bucket policy that I copy and paste into the bucket policy for my S3 bucket in the AWS Management Console.

With the Data Mapper set up, I’m now able to add the customers who have requested to have their data deleted to the Deletion Queue. Using their Customer IDs, I go to the Deletion Queue section and select Add Match to the Deletion Queue.

I’ve chosen to delete from all the available Data Mappers, but I can also choose specific ones. Once I’ve added my matches, I can see a list of them on the Deletion Queue page:

I can now run a deletion job that will cause the matches to be deleted from the data lake. To do this, I select Deletion Jobs then Start a Deletion Job.

After a few minutes the Deletion Job completes, and I can see metrics collected during the job including that the job took just over two-and-a-half minutes:

There is an Export to JSON option that includes all the metrics shown, more granular information about the Deletion Job, and which S3 objects were modified.

At this point the Deletion Queue is empty, and ready for me to use for future requests.

Solution design

This section includes a brief introduction to how the solution works. More comprehensive design documentation is available in the Amazon S3 Find and Forget GitHub repository.

The following diagram illustrates the architecture of this solution.

Amazon S3 Find and Forget uses AWS serverless services to optimize for cost and scalability. The user interface and API are built using Amazon S3, Amazon Cognito, AWS Lambda, Amazon DynamoDB, and Amazon API Gateway, which automatically scale down when not in use so that there is no expensive baseline cost just for having the solution installed. These AWS services are always available and scale in concert with usage, with a pay-for-what-you-use price model.

The Deletion Job workflow is coordinated using AWS Step Functions, Lambda, and Amazon Simple Queue Service (Amazon SQS). The solution uses Step Functions for high-level coordination and state tracking in the workflow, Lambda functions for discrete computation tasks, and Amazon SQS to store queues of repetitive work.

A deletion job has two phases: Find and Forget. In the Find phase, the solution uses Amazon Athena to scan the data lake for objects containing rows matching the identifiers in the deletion queue. For this to work at scale, we built a query planner Lambda function that uses the partition list in the AWS Glue Data Catalog for each data mapper to run an Athena query on each partition, returning the path to S3 objects that contain matches with the identifiers in the Deletion Queue. The object keys are then added to an SQS queue that we refer to as the Object Deletion Queue.
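A heavily simplified sketch of that Find-phase pattern might look like the following. It is illustrative only and not the solution's actual code (which is in the GitHub repository); the function and parameter names are assumptions.

import time
import boto3

athena = boto3.client("athena")
sqs = boto3.client("sqs")

def find_matching_objects(database, table, column, match_ids, output_location, queue_url):
    """Ask Athena which S3 objects contain rows matching the queued identifiers,
    then enqueue those object paths for the Forget phase."""
    id_list = ", ".join(f"'{m}'" for m in match_ids)
    query = (
        f'SELECT DISTINCT "$path" AS object_path '
        f'FROM "{database}"."{table}" '
        f'WHERE "{column}" IN ({id_list})'
    )
    execution_id = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Wait for the query to finish (simple polling for illustration)
    while True:
        state = athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)

    # Push each matching object path onto the object deletion queue
    results = athena.get_query_results(QueryExecutionId=execution_id)
    for row in results["ResultSet"]["Rows"][1:]:  # skip the header row
        sqs.send_message(QueueUrl=queue_url, MessageBody=row["Data"][0]["VarCharValue"])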

In the Forget phase, deletion workers are started as a service running on AWS Fargate. These workers process each object in the Object Deletion Queue by downloading the objects from the S3 bucket into memory, deleting the rows that contain matched identifiers, then putting a new version of the object to the S3 bucket using the same key. By default, older versions of the object are then deleted from the S3 bucket to make the deletion irreversible. You can alternatively disable this feature to implement your own strategy for deleting old object versions, such as by using an S3 Lifecycle policy.
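The following is a minimal sketch of what a single Forget-phase step could look like for one Parquet object, using pandas for brevity. It is not the actual worker implementation; the function name and parameters are assumptions.

import io
import boto3
import pandas as pd

s3 = boto3.client("s3")

def forget(bucket, key, column, match_ids):
    """Remove rows whose column value is in match_ids from one Parquet object,
    then overwrite the object in place."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    df = pd.read_parquet(io.BytesIO(body))

    # Drop the rows that match any queued identifier
    cleaned = df[~df[column].isin(match_ids)]

    buffer = io.BytesIO()
    cleaned.to_parquet(buffer, index=False)
    s3.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
    # The real solution also deletes older object versions by default, unless you
    # choose to manage them yourself (for example, with an S3 Lifecycle policy).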

Note that during the Forget phase, affected S3 objects are replaced at the time they are processed and are subject to the Amazon S3 data consistency model. We recommend that you avoid running a Deletion Job in parallel to a workload that reads from the data lake unless it has been designed to handle temporary inconsistencies between objects.

When the object deletion queue is empty, the Forget phase is complete and a final status is determined for the Deletion Job based on whether any errors occurred (for example, due to missing permissions for S3 objects).

Logs are generated for all actions throughout the Deletion Job, which you can use for reporting or troubleshooting. These are stored in DynamoDB, along with other persistent data including the Data Mappers and Deletion Queue.

Conclusion

In this post, we introduced the Amazon S3 Find and Forget solution, which helps data lake operators handle data erasure requests they may receive pursuant to regulations such as GDPR, CCPA, and others. We then described features of the solution and how to use it for a basic use case.

You can get started today by deploying the solution from the GitHub repository, where you can also find more documentation of how the solution works, its features, and limits. We are continuing to develop the solution and welcome you to send feedback, feature requests, or questions through GitHub Issues.

 


About the Authors

Chris Deigan is an AWS Solution Engineer in London, UK. Chris works with AWS Solution Architects to create standardized tools, code samples, demonstrations, and quick starts.

 

 

 

Matteo Figus is an AWS Solution Engineer based in the UK. Matteo works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. He is passionate about open-source software and in his spare time he likes to cook and play the piano.

 

 

 

Nick Lee is an AWS Solution Engineer based in the UK. Nick works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. In his spare time he enjoys playing football and squash, and binge-watching TV shows.

 

 

 

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

 

 

 

Cristina Fuia is a Specialist Solutions Architect for Analytics at AWS. She works with customers across EMEA helping them to solve complex problems, design and build data architectures so that they can get business value from analyzing their data.

 

A Day in the Life of a Content Analytics Engineer

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/a-day-in-the-life-of-a-content-analytics-engineer-eb0250b993be

Part of our series on who works in Analytics at Netflix — and what the role entails

by Rocio Ruelas

Back when we were all working in offices, my favorite days were Monday, Wednesday, and Friday. Those were the days with the best hot breakfast, and I’ve always been a sucker for free food. I started the day by arriving at the LA office right before 8am and finding a parking spot close to the entrance. I would greet the familiar faces at the reception desk and take a moment to check out which Netflix Original was currently being projected across the lobby. Take the elevator uninterrupted up to the top floor. Grab myself a plate of scrambled eggs, salsa, and bacon. Pour myself some coffee. Then sit at a small table next to the floor-to-ceiling windows with a clear view of the Hollywood sign.

My morning journey from lobby to elevators to breakfast (Photo Credit: Netflix)

During the day, the LA office buzzes with excitement and conversation. My time in the morning is like the calm before the storm — a chance to reflect before my head is full of numbers and figures. I often think about all the things that led me to becoming a Netflix employee. From my family immigrating to the United States from Mexico when I was very young to the teachers and professors that encouraged a low income student like me to dream big. It has been a journey and I’m grateful to be at a place that values the voice I bring to the table.

At the time of posting we’re working from home due to the pandemic, so my days look a bit different: The hot breakfasts are not as consistent and conversations are mainly with my dog. We still find ways to keep connected, but I for one am looking forward to when the office is fully open and I can look out to the Hollywood sign again.

Ok. But what do I actually do? (Besides eating breakfast)

What do I do at Netflix?

I’m a Senior Analytics Engineer on the Content and Marketing Analytics Research team. My team focuses on innovating and maintaining the metrics Netflix uses to understand performance of our shows and films on the service. We partner closely with the business strategy team to provide as much information as we can to our content executives, so that — combined with their industry experience — they can make the best decisions for Netflix.

Being an Analytics Engineer is like being a hybrid of a librarian 📚 and a Swiss army knife 🛠️: Two good things to have on hand when you’re not quite sure what you will need. Like a librarian, I have access to an encyclopedia of knowledge about our content data and have become the resident expert in one of our most important internal metrics. And like a Swiss army knife, I possess a multitude of tools to get the job done — be it SQL, Jupyter Notebooks, Tableau, or Google Sheets.

One of my favorite things about being an Analytics Engineer is the variety. I have some days where I am brainstorming and collaborating with amazing colleagues and other days where I can put my headphones on to work out a tough problem or build a dashboard.

One of my current projects involves understanding how viewing habits have evolved over the past several years. We started out with a small working group where we brainstormed the key questions to address, what data we could use to answer said questions, and came up with a work plan for how the analysis might take shape. Then I put on my headphones and got to work, writing SQL and using Tableau to present the data in a useful way. We met frequently to discuss our findings and iterate on the analysis. The great thing about these working groups is that we each contribute different skills and ideas. We benefit from both our individual strengths and our willingness to collaborate — Our values of Selflessness and Inclusion, in action.

How did I become interested in Analytics?

I did not set out from the start to be an Analyst. I never had a 5 year plan and my path has been a winding one.

Yours truly, featuring part of my extensive Netflix apparel collection
Yours truly, featuring part of my extensive Netflix apparel collection

In college, I majored in Physics because it was “the science that explains all the other sciences”. But what I ended up liking most about it was the math. Between that and the fact that there aren’t many entry-level physics jobs, I pursued a PhD in Applied Mathematics. This turned out to be a wise choice as I avoided entering the workforce right before the 2008 recession.

I loved grad school. The lectures, the research, and most of all the lifelong friendships. But as much as I enjoyed being a student, the academic track wasn’t for me. So without much of a plan I headed back home to California after graduation.

Looking around to see what I could do with my Applied Math background, I quickly settled on Data Science. I wasn’t well versed in it but I knew it was in demand. I started my new data science career as an analyst at a small marketing company. I had an incredible boss who encouraged me to learn new skills on the job. I honed my SQL and Python skills and implemented a clustering model. I also got my first introduction to working for an actual business.

Later on I went to Hulu to grow in the core skills of a data scientist. But while the predictive modeling I was doing was interesting and challenging, I missed being close to the business. As an analyst, I got to attend more meetings with the decision makers and be part of the conversation.

So by the time the opportunity arose to interview for a position at Netflix, I had figured out that Analytics was the best area for me.

It has been a journey and I’m grateful to be at a place that values the voice I bring to the table.

Why Netflix?

Growing up I watched a lot of TV. I mean a lot of TV. But I never thought I could actually work in the TV and Film business. I feel incredibly fortunate to be working at a job I am passionate about and to be at a company that brings joy to people around the world.

Even though I’d been a loyal Netflix customer since the DVD days, I had not heard about their unique culture until I started interviewing. When I did read the culture doc (which I recently learned is also published in Spanish and 12 other languages!), it sounded pretty intimidating. Phrases like “high performance” and “dream team” made me imagine an almost gladiator-style workplace. But I quickly learned this wasn’t the case. Through a combination of my existing network, the interview process, and other online resources about the company, I found that folks are actually very friendly and helpful! Everyone just wants to do their best work and help you do your best work too. Think more The Great British Baking Show and less Hell’s Kitchen. Selflessness really is embraced as an important Netflix value.

Having been here for 3 years now, I can say that working at Netflix is really special. The company is always evolving, big decisions are made in a transparent way, and I’m encouraged to voice my thoughts. But the single most important factor is the people. My Content Analytics teammates continuously impress me not only with their quality of work, but also with their kindness and mutual trust. This foundation makes innovating more fun, lets us be open about our passions outside of work, and means we genuinely enjoy each other’s company. That balance is crucial for me and is why this truly is the place where I can do my best work.

If this post resonates with you and you’d like to explore opportunities with Netflix, check out our analytics site, search open roles, and learn about our culture. You can also find more stories like this here.


A Day in the Life of a Content Analytics Engineer was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Extracting and joining data from multiple data sources with Athena Federated Query

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/extracting-and-joining-data-from-multiple-data-sources-with-athena-federated-query/

With modern day architectures, it’s common to have data sitting in various data sources. We need proper tools and technologies across those sources to create meaningful insights from stored data. Amazon Athena is primarily used as an interactive query service that makes it easy to analyze unstructured, semi-structured, and structured data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. With the federated query functionality in Athena, you can now run SQL queries across data stored in relational, non-relational, object, and custom data sources and store the results back in Amazon S3 for further analysis.

The goals for this series of posts are to discuss how we can configure different connectors to run federated queries with complex joins across different data sources, how to configure a user-defined function for redacting sensitive information when running Athena queries, and how we can use machine learning (ML) inference to detect anomalies in datasets, to help developers, big data architects, data engineers, and business analysts in their daily operational routines.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of Athena’s query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB, Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source on the cloud or on premises that is accessible from Lambda.

The first post of this series discusses how to configure Athena Federated Query connectors and use them to run federated queries for data residing in HBase on Amazon EMR, Amazon Aurora MySQL, DynamoDB, and ElastiCache for Redis databases.

Test data

To demonstrate Athena federation capabilities, we use the TPCH sample dataset. TPCH is a decision support benchmark and has broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, run queries with a high degree of complexity, and give answers to critical business questions. For our use case, imagine a hypothetical ecommerce company with the following architecture:

  • Lineitem processing records are stored in HBase on Amazon EMR to meet the requirements for a write-optimized data store with a high transaction rate and long-term durability
  • ElastiCache for Redis stores Nations and ActiveOrders tables so that the processing engine can get fast access to them
  • An Aurora MySQL database is used for Orders, Customer, and Supplier accounts data like email addresses and shipping addresses
  • DynamoDB hosts Part and Partsupp data, because DynamoDB offers high flexibility and high performance

The following diagram shows a schematic view of the TPCH tables and their associated data stores.

Building a test environment using AWS CloudFormation

Before following along with this post, you need to create the required AWS resources in your account. To do this, we have provided you with an AWS CloudFormation template to create a stack that contains the required resources: the sample TPCH database on Amazon Relational Database Service (Amazon RDS), HBase on Amazon EMR, Amazon ElastiCache for Redis, and DynamoDB.

The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, Athena named queries, AWS Cloud9 IDE, an Amazon SageMaker notebook instance, and other AWS Identity and Access Management (IAM) resources that we use to implement the federated query, user-defined functions (UDFs), and ML inference functions.

This template is designed only to show how you can use Athena Federated Query, UDFs, and ML inference. This setup isn’t intended for production use without modification. Additionally, the template is created for use in the us-east-1 Region, and doesn’t work in other Regions.

Before launching the stack, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

To create your resources, complete the following steps:

  1. Choose Launch Stack:
  2. Select I acknowledge that this template may create IAM resources.

This template creates resources that incur costs while they remain in use. Follow the cleanup steps at the end of this post to delete and clean up the resources to avoid any unnecessary charges.

  3. When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console.

The CloudFormation stack takes approximately 20–30 minutes to complete. Check the AWS CloudFormation console and wait for the status CREATE_COMPLETE.

When stack creation is complete, your AWS account has all the required resources to implement this solution.

  4. On the Outputs tab of the Athena-Federation-Workshop stack, capture the following:
    1. S3Bucket
    2. Subnets
    3. WorkshopSecurityGroup
    4. EMRSecurityGroup
    5. HbaseConnectionString
    6. RDSConnectionString

You need all this information when setting up connectors.

  5. When the stacks are complete, check the status of the Amazon EMR steps on the Amazon EMR console.

It can take up to 15 minutes for this step to complete.

Deploying connectors and connecting to data sources

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source. In the first part, you give the Lambda function a name that you can later choose on the Athena console. In the second part, you give the connector a name that you can reference in your SQL queries.

We want to query different data sources, so in the following sections we set up Lambda connectors for HBase on Amazon EMR, Aurora MySQL, DynamoDB, and Redis before we start creating complex joins across data sources using Athena federated queries. The following diagram shows the architecture of our environment.

Installing the Athena JDBC connector for Aurora MySQL

The Athena JDBC connector supports the following databases:

  • MySQL
  • PostgreSQL
  • Amazon Redshift

To install the Athena JDBC connector for Aurora MySQL, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaJdbcConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name, AthenaJdbcConnector.
    2. SecretNamePrefix – Enter AthenaJdbcFederation.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. DefaultConnectionString – Enter the RDSConnectionString value from the AWS CloudFormation outputs.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaFunctionName – Enter mysql.
    7. LambdaMemory – Leave it as the default value 3008.
    8. LambdaTimeout – Leave it as the default value 900.
    9. SecurityGroupIds – Enter the WorkshopSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Change the default value to athena-spill/jdbc.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena JDBC connector for Aurora MySQL; you can refer to this Lambda function in your queries as lambda:mysql.

For more information about the Athena JDBC connector, see the GitHub repo.

Installing the Athena DynamoDB connector

To install the Athena DynamoDB connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaDynamoDBConnector.
  6. Provide the following values:
    1. Application name – Leave it as the default name, AthenaDynamoDBConnector.
    2. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    3. AthenaCatalogName – Enter dynamo.
    4. DisableSpillEncryption – Leave it as the default value false.
    5. LambdaMemory – Leave it as the default value 3008.
    6. LambdaTimeout – Leave it as the default value 900.
    7. SpillPrefix – Enter athena-spill-dynamo.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena DynamoDB connector; you can refer to this Lambda function in your queries as lambda:dynamo.

For more information about the Athena DynamoDB connector, see the GitHub repo.

Installing the Athena HBase connector

To install the Athena HBase connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaHBaseConnector.
  6. Provide the following values:
    1. Application name – Leave it as the default name, AthenaHBaseConnector.
    2. SecretNamePrefix – Enter hbase-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter hbase.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. DefaultConnectionString – Enter the HbaseConnectionString value from the AWS CloudFormation outputs.
    7. LambdaMemory – Leave it as the default value of 3008.
    8. LambdaTimeout – Leave it as the default value of 900.
    9. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Enter athena-spill-hbase.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena HBase connector; you can refer to this Lambda function in your queries as lambda:hbase.

For more information about the Athena HBase connector, see the GitHub repo.

Installing the Athena Redis connector

To install the Athena Redis connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaRedisConnector.
  6. Provide the following values:
    1. Application name – Leave it as the default name, AthenaRedisConnector.
    2. SecretNameOrPrefix – Enter redis-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter redis.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaMemory – Leave it as the default value 3008.
    7. LambdaTimeout – Leave it as the default value 900.
    8. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    9. SpillPrefix – Enter athena-spill-redis.
    10. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena Redis connector; you can refer to this Lambda function in your queries as lambda:redis.

For more information about the Athena Redis connector, see the GitHub repo.

Redis database and tables with the AWS Glue Data Catalog

Because Redis doesn’t have a schema of its own, the Redis connector can’t infer the columns or data type from Redis. The Redis connector needs an AWS Glue database and tables to be set up so it can associate the data to the schema. The CloudFormation template creates the necessary Redis database and tables in the Data Catalog. You can confirm this on the AWS Glue console.

Running federated queries

Now that the connectors are deployed, we can run Athena queries that use those connectors.

  1. On the Athena console, choose Get Started.
  2. Make sure you’re in the workgroup AmazonAthenaPreviewFunctionality. If not, choose Workgroups, select AmazonAthenaPreviewFunctionality, and choose Switch Workgroup.

On the Saved Queries tab, you can see a list of pre-populated queries to test.

The Sources saved query tests your Athena connector functionality for each data source, and you can make sure that you can extract data from each data source before running more complex queries involving different data sources.

  1. Highlight the first query up to the semicolon and choose Run query.

After successfully testing connections to each data source, you can proceed with running more complex queries, such as:

  • FetchActiveOrderInfo
  • ProfitBySupplierNationByYr
  • OrdersRevenueDateAndShipPrio
  • ShippedLineitemsPricingReport
  • SuppliersWhoKeptOrdersWaiting

If you see an error like the following on the HBase query, rerun the query; this usually resolves the issue.

GENERIC_USER_ERROR: Encountered an exception[java.lang.RuntimeException] from your LambdaFunction[hbase] executed in context[retrieving meta-data] with message[org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location for replica 0]

As an example of the advanced queries, the SuppliersWhoKeptOrdersWaiting query identifies suppliers whose products were part of multi-supplier orders (with a current status of F) and who didn't ship the required parts on time. This query uses multiple data sources: Aurora MySQL and HBase on Amazon EMR. As shown in the following screenshot, the query extracts data from the supplier table on Aurora MySQL, the lineitem table on HBase, and the orders table on Aurora MySQL. The results are returned in 7.13 seconds.
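
To give a feel for how a cross-source join like this is expressed, the following sketch submits a roughly similar query through the AWS SDK. It is not the saved SuppliersWhoKeptOrdersWaiting query itself; the database name and the TPC-H-style column names are illustrative, and the results bucket is a placeholder.

    import boto3

    athena = boto3.client("athena")

    # Illustrative only: joins the Aurora MySQL supplier and orders tables with the
    # HBase lineitem table through their respective connectors.
    query = """
    SELECT s.s_name,
           COUNT(*) AS late_line_items
    FROM "lambda:mysql".sales.supplier s
    JOIN "lambda:hbase".sales.lineitem l ON l.l_suppkey = s.s_suppkey
    JOIN "lambda:mysql".sales.orders   o ON o.o_orderkey = l.l_orderkey
    WHERE o.o_orderstatus = 'F'
      AND l.l_receiptdate > l.l_commitdate
    GROUP BY s.s_name
    ORDER BY late_line_items DESC
    LIMIT 20;
    """

    athena.start_query_execution(
        QueryString=query,
        WorkGroup="AmazonAthenaPreviewFunctionality",
        ResultConfiguration={"OutputLocation": "s3://<your-athena-results-bucket>/"},
    )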

Cleaning up

To clean up the resources created as part of our CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  3. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, go to each connector and deselect the VPC so it’s no longer attached to the VPC created by AWS CloudFormation.
  4. On the Amazon SageMaker console, delete any endpoints you created as part of the ML inference.
  5. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.
  6. On the AWS CloudFormation console or the AWS CLI, delete the stack Athena-Federation-Workshop.

Summary

In this post, we demonstrated the functionality of Athena federated queries by creating multiple different connectors and running federated queries against multiple data sources. In the next post, we show you how you can use the Athena Federation SDK to deploy your UDF and invoke it to redact sensitive information in your Athena queries.


About the Authors

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Amir Basirat is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a big data specialist for different technology companies. He also has a PhD in computer science, where his research primarily focused on large-scale distributed computing and neural networks.

Introducing Bot Analytics

Post Syndicated from Ben Solomon original https://blog.cloudflare.com/introducing-bot-analytics/

Bots — both good and bad — are everywhere on the Internet. Roughly 40% of Internet traffic is automated. Fortunately, Cloudflare offers a tool that can detect and block unwanted bots: we call it Bot Management. This is the most recent platform in our long history of detecting bots for our customers. In fact, Cloudflare has always offered some form of bot detection. Over the past two years, our team has focused on building advanced detection engines, innovating as bots become more sophisticated, and creating new features.

Today, we are releasing Bot Analytics to help you visualize your automated traffic.

Background

It’s worth including some background for those who are new to bots.

Many websites expect human behavior. When I shop online, I behave as anyone else would: I might search for a few items, read reviews when I find something interesting, and eventually complete an order. This is expected. It is a standard use of the Internet.

Unfortunately, without protection these sites can be ripe for exploitation. Those shoes I was looking at? They are limited edition sneakers that resell for five times the price. Sneaker hoarders clamor at the chance to buy a pair (or fifty). Or perhaps I just added a book to my cart: there are probably hundreds of online retailers that sell the same book, each one eager to offer the best price. These retailers desperately want to know what their competitors’ prices are.

You can see where this is going. While most humans make good use of the Internet, some use automated tools to perform abuse at scale. For example, attackers will deplete sneaker inventories by using automated bots to check out quickly. By the time humans click “add to cart,” bots have already paid for shipping. Humans hardly stand a chance. Similarly, online retailers keep track of their competitors with “price scraping” bots that collect pricing information. So when one retailer lowers a book price to $10, another retailer’s bot will respond by pricing at $9.99. This is how we end up with weird prices like $12.32 for toilet paper. Worst of all, malicious bots are incentivized to hide their identities. They’re hidden among us.

Not all bots are bad. Cloudflare maintains a list of verified good bots that we keep separated from the rest. Verified bots are usually transparent about who they are: DuckDuckGo, for example, publicly lists the IP addresses it uses for its search engine. This is a well-intentioned service that happens to be automated, so we verified it. We also verify bots for error monitoring and other tools.

Enter: Bot Analytics

As discussed earlier, we built a Bot Management platform that intelligently detects bots on the Internet, allowing our customers to block bad ones and allow good ones. If you’re curious about how our solution works, read here.

Beginning today, we are going to show you the bots that reach your website. You can see these bots with a new tool called Bot Analytics. It’s fast, accurate, and loaded with information. You can query data up to one month in the past with no noticeable lag. To accomplish this, we exposed the data with GraphQL and paired it with adaptive bitrate (ABR) technology to dynamically load content. If you already have Bot Management added to your Cloudflare account, Bot Analytics is included in your service. Open up your dashboard and let’s take a tour…

The Tour

First: where to go? Bot Analytics lives under the Firewall tab of the dashboard. Once you’re in the Firewall, go to “Overview” and click the second thumbnail on the left. Remember, Bot Management must be added to your account for full access to analytics.

It’s worth noting that Enterprise sites without Bot Management can see a snapshot of their bot traffic. This data is updated in real time and should help you determine if you have a bot problem. Generally speaking, if you have a double-digit percentage of automated traffic, you might be spending more on origin costs than you have to. More importantly, you might be losing revenue or sensitive information to inventory hoarding and credential stuffing.

“Requests by bot score” is the first section on the page. Here, we show traffic over time, but we split it vertically by the traffic type. Green segments represent verified bots, while shades of purple and blue show varying degrees of bot/human likelihood.

“Bot score distribution” is next. This shows similar data, but we display it horizontally without the notion of time. Use the slider below to filter on subsets of traffic and watch the rest of the page adapt.

We recommend that you use the slider to find your ideal bot threshold. In other words: what is the cutoff for suspicious traffic on your site? We generally consider traffic below 30 to be automated, but customers might choose to challenge traffic below 40 or block traffic below 10 (you can even do both!). You should set a threshold that is ambitious but not too aggressive. If your traffic looks like the example below, consider setting a threshold at a “drop off” point like 3 or 14. Why? Notice that the request density is very high near scores 1-2 and 12-13. Many of these requests will have similar characteristics, meaning that the scores immediately above them (3 and 14) offer some differentiating quality. These are the most promising places to segment your bot rules. Notably, not every graph is this pronounced.

“Bot score source” sits lower on the page. Here, you can examine the detection engines that are responsible for scoring your traffic. If you can’t remember the purpose of each engine, simply hover over the tooltip to view a brief description. Customers may wonder why some requests are flagged as “not computed.” This commonly occurs when Cloudflare has issued an error page on your behalf. Perhaps a visitor’s request was met with a gateway timeout (error 504), in which case Cloudflare responded with a branded error page. The error page would not have warranted a challenge or a block, so we did not spend time calculating a bot score. We published another blog post that provides an overview of the most common sources, including machine learning and heuristics.

“Top requests by source” is the final section of Bot Analytics. Although it’s not quite as colorful as the sections above, this section grounds Bot Analytics in highly specific data. You can filter or exclude request attributes, including IP addresses, user agents, and ASNs. In the next section, we’ll use this to spot a bot attack.

Let’s Spot A Bot Attack!

First, I’m going to use the “bot score source” tool to select the most obvious bot requests — those detected by our heuristics engine. This provides us with the following information, some of which has been redacted for privacy reasons:

I already suspect a correlation between a few of these attributes. First, the IP addresses all have very similar request counts. No human would access a site 22,000 times, and the uniformity across IPs 2-5 suggests foul play. Not surprisingly, the same pattern occurs for user agents on the right. User agents tell us about the browser and device associated with a particular request. When Bot Analytics shows this much uniformity and presents clear anomalies in country and ASN, I get suspicious (and you should too). I’m now going to filter on these anomalies to see if my instinct is right:

The trends hold true — to be sure, I briefly expanded the table and found nine separate IP addresses exhibiting the same behavior. This is likely an aggressive content scraper. Notably, it is not marked as a verified bot, so Bot Management issued the lowest possible score and flagged it as “automated.” At the top of Bot Analytics, I will narrow down the traffic and keep the time period at 24 hours:

The most severe attacks come and go. This traffic is clearly sustained, and my best guess is that someone is frequently scraping the homepage for content. This isn’t the most malicious of attacks, but content is still being taken. If I wanted to, I could set a firewall rule to target this bot score or any of the filters I used.
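
For example, a rule that challenges likely scrapers could be created through the Cloudflare API. The sketch below is only illustrative: the zone ID and API token are placeholders, and the endpoint and payload shape are assumptions based on the Firewall Rules API, so check the current API reference before relying on it.

    import requests

    ZONE_ID = "<your-zone-id>"       # placeholder
    API_TOKEN = "<your-api-token>"   # placeholder; needs firewall edit permissions

    # Challenge requests that score below 3 and are not verified bots.
    rule = [{
        "description": "Challenge likely content scrapers",
        "action": "challenge",
        "filter": {
            "expression": "(cf.bot_management.score lt 3 and not cf.bot_management.verified_bot)"
        },
    }]

    resp = requests.post(
        f"https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/firewall/rules",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json=rule,
    )
    resp.raise_for_status()
    print(resp.json().get("success"))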

Try It Out

As a reminder, all Enterprise customers will be able to see a snapshot of their bot traffic. Even if you don’t have Bot Management for your site, visit the Firewall for some high-level insights that are updated in real time.

And for those of you with Bot Management — check out Bot Analytics! It’s live now, and we hope you’ll have fun using it. Keep your eyes open for new analytics features in the coming months.

How the ZS COVID-19 Intelligence Engine helps Pharma & Med device manufacturers understand local healthcare needs & gaps at scale

Post Syndicated from Saunak Chandra original https://aws.amazon.com/blogs/big-data/how-the-zs-covid-19-intelligence-engine-helps-pharma-med-device-manufacturers-understand-local-healthcare-needs-gaps-at-scale/

This post is co-written by Parijat Sharma: Principal, Strategy & Transformation, Wenhao Xia: Manager, Data Science, Vineeth Sandadi: Manager, Business Consulting from ZS Associates, Inc, Arianna Tousi: Strategy, Insights and Planning Consultant from ZS, Gopi Vikranth: Associate Principal from ZS. In their own words, “We’re passionately committed to helping our clients and their customers thrive, working side by side to drive customer value and results”.

The COVID-19 trajectory across the US continues to remain unstable and heterogeneous. Although certain cities and counties were able to tame the adverse effects of the pandemic by applying stricter controls on social life, newer hotspots are emerging in different locations sporadically.

Organizations in healthcare, pharma, and biotech are looking to adapt to a rapidly evolving and diverse local market landscape, and restart parts of their operations that are significantly impacted, such as patient support functions, sales, and key account management. Real-time insights into the rapidly evolving COVID-19 situation and its impact on all key stakeholders in the healthcare supply chain, including patients, physicians, and health systems, is a key asset in helping companies adapt based on local market dynamics and remain resilient to future disruptions. However, several life-science companies don’t have these insights because they lack the infrastructure to integrate and manage the relevant datasets at scale and the analytical capabilities to mine the data for the relevant insights.

ZS stepped into this critical situation and built a data lake on AWS to address these challenges. A primary characteristic of this data lake is that it's built largely on open-source technologies, which gave ZS a head start in meeting the product launch SLA using AWS. This post describes how ZS developed the data lake and brought its proprietary machine learning (ML) models to run on AWS, providing intelligent insights on COVID-19.

What is the ZS COVID-19 Intelligence Engine?

The ZS COVID-19 Intelligence Engine was designed as a customizable capability that does the following:

  • Integrates diverse public and proprietary healthcare datasets in a scalable data warehouse that stores data in a secure and compliant manner
  • Provides advanced descriptive and predictive analytical modules to forecast COVID-19 evolution and its impact on key stakeholders and the treatment journey
  • Packages insights into intuitive preconfigured reports and dashboards for dissemination across an organization

AWS Cloud data and analytics infrastructure

In this section, we dive into the infrastructure components of the ZS COVID-19 Intelligence Engine. The objective was to quickly set up a data lake with an accompanying ingestion mechanism to allow rapid ingestion of public datasets, third-party data, and datasets from AWS Data Exchange.

The overall data processing solution is based on ZS’s REVO™ data management product, which uses Apache Spark on Amazon EMR. The Spark engine processes and transforms raw data into structured data that is ready for interactive analysis. The raw data comes in compressed, text-delimited format ranging from 100 MB to 15 GB. After the data is cleansed and rules are applied, the processed data is staged in Amazon Simple Storage Service (Amazon S3) buckets in Apache Parquet format. This data is selectively loaded into an Amazon Redshift cluster for fast interactive querying and repetitive analysis on subsets of data.
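
REVO™ itself is proprietary, but the general pattern of this processing step can be sketched in PySpark. Everything here (paths, delimiter, and column names) is hypothetical and stands in for ZS's actual cleansing rules.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("covid-raw-to-curated").getOrCreate()

    # Read compressed, delimited raw files landed in S3 (paths are hypothetical).
    raw = (spark.read
           .option("header", "true")
           .option("delimiter", "|")
           .csv("s3://zs-covid-raw/claims/2020/*.gz"))

    # Apply basic cleansing rules before staging the curated output.
    curated = (raw
               .dropDuplicates()
               .filter(F.col("claim_id").isNotNull())
               .withColumn("service_date", F.to_date("service_date", "yyyy-MM-dd")))

    # Stage as Parquet, partitioned so partitions can be loaded selectively into Amazon Redshift.
    (curated.write
            .mode("overwrite")
            .partitionBy("service_date")
            .parquet("s3://zs-covid-curated/claims/"))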

The Intelligence Engine also uses a powerful Amazon Elastic Compute Cloud (Amazon EC2) instance to run ML workloads that predict future COVID-19 caseloads at the county level. The prediction models run daily on a compute-optimized EC2 c5.24xlarge On-Demand Instance, allowing rapid turnaround of prediction results while keeping overall costs low by using On-Demand Instances.

ZS uses Amazon Redshift as the data warehouse in this architecture. Amazon Redshift is easy to launch and maintain and can quickly run analytical queries on large normalized datasets using standard ANSI SQL. After the raw data gets processed using ZS’s REVO™, the curated data is loaded into Amazon Redshift to run interactive analytical queries. The queries generate insights specific to local geography, county, and healthcare systems, and run on Amazon Redshift tables consisting of anonymized patient data. The Amazon Redshift cluster uses On-Demand Instances and is sized to accommodate 25 TB of data at the time of this product launch. Typical interactive queries include joining data across large tables, up to 1.5 billion rows in the main table.
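
A minimal sketch of how the curated Parquet data might be loaded and queried follows, using the Amazon Redshift Data API through boto3. The cluster identifier, database, schema, table, column, and IAM role names are all placeholders, not ZS's actual objects.

    import boto3

    rsd = boto3.client("redshift-data")

    # Load one partition of the curated Parquet data into Redshift.
    copy_sql = """
        COPY analytics.patient_claims
        FROM 's3://zs-covid-curated/claims/service_date=2020-11-01/'
        IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-s3-role>'
        FORMAT AS PARQUET;
    """

    rsd.execute_statement(
        ClusterIdentifier="zs-covid-cluster",
        Database="covid",
        DbUser="etl_user",
        Sql=copy_sql,
    )

    # A typical interactive query: county-level claim counts for a disease area.
    rsd.execute_statement(
        ClusterIdentifier="zs-covid-cluster",
        Database="covid",
        DbUser="etl_user",
        Sql="SELECT county_fips, COUNT(*) FROM analytics.patient_claims "
            "WHERE diagnosis_group = 'oncology' GROUP BY county_fips;",
    )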

The following diagram illustrates this architecture:

The ZS COVID-19 data lake has several benefits and applicable use cases:

  • Streamlined data procurement processes – Eliminates the need for multiple ZS teams to procure, ingest, and process the same datasets separately
  • Optimized common usage across clients and business questions – ZS uses this capability to publish common derivations of data that can then be utilized across different ZS teams and use cases to create a single version of truth
  • Cross-functional processes and requirements – Some analytics use cases require cross-functional data and are significantly hampered when users can't access the various data sources in one place—a data lake enables this by design
  • Connected healthcare data – Due to developing common standards and integrating with MDM and ontologies, data from the public domain can be compliantly integrated with pharma manufacturer-specific data sources to enable seamless analytics on the data lake

Comprehensive healthcare data lake

At its core, the Intelligence Engine builds a scalable and integrated repository of diverse public and proprietary data sources. These datasets range in variety, volume, and velocity:

  • COVID-19 incidence – There are several COVID-specific datasets that the public has become accustomed to viewing over the past several months, such as Johns Hopkins incidence tracking and IHME predictive data, which describes how the disease has been progressing over time and even into the future. This data tends to be at either the state or county level and is often refreshed daily. The data lake solution contains the entire history for these datasets, which, taken together, spans into the hundreds of gigabytes in size. In addition to these sources, ZS’ proprietary predictive models add an additional element of accuracy and are customized with ZS-specific insights.
  • Government policies – Government policy data, which is mostly being used from AWS Data Exchange on behalf of the New York Times, explains the current state of government mandates and recommendations for varying degrees of lockdown or reopening as it pertains to the pandemic. This data is much smaller in volume, well under 1 GB total.
  • Insurance claims at patient level – Thanks to the partnership with Symphony Health, ZS has had the opportunity to analyze and expose patient claims data that can be attributed to the specific hospital account or healthcare provider for which that claim took place. The insurance claims data is the largest volume of data—close to 15 TB—contributing to the ZS COVID-19 Intelligence Engine. ZS’ data engineering team has wrangled these large datasets with the help of Amazon EMR for aggregating and processing metrics, which are then stored in Amazon Redshift in a transformed version that is much smaller and can be more easily understood than the original raw datasets.
  • HCP to site of care affiliations – Thanks to the partnership with Definitive Healthcare, ZS is in the process of integrating best-in-class physician-hospital and clinic affiliations from Definitive Healthcare with patient claims from Symphony to help assess available healthcare capacity and evolving approaches to care delivery and type of care being delivered by disease area.
  • Other Intelligence engine data sources
    • State testing rates
    • Mobility
    • Demographics and social determinants of health
    • Provider access and affinity for pharma commercial engagement (from ZS affinity/access monitor)
    • Automated data ingestors for a variety of pharma manufacturer-specific data sources including specialty pharmacy and hub transactions, sales force activity, customer digital engagement, and more

Predictive models for COVID-19 projections and healthcare demand-supply gaps at a local level

To drive decision-making at a local level, ZS required more granular projections of COVID-19 disease spread than what’s publicly available at a state or national level. Therefore, as part of the Intelligence Engine, the ZS data science team aimed to develop an ensemble model of COVID-19 projections at the county level to identify emerging local healthcare gaps along different phases of the treatment process.

Developing a locally predictive model has many challenges, and ZS believes that no single model can capture the virtually infinite drivers and factors contributing to disease spread within a specific geographic area. Therefore, the ZS data science team behind the COVID-19 projections has implemented multiple projection models, each with its own set of input data sources, assumptions, and parameters. This approach increases the accuracy of the projections while retaining the stability and interpretability of the model. These models include:

  • Statistical curve fitting model – A disease progression curve using a Generalized Gaussian Cumulative Distribution Function, optimized to minimize prediction error of COVID-19 cases and deaths (a minimal fitting sketch follows this list)
  • SEIR model – Traditional epidemiological disease progression model (pathway of Susceptible – Exposed – Infectious – Recovered) combined with traditional ML on model parameters
  • Agent-based simulation – County-level simulation of individual interactions between people within the county
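
ZS's ensemble is proprietary, but as a rough illustration of the first technique, a generalized Gaussian (generalized normal) CDF can be fit to a county's cumulative case curve with SciPy. The observed counts and initial guesses below are synthetic.

    import numpy as np
    from scipy.optimize import curve_fit
    from scipy.stats import gennorm

    # Synthetic cumulative case counts for one county, one value per day.
    observed = np.array([5, 9, 16, 30, 55, 92, 150, 238, 360, 520,
                         700, 910, 1120, 1300], dtype=float)

    def cumulative_cases(t, k, beta, loc, scale):
        # k scales the generalized normal CDF to the county's eventual case total.
        return k * gennorm.cdf(t, beta, loc=loc, scale=scale)

    days = np.arange(len(observed))
    p0 = [observed[-1] * 2, 2.0, len(observed), 20.0]  # rough initial guesses

    # Least-squares fit minimizes the prediction error against the observed curve.
    params, _ = curve_fit(cumulative_cases, days, observed, p0=p0, maxfev=10000)

    # Project 30 days ahead with the fitted parameters.
    future = np.arange(len(observed) + 30)
    projection = cumulative_cases(future, *params)

In practice, a single fit like this is combined with the SEIR and agent-based models described above rather than used on its own.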

Obtaining a more granular view of future virus spread at a local level is critical in order to provide support for challenges in specific sites of care. Accurately projecting cases at the county level can be difficult for many reasons. For counties with low current case counts, the model has little historical data to learn from (both in time since first infection and in magnitude of cases). Additionally, forecasts can be sensitive to many variables, and the current second wave of COVID-19 infections adds additional complications to tracking the spread of the virus.

To combat some of these difficulties, ZS implemented a two-phased approach to generate county-level projections. Counties with a long enough history of virus spread are projected independently using the three disease progression models we outlined, whereas counties with limited history are projected using a combination of state-level projections and social determinants of health factors that are predictive of disease spread (for example, age distribution in a certain county).

As the world around us continues to evolve and the COVID-19 situation with it, the ZS data science team is also working to adapt the model alongside the current situation. Currently, model adaptability and its self-learning ability are continuing to improve to better adapt to the onset of the second wave of the virus. Additional parameters and re-optimizations are happening daily as the situation develops.

The following image shows the input data sources, modeling techniques, and outputs from the ZS COVID-19 projection models:

Analyzing and predicting local non-COVID-19 treatment gaps and their drivers

Several flexible analytical tools can be used to evaluate barriers along the disease treatment journey for non-COVID-19 diseases at the local geography level, their evolution over time with COVID-19, and their underlying drivers. These tools summarize local changes in and the underlying drivers of the following:

  • New patient diagnosis
  • Changes in treatment approaches and drugs used
  • Patient affordability and access to medications
  • Persistency and compliance to treatment
  • Healthcare demand (patients needing care) and supply (provider capacity to offer care)

The following image shows output from the Intelligence Engine illustrating local variations in healthcare gaps:

Intuitive visualization capabilities

The solution has two intuitive visualization capabilities:

  • COVID-19 monitor – A public access dashboard with insights on historical and future predictions of trajectories of COVID-19 incidences and hospital capacity. These insights are available at the state level and allow you to drill down into individual counties. The individual county-level view allows you to not only understand the severity of COVID-19 in that area, but also better understand how that county compares to other counties within the same state and observe what policies their local governments have set for the shutdown and reopening process.
  • Treatment finder – A second public access dashboard with near-real-time insights into individual hospital and physician group availability to treat patients for prominent non-COVID-19 diseases. This dashboard allows you to select a specific non-COVID-19 disease and identify the estimated number of COVID-19-infected people in their geography with the disease, mortality rates, and the individual providers that are accepting patients with a specific disease and health insurance.

The following image shows the Intelligence Engine screen with COVID-19 insights for a selected county:

The following image shows the Intelligence Engine screen that allows patients to find hospitals and physician offices that are open and accepting patients:

Conclusion

At its core, the ZS Intelligence Engine is a real-time planning tool. The rich set of AWS services and technologies makes it possible to ingest data from various third-party sources—public and proprietary sources alike. The AWS services used to build the architecture can run on open technologies. For example, building the data lake would not have been possible without Amazon EMR and Amazon EC2. ZS had already been using Apache Spark-based EMR instances—the service behind the REVO™ tool—prior to COVID-19 hitting us. ZS can run its ML models cost-effectively by using EC2 On-Demand Instances. Finally, using Amazon Redshift as a data warehouse solution allows ZS to provide COVID-19 analytical insights efficiently and cost-effectively.

Since the project went live, ZS has catered this product to at least six customers in the pharma, biotech, and medical device spaces. They are using this product in a variety of ways, including but not limited to:

  • Refining forecasts that relate the COVID-19 trajectory to estimated demand for their products
  • Assessing the level of openness of healthcare facilities to understand where patients across therapy areas are being treated
  • Determining which patients and communities to support, because COVID-19 impacts attitudes and concerns regarding immunity and drug use, and greater unemployment means more reimbursement support requirements
  • Readying the education and engagement field force for a mix of in-person and virtual interactions
  • Preparing the supply chain to ensure continuity of care

To try out the analysis yourself, see ZS’s COVID-19 Intelligence Engine.


About the Authors

Saunak is a Sr. Solutions Architect with AWS, helping customers and partners build data warehouses and scalable data platforms on AWS.

Parijat is the current lead of strategy and transformation at ZS. He focuses on mid-size to small clients that are ready for a transformational process to commercialize new products and portfolios, purchase or sell assets, or expand into new markets.

Wenhao has over 10 years of experience in various data science and advanced analytics fields. During his time at ZS, he has helped build and popularize data science capabilities across many organizations.

Vineeth works with pharmaceutical and biotech manufacturers on a broad spectrum of commercial issues, including commercial analytics, organized provider strategy, and resource planning and deployment.

Arianna is a Strategy, Insights and Planning Consultant in ZS’ High Tech practice. Arianna has extensive experience working with clients across industries on go-to-market strategy and commercial effectiveness issues.

Gopi Vikranth is an Associate Principal in ZS’ High Tech Practice. He has extensive experience helping clients across the retail, high tech, hospitality, pharmaceutical, and insurance sectors leverage big data and analytics to drive top-line growth.

AWS serverless data analytics pipeline reference architecture

Post Syndicated from Praful Kava original https://aws.amazon.com/blogs/big-data/aws-serverless-data-analytics-pipeline-reference-architecture/

Onboarding new data or building new analytics pipelines in traditional analytics architectures typically requires extensive coordination across business, data engineering, and data science and analytics teams to first negotiate requirements, schema, infrastructure capacity needs, and workload management.

For a large number of use cases today, however, business users, data scientists, and analysts are demanding easy, frictionless, self-service options to build end-to-end data pipelines because it’s hard and inefficient to predefine constantly changing schemas and spend time negotiating capacity slots on shared infrastructure. The exploratory nature of machine learning (ML) and many analytics tasks means you need to rapidly ingest new datasets and clean, normalize, and feature engineer them without worrying about operational overhead when you have to think about the infrastructure that runs data pipelines.

A serverless data lake architecture enables agile and self-service data onboarding and analytics for all data consumer roles across a company. By using AWS serverless technologies as building blocks, you can rapidly and interactively build data lakes and data processing pipelines to ingest, store, transform, and analyze petabytes of structured and unstructured data from batch and streaming sources, all without needing to manage any storage or compute infrastructure.

In this post, we first discuss a layered, component-oriented logical architecture of modern analytics platforms and then present a reference architecture for building a serverless data platform that includes a data lake, data processing pipelines, and a consumption layer that enables several ways to analyze the data in the data lake without moving it (including business intelligence (BI) dashboarding, exploratory interactive SQL, big data processing, predictive analytics, and ML).

Logical architecture of modern data lake centric analytics platforms

The following diagram illustrates the architecture of a data lake centric analytics platform.

You can envision a data lake centric analytics architecture as a stack of six logical layers, where each layer is composed of multiple components. A layered, component-oriented architecture promotes separation of concerns, decoupling of tasks, and flexibility. These in turn provide the agility needed to quickly integrate new data sources, support new analytics methods, and add tools required to keep up with the accelerating pace of changes in the analytics landscape. In the following sections, we look at the key responsibilities, capabilities, and integrations of each logical layer.

Ingestion layer

The ingestion layer is responsible for bringing data into the data lake. It provides the ability to connect to internal and external data sources over a variety of protocols. It can ingest batch and streaming data into the storage layer. The ingestion layer is also responsible for delivering ingested data to a diverse set of targets in the data storage layer (including the object store, databases, and warehouses).

Storage layer

The storage layer is responsible for providing durable, scalable, secure, and cost-effective components to store vast quantities of data. It supports storing unstructured data and datasets of a variety of structures and formats. It supports storing source data as-is without first needing to structure it to conform to a target schema or format. Components from all other layers provide easy and native integration with the storage layer. To store data based on its consumption readiness for different personas across organization, the storage layer is organized into the following zones:

  • Landing zone – The storage area where components from the ingestion layer land data. This is a transient area where data is ingested from sources as-is. Typically, data engineering personas interact with the data stored in this zone.
  • Raw zone – After the preliminary quality checks, the data from the landing zone is moved to the raw zone for permanent storage. Here, data is stored in its original format. Having all data from all sources permanently stored in the raw zone provides the ability to “replay” downstream data processing in case of errors or data loss in downstream storage zones. Typically, data engineering and data science personas interact with the data stored in this zone.
  • Curated zone – This zone hosts data that is in the most consumption-ready state and conforms to organizational standards and data models. Datasets in the curated zone are typically partitioned, cataloged, and stored in formats that support performant and cost-effective access by the consumption layer. The processing layer creates datasets in the curated zone after cleaning, normalizing, standardizing, and enriching data from the raw zone. All personas across organizations use the data stored in this zone to drive business decisions.

Cataloging and search layer

The cataloging and search layer is responsible for storing business and technical metadata about datasets hosted in the storage layer. It provides the ability to track schema and the granular partitioning of dataset information in the lake. It also supports mechanisms to track versions to keep track of changes to the metadata. As the number of datasets in the data lake grows, this layer makes datasets in the data lake discoverable by providing search capabilities.

Processing layer

The processing layer is responsible for transforming data into a consumable state through data validation, cleanup, normalization, transformation, and enrichment. It’s responsible for advancing the consumption readiness of datasets along the landing, raw, and curated zones and registering metadata for the raw and transformed data into the cataloging layer. The processing layer is composed of purpose-built data-processing components to match the right dataset characteristic and processing task at hand. The processing layer can handle large data volumes and support schema-on-read, partitioned data, and diverse data formats. The processing layer also provides the ability to build and orchestrate multi-step data processing pipelines that use purpose-built components for each step.

Consumption layer

The consumption layer is responsible for providing scalable and performant tools to gain insights from the vast amount of data in the data lake. It democratizes analytics across all personas across the organization through several purpose-built analytics tools that support analysis methods, including SQL, batch analytics, BI dashboards, reporting, and ML. The consumption layer natively integrates with the data lake’s storage, cataloging, and security layers. Components in the consumption layer support schema-on-read, a variety of data structures and formats, and use data partitioning for cost and performance optimization.

Security and governance layer

The security and governance layer is responsible for protecting the data in the storage layer and processing resources in all other layers. It provides mechanisms for access control, encryption, network protection, usage monitoring, and auditing. The security layer also monitors activities of all components in other layers and generates a detailed audit trail. Components of all other layers provide native integration with the security and governance layer.

Serverless data lake centric analytics architecture

To compose the layers described in our logical architecture, we introduce a reference architecture that uses AWS serverless and managed services. In this approach, AWS services take over the heavy lifting of the following:

  • Providing and managing scalable, resilient, secure, and cost-effective infrastructural components
  • Ensuring infrastructural components natively integrate with each other

This reference architecture allows you to focus more time on rapidly building data and analytics pipelines. It significantly accelerates new data onboarding and driving insights from your data. The AWS serverless and managed components enable self-service across all data consumer roles by providing the following key benefits:

  • Easy configuration-driven use
  • Freedom from infrastructure management
  • Pay-per-use pricing model

The following diagram illustrates this architecture.

Ingestion layer

The ingestion layer in our serverless architecture is composed of a set of purpose-built AWS services to enable data ingestion from a variety of sources. Each of these services enables simple self-service data ingestion into the data lake landing zone and provides integration with other AWS services in the storage and security layers. Individual purpose-built AWS services match the unique connectivity, data format, data structure, and data velocity requirements of operational database sources, streaming data sources, and file sources.

Operational database sources

Typically, organizations store their operational data in various relational and NoSQL databases. AWS Database Migration Service (AWS DMS) can connect to a variety of operational RDBMS and NoSQL databases and ingest their data into Amazon Simple Storage Service (Amazon S3) buckets in the data lake landing zone. With AWS DMS, you can first perform a one-time import of the source data into the data lake and then replicate ongoing changes happening in the source database. AWS DMS encrypts S3 objects using AWS Key Management Service (AWS KMS) keys as it stores them in the data lake. AWS DMS is a fully managed, resilient service and provides a wide choice of instance sizes to host database replication tasks.

AWS Lake Formation provides a scalable, serverless alternative, called blueprints, to ingest data from AWS native or on-premises database sources into the landing zone in the data lake. A Lake Formation blueprint is a predefined template that generates a data ingestion AWS Glue workflow based on input parameters such as source database, target Amazon S3 location, target dataset format, target dataset partitioning columns, and schedule. A blueprint-generated AWS Glue workflow implements an optimized and parallelized data ingestion pipeline consisting of crawlers, multiple parallel jobs, and triggers connecting them based on conditions. For more information, see Integrating AWS Lake Formation with Amazon RDS for SQL Server.

Streaming data sources

The ingestion layer uses Amazon Kinesis Data Firehose to receive streaming data from internal and external sources. With a few clicks, you can configure a Kinesis Data Firehose API endpoint where sources can send streaming data such as clickstreams, application and infrastructure logs and monitoring metrics, and IoT data such as device telemetry and sensor readings. Kinesis Data Firehose does the following:

  • Buffers incoming streams
  • Batches, compresses, transforms, and encrypts the streams
  • Stores the streams as S3 objects in the landing zone in the data lake

Kinesis Data Firehose natively integrates with the security and storage layers and can deliver data to Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service (Amazon ES) for real-time analytics use cases. Kinesis Data Firehose is serverless, requires no administration, and has a cost model where you pay only for the volume of data you transmit and process through the service. Kinesis Data Firehose automatically scales to adjust to the volume and throughput of incoming data.
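
As a minimal sketch of the producer side, an application can push records to a delivery stream with the AWS SDK; the stream name and record fields below are placeholders.

    import json
    import boto3

    firehose = boto3.client("firehose")

    # The delivery stream (placeholder name) is configured to buffer, compress,
    # and land records in the data lake's landing zone bucket.
    record = {"device_id": "sensor-42", "temperature": 21.7, "ts": "2020-11-20T10:15:00Z"}

    firehose.put_record(
        DeliveryStreamName="datalake-landing-stream",
        Record={"Data": (json.dumps(record) + "\n").encode("utf-8")},
    )

Kinesis Data Firehose then applies the buffering, batching, compression, and encryption described above before writing the objects to the landing zone.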

File sources

Many applications store structured and unstructured data in files that are hosted on Network Attached Storage (NAS) arrays. Organizations also receive data files from partners and third-party vendors. Analyzing data from these file sources can provide valuable business insights.

Internal file shares

AWS DataSync can ingest hundreds of terabytes and millions of files from NFS and SMB enabled NAS devices into the data lake landing zone. DataSync automatically handles scripting of copy jobs, scheduling and monitoring transfers, validating data integrity, and optimizing network utilization. DataSync can perform one-time file transfers and monitor and sync changed files into the data lake. DataSync is fully managed and can be set up in minutes.

Partner data files

FTP is the most common method for exchanging data files with partners. The AWS Transfer Family is a serverless, highly available, and scalable service that supports secure FTP endpoints and natively integrates with Amazon S3. Partners and vendors transmit files using the SFTP protocol, and the AWS Transfer Family stores them as S3 objects in the landing zone in the data lake. The AWS Transfer Family supports encryption using AWS KMS and common authentication methods including AWS Identity and Access Management (IAM) and Active Directory.

Data APIs

Organizations today use SaaS and partner applications such as Salesforce, Marketo, and Google Analytics to support their business operations. Analyzing SaaS and partner data in combination with internal operational application data is critical to gaining 360-degree business insights. Partner and SaaS applications often provide API endpoints to share data.

SaaS APIs

The ingestion layer uses Amazon AppFlow to easily ingest SaaS application data into the data lake. With a few clicks, you can set up serverless data ingestion flows in AppFlow. Your flows can connect to SaaS applications (such as Salesforce, Marketo, and Google Analytics), ingest data, and store it in the data lake. You can schedule AppFlow data ingestion flows or trigger them by events in the SaaS application. Ingested data can be validated, filtered, mapped, and masked before it's stored in the data lake. AppFlow natively integrates with authentication, authorization, and encryption services in the security and governance layer.

Partner APIs

To ingest data from partner and third-party APIs, organizations build or purchase custom applications that connect to APIs, fetch data, and create S3 objects in the landing zone by using AWS SDKs. These applications and their dependencies can be packaged into Docker containers and hosted on AWS Fargate. Fargate is a serverless compute engine for hosting Docker containers without having to provision, manage, and scale servers. Fargate natively integrates with AWS security and monitoring services to provide encryption, authorization, network isolation, logging, and monitoring to the application containers.
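
A minimal sketch of such an application follows: it pulls from a hypothetical partner endpoint and writes the raw payload to a placeholder landing-zone bucket. In practice this script would be packaged into a container image and run as a Fargate task.

    import datetime
    import json

    import boto3
    import requests

    s3 = boto3.client("s3")

    # Hypothetical partner endpoint and bucket names.
    PARTNER_API = "https://api.example-partner.com/v1/orders"
    LANDING_BUCKET = "datalake-landing-zone"

    def pull_and_land():
        response = requests.get(PARTNER_API,
                                headers={"Authorization": "Bearer <token>"},
                                timeout=30)
        response.raise_for_status()

        key = "partner-orders/ingest_date={}/orders.json".format(
            datetime.date.today().isoformat())

        # Write the raw payload as an S3 object in the landing zone.
        s3.put_object(Bucket=LANDING_BUCKET,
                      Key=key,
                      Body=json.dumps(response.json()).encode("utf-8"))

    if __name__ == "__main__":
        pull_and_land()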

AWS Glue Python shell jobs also provide a serverless alternative to build and schedule data ingestion jobs that can interact with partner APIs by using native, open-source, or partner-provided Python libraries. AWS Glue provides out-of-the-box capabilities to schedule singular Python shell jobs or include them as part of a more complex data ingestion workflow built on AWS Glue workflows.

Third-party data sources

Your organization can gain a business edge by combining your internal data with third-party datasets such as historical demographics, weather data, and consumer behavior data. AWS Data Exchange provides a serverless way to find, subscribe to, and ingest third-party data directly into S3 buckets in the data lake landing zone. You can ingest a full third-party dataset and then automate detecting and ingesting revisions to that dataset. AWS Data Exchange is serverless and lets you find and ingest third-party datasets with a few clicks.

Storage layer

Amazon S3 provides the foundation for the storage layer in our architecture. Amazon S3 provides virtually unlimited scalability at low cost for our serverless data lake. Data is stored as S3 objects organized into landing, raw, and curated zone buckets and prefixes. Amazon S3 encrypts data using keys managed in AWS KMS. IAM policies control granular zone-level and dataset-level access to various users and roles. Amazon S3 provides 99.99% availability and 99.999999999% durability, and charges only for the data it stores. To significantly reduce costs, Amazon S3 provides colder tier storage options called Amazon S3 Glacier and S3 Glacier Deep Archive. To automate cost optimizations, Amazon S3 provides configurable lifecycle policies and intelligent tiering options to automate moving older data to colder tiers. AWS services in our ingestion, cataloging, processing, and consumption layers can natively read and write S3 objects. Additionally, hundreds of third-party vendor and open-source products and services provide the ability to read and write S3 objects.
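
As an example of the lifecycle automation mentioned above, the following sketch transitions raw-zone objects to colder tiers over time; the bucket name, prefix, and retention periods are placeholders you would tune to your own access patterns.

    import boto3

    s3 = boto3.client("s3")

    # Move raw-zone objects to Amazon S3 Glacier after 90 days and to
    # S3 Glacier Deep Archive after a year (placeholder bucket and prefix).
    s3.put_bucket_lifecycle_configuration(
        Bucket="datalake-raw-zone",
        LifecycleConfiguration={
            "Rules": [
                {
                    "ID": "tier-down-raw-data",
                    "Status": "Enabled",
                    "Filter": {"Prefix": "raw/"},
                    "Transitions": [
                        {"Days": 90, "StorageClass": "GLACIER"},
                        {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                    ],
                }
            ]
        },
    )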

Data of any structure (including unstructured data) and any format can be stored as S3 objects without needing to predefine any schema. This enables services in the ingestion layer to quickly land a variety of source data into the data lake in its original source format. After the data is ingested into the data lake, components in the processing layer can define schema on top of S3 datasets and register them in the cataloging layer. Services in the processing and consumption layers can then use schema-on-read to apply the required structure to data read from S3 objects. Datasets stored in Amazon S3 are often partitioned to enable efficient filtering by services in the processing and consumption layers.

Cataloging and search layer

A data lake typically hosts a large number of datasets, and many of these datasets have evolving schema and new data partitions. A central Data Catalog that manages metadata for all the datasets in the data lake is crucial to enabling self-service discovery of data in the data lake. Additionally, separating metadata from data into a central schema enables schema-on-read for the processing and consumption layer components.

In our architecture, Lake Formation provides the central catalog to store and manage metadata for all datasets hosted in the data lake. Organizations manage both technical metadata (such as versioned table schemas, partitioning information, physical data location, and update timestamps) and business attributes (such as data owner, data steward, column business definition, and column information sensitivity) of all their datasets in Lake Formation. Services such as AWS Glue, Amazon EMR, and Amazon Athena natively integrate with Lake Formation and automate discovering and registering dataset metadata into the Lake Formation catalog. Additionally, Lake Formation provides APIs to enable metadata registration and management using custom scripts and third-party products. AWS Glue crawlers in the processing layer can track evolving schemas and newly added partitions of datasets in the data lake, and add new versions of corresponding metadata in the Lake Formation catalog.
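
A crawler that keeps the catalog in sync with a curated-zone prefix can be created with the AWS SDK, as in the following sketch; the crawler, role, database, path, and schedule names are placeholders.

    import boto3

    glue = boto3.client("glue")

    # Crawl the curated-zone prefix nightly and register new partitions and schema versions.
    glue.create_crawler(
        Name="curated-zone-crawler",
        Role="arn:aws:iam::<account-id>:role/GlueCrawlerRole",
        DatabaseName="datalake_curated",
        Targets={"S3Targets": [{"Path": "s3://datalake-curated-zone/sales/"}]},
        Schedule="cron(0 2 * * ? *)",
        SchemaChangePolicy={"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "LOG"},
    )

    glue.start_crawler(Name="curated-zone-crawler")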

Lake Formation provides the data lake administrator a central place to set up granular table- and column-level permissions for databases and tables hosted in the data lake. After Lake Formation permissions are set up, users and groups can access only authorized tables and columns using multiple processing and consumption layer services such as Athena, Amazon EMR, AWS Glue, and Amazon Redshift Spectrum.
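
As a sketch of what such a grant looks like through the AWS SDK, the following gives an analyst role SELECT access to two columns of a curated table; the role ARN, database, table, and column names are placeholders.

    import boto3

    lf = boto3.client("lakeformation")

    # Column-level grant: the analyst role can only SELECT the listed columns.
    lf.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::<account-id>:role/AnalystRole"},
        Resource={
            "TableWithColumns": {
                "DatabaseName": "datalake_curated",
                "Name": "sales",
                "ColumnNames": ["order_date", "revenue"],
            }
        },
        Permissions=["SELECT"],
    )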

Processing layer

The processing layer in our architecture is composed of two types of components:

  • Components used to create multi-step data processing pipelines
  • Components to orchestrate data processing pipelines on schedule or in response to event triggers (such as ingestion of new data into the landing zone)

AWS Glue and AWS Step Functions provide serverless components to build, orchestrate, and run pipelines that can easily scale to process large data volumes. Multi-step workflows built using AWS Glue and Step Functions can catalog, validate, clean, transform, and enrich individual datasets and advance them from landing to raw and raw to curated zones in the storage layer.

AWS Glue is a serverless, pay-per-use ETL service for building and running Python or Spark jobs (written in Scala or Python) without requiring you to deploy or manage clusters. AWS Glue automatically generates the code to accelerate your data transformations and loading processes. AWS Glue ETL builds on top of Apache Spark and provides commonly used out-of-the-box data source connectors, data structures, and ETL transformations to validate, clean, transform, and flatten data stored in many open-source formats such as CSV, JSON, Parquet, and Avro. AWS Glue ETL also provides capabilities to incrementally process partitioned data.

Additionally, you can use AWS Glue to define and run crawlers that can crawl folders in the data lake, discover datasets and their partitions, infer schema, and define tables in the Lake Formation catalog. AWS Glue provides more than a dozen built-in classifiers that can parse a variety of data structures stored in open-source formats. AWS Glue also provides triggers and workflow capabilities that you can use to build multi-step end-to-end data processing pipelines that include job dependencies and running parallel steps. You can schedule AWS Glue jobs and workflows or run them on demand. AWS Glue natively integrates with AWS services in storage, catalog, and security layers.

Step Functions is a serverless engine that you can use to build and orchestrate scheduled or event-driven data processing workflows. You use Step Functions to build complex data processing pipelines that involve orchestrating steps implemented by using multiple AWS services such as AWS Glue, AWS Lambda, Amazon Elastic Container Service (Amazon ECS) containers, and more. Step Functions provides visual representations of complex workflows and their running state to make them easy to understand. It manages state, checkpoints, and restarts of the workflow for you to make sure that the steps in your data pipeline run in order and as expected. Built-in try/catch, retry, and rollback capabilities deal with errors and exceptions automatically.
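
A minimal sketch of such an orchestration follows: a two-step state machine that runs one AWS Glue job and then another, using the synchronous Glue service integration. The job names, state machine name, and role ARN are placeholders.

    import json
    import boto3

    sfn = boto3.client("stepfunctions")

    # Two sequential Glue jobs: promote data from raw to curated, then build aggregates.
    definition = {
        "StartAt": "RawToCurated",
        "States": {
            "RawToCurated": {
                "Type": "Task",
                "Resource": "arn:aws:states:::glue:startJobRun.sync",
                "Parameters": {"JobName": "raw-to-curated-etl"},
                "Retry": [{"ErrorEquals": ["States.ALL"], "MaxAttempts": 2}],
                "Next": "CuratedAggregations",
            },
            "CuratedAggregations": {
                "Type": "Task",
                "Resource": "arn:aws:states:::glue:startJobRun.sync",
                "Parameters": {"JobName": "curated-aggregations"},
                "End": True,
            },
        },
    }

    sfn.create_state_machine(
        name="datalake-processing-pipeline",
        definition=json.dumps(definition),
        roleArn="arn:aws:iam::<account-id>:role/StepFunctionsPipelineRole",
    )

The .sync integration pattern makes each state wait for its Glue job to finish before moving on, which keeps the zone-to-zone promotion ordered.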

Consumption layer

The consumption layer in our architecture is composed using fully managed, purpose-built, analytics services that enable interactive SQL, BI dashboarding, batch processing, and ML.

Interactive SQL

Athena is an interactive query service that enables you to run complex ANSI SQL against terabytes of data stored in Amazon S3 without needing to first load it into a database. Athena queries can analyze structured, semi-structured, and columnar data stored in open-source formats such as CSV, JSON, XML, Avro, Parquet, and ORC. Athena uses table definitions from Lake Formation to apply schema-on-read to data read from Amazon S3.

Athena is serverless, so there is no infrastructure to set up or manage, and you pay only for the amount of data scanned by the queries you run. Athena provides faster results and lower costs by reducing the amount of data it scans by using dataset partitioning information stored in the Lake Formation catalog. You can run queries directly on the Athena console or submit them using Athena JDBC or ODBC endpoints.

Athena natively integrates with AWS services in the security and monitoring layer to support authentication, authorization, encryption, logging, and monitoring. It supports table- and column-level access controls defined in the Lake Formation catalog.

Data warehousing and batch analytics

Amazon Redshift is a fully managed data warehouse service that can host and process petabytes of data and run thousands of highly performant queries in parallel. Amazon Redshift uses a cluster of compute nodes to run very low-latency queries to power interactive dashboards and high-throughput batch analytics to drive business decisions. You can run Amazon Redshift queries directly on the Amazon Redshift console or submit them using the JDBC/ODBC endpoints provided by Amazon Redshift.

Amazon Redshift provides the capability, called Amazon Redshift Spectrum, to perform in-place queries on structured and semi-structured datasets in Amazon S3 without needing to load them into the cluster. Amazon Redshift Spectrum can spin up thousands of query-specific temporary nodes to scan exabytes of data to deliver fast results. Organizations typically load most frequently accessed dimension and fact data into an Amazon Redshift cluster and keep up to exabytes of structured, semi-structured, and unstructured historical data in Amazon S3. Amazon Redshift Spectrum enables running complex queries that combine data in a cluster with data on Amazon S3 in the same query.
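
As a sketch only, the following Python snippet uses the Amazon Redshift Data API to run a query that joins a Redshift Spectrum external table on Amazon S3 with a local cluster table; the cluster, database, user, schema, and table names are all assumptions.

import boto3

redshift_data = boto3.client("redshift-data")

# Join cold data on Amazon S3 (Spectrum external table) with hot data in the cluster
sql = """
    SELECT d.region, SUM(f.amount) AS revenue
    FROM spectrum_schema.sales_history f   -- external table over Amazon S3
    JOIN prod.region_dim d ON f.region_id = d.region_id   -- local cluster table
    GROUP BY d.region;
"""
response = redshift_data.execute_statement(
    ClusterIdentifier="example-redshift-cluster",
    Database="dev",
    DbUser="analytics_user",
    Sql=sql,
)
print(response["Id"])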

Amazon Redshift provides native integration with Amazon S3 in the storage layer, Lake Formation catalog, and AWS services in the security and monitoring layer.

Business intelligence

Amazon QuickSight provides a serverless BI capability to easily create and publish rich, interactive dashboards. QuickSight enriches dashboards and visuals with out-of-the-box, automatically generated ML insights such as forecasting, anomaly detection, and narrative highlights. QuickSight natively integrates with Amazon SageMaker to enable additional custom ML model-based insights to your BI dashboards. You can access QuickSight dashboards from any device using a QuickSight app, or you can embed the dashboard into web applications, portals, and websites.

QuickSight allows you to directly connect to and import data from a wide variety of cloud and on-premises data sources. These include SaaS applications such as Salesforce, Square, ServiceNow, Twitter, GitHub, and JIRA; third-party databases such as Teradata, MySQL, Postgres, and SQL Server; native AWS services such as Amazon Redshift, Athena, Amazon S3, Amazon Relational Database Service (Amazon RDS), and Amazon Aurora; and private VPC subnets. You can also upload a variety of file types including XLS, CSV, JSON, and Presto.

To achieve blazing fast performance for dashboards, QuickSight provides an in-memory caching and calculation engine called SPICE. SPICE automatically replicates data for high availability and enables thousands of users to simultaneously perform fast, interactive analysis while shielding your underlying data infrastructure. QuickSight automatically scales to tens of thousands of users and provides a cost-effective, pay-per-session pricing model.

QuickSight allows you to securely manage your users and content via a comprehensive set of security features, including role-based access control, active directory integration, AWS CloudTrail auditing, single sign-on (IAM or third-party), private VPC subnets, and data backup.

Predictive analytics and ML

Amazon SageMaker is a fully managed service that provides components to build, train, and deploy ML models using an integrated development environment (IDE) called Amazon SageMaker Studio. In Amazon SageMaker Studio, you can upload data, create new notebooks, train and tune models, move back and forth between steps to adjust experiments, compare results, and deploy models to production, all in one place by using a unified visual interface. Amazon SageMaker also provides managed Jupyter notebooks that you can spin up with just a few clicks. Amazon SageMaker notebooks provide elastic compute resources, git integration, easy sharing, pre-configured ML algorithms, dozens of out-of-the-box ML examples, and AWS Marketplace integration, which enables easy deployment of hundreds of pre-trained algorithms. Amazon SageMaker notebooks are preconfigured with all major deep learning frameworks, including TensorFlow, PyTorch, Apache MXNet, Chainer, Keras, Gluon, Horovod, Scikit-learn, and Deep Graph Library.

ML models are trained on Amazon SageMaker managed compute instances, including highly cost-effective Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances. You can organize multiple training jobs by using Amazon SageMaker Experiments. You can build training jobs using Amazon SageMaker built-in algorithms, your custom algorithms, or hundreds of algorithms you can deploy from AWS Marketplace. Amazon SageMaker Debugger provides full visibility into model training jobs. Amazon SageMaker also provides automatic hyperparameter tuning for ML training jobs.
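
The following Python sketch, using the SageMaker Python SDK, shows what a Spot-backed training job for the built-in XGBoost algorithm might look like; the IAM role, S3 bucket, and hyperparameters are placeholders rather than recommendations.

import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

session = sagemaker.Session()
role = "arn:aws:iam::123456789012:role/example-sagemaker-role"  # placeholder execution role

# Built-in XGBoost container image for the current Region
image_uri = sagemaker.image_uris.retrieve("xgboost", session.boto_region_name, version="1.2-1")

estimator = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    use_spot_instances=True,   # train on Spot capacity for cost savings
    max_run=3600,              # cap on training time, in seconds
    max_wait=7200,             # cap on total time including waiting for Spot capacity
    output_path="s3://example-ml-bucket/models/",
    hyperparameters={"objective": "reg:squarederror", "num_round": 100},
)

# Bucket, prefix, and content type are placeholders
estimator.fit({"train": TrainingInput("s3://example-ml-bucket/train/", content_type="text/csv")})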

You can deploy Amazon SageMaker trained models into production with a few clicks and easily scale them across a fleet of fully managed EC2 instances. You can choose from multiple EC2 instance types and attach cost-effective GPU-powered inference acceleration. After the models are deployed, Amazon SageMaker can monitor key model metrics for inference accuracy and detect any concept drift.

Amazon SageMaker provides native integrations with AWS services in the storage and security layers.

Security and governance layer

Components across all layers of our architecture protect data, identities, and processing resources by natively using the following capabilities provided by the security and governance layer.

Authentication and authorization

IAM provides user-, group-, and role-level identity to users and the ability to configure fine-grained access control for resources managed by AWS services in all layers of our architecture. IAM supports multi-factor authentication and single sign-on through integrations with corporate directories and open identity providers such as Google, Facebook, and Amazon.

Lake Formation provides a simple and centralized authorization model for tables hosted in the data lake. After they're implemented in Lake Formation, authorization policies for databases and tables are enforced by other AWS services such as Athena, Amazon EMR, QuickSight, and Amazon Redshift Spectrum. In Lake Formation, you can grant or revoke database-, table-, or column-level access for IAM users, groups, or roles defined in the same account hosting the Lake Formation catalog or another AWS account. The simple grant/revoke-based authorization model of Lake Formation considerably simplifies the previous IAM-based authorization model that relied on separately securing S3 data objects and metadata objects in the AWS Glue Data Catalog.
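
For illustration, the following Python sketch grants column-level SELECT on a catalog table to an analyst role through the Lake Formation API; the role ARN, database, table, and column names are placeholders.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant column-level SELECT on a catalog table to an analyst role
# (the role ARN, database, table, and column names are placeholders)
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/example-analyst-role"},
    Resource={
        "TableWithColumns": {
            "DatabaseName": "curated_db",
            "Name": "sales",
            "ColumnNames": ["sale_date", "region", "amount"],
        }
    },
    Permissions=["SELECT"],
)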

Encryption

AWS KMS provides the capability to create and manage symmetric and asymmetric customer-managed encryption keys. AWS services in all layers of our architecture natively integrate with AWS KMS to encrypt data in the data lake. It supports both creating new keys and importing existing customer keys. Access to the encryption keys is controlled using IAM and is monitored through detailed audit trails in CloudTrail.

Network protection

Our architecture uses Amazon Virtual Private Cloud (Amazon VPC) to provision a logically isolated section of the AWS Cloud (called a VPC) that is isolated from the internet and other AWS customers. Amazon VPC provides the ability to choose your own IP address range, create subnets, and configure route tables and network gateways. AWS services from other layers in our architecture launch resources in this private VPC to protect all traffic to and from these resources.

Monitoring and logging

AWS services in all layers of our architecture store detailed logs and monitoring metrics in Amazon CloudWatch. CloudWatch provides the ability to analyze logs, visualize monitored metrics, define monitoring thresholds, and send alerts when thresholds are crossed.

All AWS services in our architecture also store extensive audit trails of user and service actions in CloudTrail. CloudTrail provides event history of your AWS account activity, including actions taken through the AWS Management Console, AWS SDKs, command line tools, and other AWS services. This event history simplifies security analysis, resource change tracking, and troubleshooting. In addition, you can use CloudTrail to detect unusual activity in your AWS accounts. These capabilities help simplify operational analysis and troubleshooting.

Additional considerations

In this post, we talked about ingesting data from diverse sources and storing it as S3 objects in the data lake and then using AWS Glue to process ingested datasets until they’re in a consumable state. This architecture enables use cases needing source-to-consumption latency of a few minutes to hours. In a future post, we will evolve our serverless analytics architecture to add a speed layer to enable use cases that require source-to-consumption latency in seconds, all while aligning with the layered logical architecture we introduced.

Conclusion

With AWS serverless and managed services, you can build a modern, low-cost data lake centric analytics architecture in days. A decoupled, component-driven architecture allows you to start small and quickly add new purpose-built components to one of six architecture layers to address new requirements and data sources.

We invite you to read the following posts that contain detailed walkthroughs and sample code for building the components of the serverless data lake centric analytics architecture:


About the Authors

Praful Kava is a Sr. Specialist Solutions Architect at AWS. He guides customers to design and engineer Cloud scale Analytics pipelines on AWS. Outside work, he enjoys travelling with his family and exploring new hiking trails.

Changbin Gong is a Senior Solutions Architect at Amazon Web Services (AWS). He engages with customers to create innovative solutions that address customer business problems and accelerate the adoption of AWS services. In his spare time, Changbin enjoys reading, running, and traveling.

Big data processing in a data warehouse environment using AWS Glue 2.0 and PySpark

Post Syndicated from Kaushik Krishnamurthi original https://aws.amazon.com/blogs/big-data/big-data-processing-in-a-data-warehouse-environment-using-aws-glue-2-0-and-pyspark/

The AWS Marketing Data Science and Engineering team enables AWS Marketing to measure the effectiveness and impact of various marketing initiatives and campaigns. This is done through a data platform and infrastructure strategy that consists of maintaining a data warehouse, a data lake, and data transformation (ETL) pipelines, and designing software tools and services to run related operations. While providing various business intelligence (BI) and machine learning (ML) solutions for marketers, there is particular focus on the timely delivery of error-free, reliable, self-served, reusable, and scalable ways to measure and report business metrics. In this post, we discuss one such example of improving operational efficiency: how we optimized our ETL process using AWS Glue 2.0 and PySpark SQL to achieve high parallelism and reduce the runtime significantly (to under 45 minutes), delivering data to the business much sooner.

Solution overview

Our team maintained an ETL pipeline to process the entire history of a dataset. We did this by running a SQL query repeatedly in Amazon Redshift, incrementally processing 2 months at a time to account for several years of historical data, with several hundred billion rows in total. The input to this query is detailed service billing metrics across various AWS products, and the output is aggregated and summarized usage data. We wanted to move this heavy ETL process outside of our data warehouse environment, so that business users and our other relatively smaller ETL processes can use the Amazon Redshift resources fully for complex analytical queries.

Over the years, raw data feeds were captured in Amazon Redshift into separate tables, with 2 months of data in each. We first UNLOAD these to Amazon Simple Storage Service (Amazon S3) as Parquet formatted files and create AWS Glue tables on top of them by running CREATE TABLE DDLs in Amazon Athena as a one-time exercise. The source data is now available to be used as a DataFrame or DynamicFrame in an AWS Glue script.
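
The following Python sketch shows one way such an UNLOAD could be scripted through the Amazon Redshift Data API; the table, bucket, IAM role, cluster, and user names are placeholders and not the exact commands we used.

import boto3

redshift_data = boto3.client("redshift-data")

# UNLOAD one bi-monthly raw table to Amazon S3 as Parquet
# (table, bucket, IAM role, cluster, and user names are placeholders)
unload_sql = """
    UNLOAD ('SELECT * FROM billing.table_20200101')
    TO 's3://example-marketing-datalake/billing/table_20200101/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-unload-role'
    FORMAT AS PARQUET;
"""
response = redshift_data.execute_statement(
    ClusterIdentifier="example-redshift-cluster",
    Database="marketing",
    DbUser="etl_user",
    Sql=unload_sql,
)
print(response["Id"])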

Our query is dependent on a few more dimension tables that we UNLOAD again but in an automated fashion daily because we need the most recent version of these tables.

Next, we convert Amazon Redshift SQL queries to equivalent PySpark SQL. The data generated from the query output is written back to Amazon Redshift using AWS Glue DynamicFrame and DataSink. For more information, see Moving Data to and from Amazon Redshift.

We perform development and testing using Amazon SageMaker notebooks attached to an AWS Glue development endpoint.

After completing the tests, the script is deployed as a Spark application on the serverless Spark platform of AWS Glue. We do this by creating a job in AWS Glue and attaching our ETL script. We use the recently announced version AWS Glue 2.0.

The job can now be triggered via the AWS Command Line Interface (AWS CLI) using any workflow management or job scheduling tool. We use an internal distributed job scheduling tool to run the AWS Glue job periodically.

Design choices

We made a few design choices based on several factors. First, we reused the same Amazon Redshift SQL queries with minimal changes by relying on Spark SQL, because Spark SQL's syntax is very similar to traditional ANSI SQL.

We also used several techniques to optimize our Spark script for better memory management and speed. For example, we used broadcast joins for smaller tables involved in joins. See the following code:

-- Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints
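
The same effect can be achieved through the DataFrame API with the broadcast function, as in the following sketch; the table names are placeholders.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-join-example").getOrCreate()

dim_df = spark.table("dim_products")    # small dimension table (placeholder name)
fact_df = spark.table("fact_billing")   # large fact table (placeholder name)

# Broadcast the small table so the join avoids shuffling the large side
joined_df = fact_df.join(broadcast(dim_df), on="product_id", how="inner")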

AWS Glue DynamicFrame allowed us to create an AWS Glue DataSink pointed to our Amazon Redshift destination and write the output of our Spark SQL directly to Amazon Redshift without having to export to Amazon S3 first, which requires an additional ETL to copy from Amazon S3 to Amazon Redshift. See the following code:

# Imports assumed at the top of the AWS Glue script:
from awsglue.dynamicframe import DynamicFrame

# 'myDF' is the Spark DataFrame produced by the Spark SQL query; 'glueContext', 'args',
# and the 'start_dt', 'end_dt', 'run_dt' variables are defined earlier in the script.

# Convert Spark DataFrame to Glue DynamicFrame:
myDyF = DynamicFrame.fromDF(myDF, glueContext, "dynamic_df")

# Connecting to destination Redshift database:
connection_options = {
    "dbtable": "example.redshift_destination",
    "database": "aws_marketing_redshift_db",
    # Delete the date range being reprocessed before inserting the new output
    "preactions": "delete from example.redshift_destination where date between '"+start_dt+"' AND '"+end_dt+"';",
    # Record the run in a job status table after the load completes (date literal quoted for to_date)
    "postactions": "insert into example.job_status select 'example' as schema_name, 'redshift_destination' as table_name, to_date('"+run_dt[:8]+"','YYYYMMDD') as run_date;",
}

# Glue DataSink: write the DynamicFrame directly to Amazon Redshift over JDBC
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=myDyF,
    catalog_connection="aws_marketing_redshift_db_read_write",
    connection_options=connection_options,
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink"
)

We also considered horizontal scaling vs. vertical scaling. Based on the results observed during our tests for performance tuning, we chose to go with 75 as the number of workers and G.2X as the worker type. This translates to 150 data processing units (DPU) in AWS Glue. With G.2X, each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB of disk) and provides one executor per worker. The performance was nearly twice as fast when compared to G.1X for our dataset’s partitioning scheme, SQL aggregate functions, filters, and more put together. Each G.2X worker maps to 2 DPUs and runs twice the number of concurrent tasks compared to G.1X. This worker type is recommended for memory-intensive jobs and jobs that run intensive transforms. For more information, see the section Understanding AWS Glue Worker types in Best practices to scale Apache Spark jobs and partition data with AWS Glue.

We tested various choices of worker types between Standard, G.1X, and G.2X while also tweaking the number of workers. The job run time reduced proportionally as we added more G.2X instances.
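
As a rough sketch of how such a job definition looks when created programmatically (the job name, IAM role, and script location are placeholders), see the following Python code.

import boto3

glue = boto3.client("glue")

# Define a job with 75 G.2X workers on AWS Glue 2.0
# (job name, IAM role, and script location are placeholders)
glue.create_job(
    Name="example-billing-aggregation-job",
    Role="arn:aws:iam::123456789012:role/example-glue-etl-role",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://example-glue-scripts/billing_aggregation.py",
        "PythonVersion": "3",
    },
    GlueVersion="2.0",
    WorkerType="G.2X",
    NumberOfWorkers=75,
)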

Before AWS Glue 2.0, earlier versions involved AWS Glue jobs spending several minutes for the cluster to become available. We observed an approximate average startup time of 8–10 minutes for our AWS Glue job with 75 or more workers. With AWS Glue 2.0, you can see much faster startup times. We noticed startup times of less than 1 minute on average in almost all our AWS Glue 2.0 jobs, and the ETL workload began within 1 minute from when the job run request was made. For more information, see Running Spark ETL Jobs with Reduced Startup Times.

Although cost is a factor to consider while running a large ETL, you’re billed only for the duration of the AWS Glue job. For Spark jobs with AWS Glue 2.0, you’re billed in 1-second increments (with a 1-minute minimum). For more information, see AWS Glue Pricing.

Additional design considerations

During implementation, we also considered additional optimizations and alternatives in case we ran into issues. For example, if you want to allocate more resources to the write operations into Amazon Redshift, you can modify the workload management (WLM) configuration in Amazon Redshift accordingly so sufficient compute power from Amazon Redshift is available for the AWS Glue jobs to write data into Amazon Redshift.

To complement our ETL process, we can also perform an elastic resize of the Amazon Redshift cluster to a larger size, making it more powerful in a matter of minutes and allowing more parallelism, which helps improve the speed of our ETL load operations.

To submit an elastic resize of an Amazon Redshift cluster using Bash, see the following code:

cmd=$(aws redshift --region 'us-east-1' resize-cluster --cluster-identifier ${REDSHIFT_CLUSTER_NAME} --number-of-nodes ${NUMBER_OF_NODES} --no-classic)

To monitor the elastic resize of an Amazon Redshift cluster using Bash, see the following code:

cluster_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterStatus")
cluster_availability_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterAvailabilityStatus")

while [ "$cluster_status" != "available" ] || [ "$cluster_availability_status" != "Available" ]
do
	echo "$cluster_status" | ts
	echo "$cluster_availability_status" | ts
	echo "Waiting for Redshift resize cluster to complete..."
	sleep 60
	cluster_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterStatus")
	cluster_availability_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterAvailabilityStatus")
done

echo "$cluster_status" | ts
echo "$cluster_availability_status" | ts
echo "Done"
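
Alternatively, a similar wait can be scripted in Python with the boto3 cluster_available waiter, which polls the cluster status for you; the cluster identifier is a placeholder, and note that this waiter tracks ClusterStatus only.

import boto3

redshift = boto3.client("redshift", region_name="us-east-1")

# Block until the resized cluster reports ClusterStatus = available again
waiter = redshift.get_waiter("cluster_available")
waiter.wait(
    ClusterIdentifier="example-redshift-cluster",
    WaiterConfig={"Delay": 60, "MaxAttempts": 60},
)
print("Done")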

ETL overview

To submit AWS Glue jobs using Python, we use the following code:

import boto3

# 'glue' is an authenticated boto3 client object
glue = boto3.client("glue")

jobs = []
# Submit one job run per job; 'glue_job_name' and 'arguments' are set per job elsewhere in the script
jobs.append((glue_job_name, glue.start_job_run(
    JobName=glue_job_name,
    Arguments=arguments
)))

For our use case, we have multiple jobs. Each job can have multiple job runs, and each job run can have multiple retries. To monitor jobs, we use the following pseudo code:

while overall batch is still in progress:
      loop over all job runs of all submitted jobs:
            if job run is still in progress:
                  print job run status
                  wait
            else if job run has completed:
                  print success
            else job run has failed:
                  wait for retry to begin
                  loop over up to 10 retries of this job run:
                        if retry is still in progress:
                              print retry status     
                              wait
                              break
                        else if retry has completed:
                              print success
                              break
                        else retry has failed:
                              wait for next retry to begin
                        if this is the 10th i.e. final retry that failed:
                              print failure
                              loop over all job runs of all submitted jobs:
                                    loop over all retries of job run:
                                          build all job runs list
                                    kill all job runs list
                              wait for kill job runs to complete
                              send failure signal back to caller
      update overall batch status

The Python code is as follows:

job_run_status_overall = 'STARTING'
while job_run_status_overall in ['STARTING', 'RUNNING', 'STOPPING']:
    print("")
    job_run_status_temp = 'SUCCEEDED'
    for job, response in jobs:
        glue_job_name = job
        job_run_id = response['JobRunId']
        job_run_response = glue.get_job_run(JobName=glue_job_name, RunId=job_run_id)
        job_run_status = job_run_response['JobRun']['JobRunState']
        if job_run_status in ['STARTING', 'RUNNING', 'STOPPING']:
            job_run_status_temp = job_run_status
            logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
            time.sleep(120)
        elif job_run_status == 'SUCCEEDED':
            logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
        else:
            time.sleep(30)
            for i in range(1, 11):
                try:
                    job_run_id_temp = job_run_id+'_attempt_'+str(i)
                    # print("Checking for " + job_run_id_temp)
                    job_run_response = glue.get_job_run(JobName=glue_job_name, RunId=job_run_id_temp)
                    # print("Found " + job_run_id_temp)
                    job_run_id = job_run_id_temp
                    job_run_status = job_run_response['JobRun']['JobRunState']
                    if job_run_status in ['STARTING', 'RUNNING', 'STOPPING']:
                        job_run_status_temp = job_run_status
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        time.sleep(30)
                        break
                    elif job_run_status == 'SUCCEEDED':
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        break
                    else:
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        time.sleep(30)
                except Exception as e:
                    pass
                if i == 10:
                    logger.info("All attempts failed: Glue job ({}) with run id {} and status: {}".format(glue_job_name, job_run_id, job_run_status))
                    logger.info("Cleaning up: Stopping all jobs and job runs...")
                    for job_to_stop, response_to_stop in jobs:
                        glue_job_name_to_stop = job_to_stop
                        job_run_id_to_stop = response_to_stop['JobRunId']
                        job_run_id_to_stop_temp = []
                        for j in range(0, 11):
                            job_run_id_to_stop_temp.append(job_run_id_to_stop if j == 0 else job_run_id_to_stop+'_attempt_'+str(j))
                        job_to_stop_response = glue.batch_stop_job_run(JobName=glue_job_name_to_stop, JobRunIds=job_run_id_to_stop_temp)
                    time.sleep(30)
                    raise ValueError("Glue job ({}) with run id {} and status: {}".format(glue_job_name, job_run_id, job_run_status))
    job_run_status_overall = job_run_status_temp

Our set of source data feeds consists of multiple historical AWS Glue tables, with 2 months’ data in each, spanning across the past few years and a year into the future:

  • Tables for year 2016: table_20160101, table_20160301, table_20160501, …, table_20161101. (6 tables)
  • Tables for year 2017: table_20170101, table_20170301, table_20170501, …, table_20171101. (6 tables)
  • Tables for year 2018: table_20180101, table_20180301, table_20180501, …, table_20181101. (6 tables)
  • Tables for year 2019: table_20190101, table_20190301, table_20190501, …, table_20191101. (6 tables)
  • Tables for year 2020: table_20200101, table_20200301, table_20200501, …, table_20201101. (6 tables)
  • Tables for year 2021: table_20210101, table_20210301, table_20210501, …, table_20211101. (6 tables)

The tables add up to 36 tables (therefore, 36 SQL queries) with about 800 billion rows to process (excluding the later months of year 2020 and year 2021, the tables for which are placeholders and empty at the time of writing).

Due to the high volume of data, we want to trigger our AWS Glue job multiple times: one job run request for each table, all at once to achieve parallelism (as opposed to sequential, stacked, or staggered-over-time job runs), resulting in 36 total job runs needed to process 6 years of data. In AWS Glue, we created 12 identical jobs with three maximum concurrent runs of each, thereby allowing the 12 * 3 = 36 job run requests that we needed. However, we encountered a few bottlenecks and limitations, which we addressed by the workarounds we discuss in the following section.

Limitations and workarounds

We needed to increase the limit on how many IP addresses we can have within one VPC. To do this, we made sure the VPC’s CIDR was configured to allow as many IP addresses as needed to launch the over 2,000 workers expected when running all the AWS Glue jobs. The following table shows an example configuration.

IPv4 CIDR       Available IPv4
10.0.0.0/20     4091

For better availability and parallelism, we spread our jobs across multiple AWS Glue connections by doing the following:

  • Splitting our VPC into multiple subnets, with each subnet in a different Availability Zone
  • Creating one AWS Glue connection for each subnet (one each of us-east-1a, 1c, and 1d) so our request for over 2,000 worker nodes wasn’t made within one Availability Zone

This VPC splitting approach makes sure the job requests are evenly distributed across the three Availability Zones we chose. The following table shows an example configuration.

Subnet                          VPC       IPv4 CIDR       Available IPv4    Availability Zone
my-subnet-glue-etl-us-east-1c   my-vpc    10.0.0.0/20     4091              us-east-1c
my-subnet-glue-etl-us-east-1a   my-vpc    10.0.16.0/20    4091              us-east-1a
my-subnet-glue-etl-us-east-1d   my-vpc    10.0.32.0/20    4091              us-east-1d

The following diagram illustrates our architecture.

Summary

In this post, we shared our experience exploring the features and capabilities of AWS Glue 2.0 for our data processing needs. We consumed over 4,000 DPUs across all our AWS Glue jobs because we used over 2,000 workers of G.2X type. We spread our jobs across multiple connections mapped to different Availability Zones of our Region: us-east-1a, 1c, and 1d, for better availability and parallelism.

Using AWS Glue 2.0, we could run all our PySpark SQLs in parallel and independently without resource contention between each other. With earlier AWS Glue versions, launching each job took an extra 8–10 minutes for the cluster to boot up, but with the reduced startup time in AWS Glue 2.0, each job is ready to start processing data in less than 1 minute. And each AWS Glue job runs a Spark version of our original SQL query and directly writes output back to our Amazon Redshift destination configured via AWS Glue DynamicFrame and DataSink.

Each job takes approximately 30 minutes. And when jobs are submitted together, due to high parallelism, all our jobs still finish within 40 minutes. Although the job durations of AWS Glue 2.0 are similar to 1.0, saving an additional 8–10 minutes of startup time previously observed for a large sized cluster is a huge benefit. The duration of our long-running ETL process was reduced from several hours to under an hour, resulting in significant improvement in runtime.

Based on our experience, we plan to migrate to AWS Glue 2.0 for a large number of our current and future data platform ETL needs.


About the Author

Kaushik Krishnamurthi is a Senior Data Engineer at Amazon Web Services (AWS), where he focuses on building scalable platforms for business analytics and machine learning. Prior to AWS, he worked in several business intelligence and data engineering roles for many years.

Automating DBA tasks on Amazon Redshift securely using AWS IAM, AWS Lambda, Amazon EventBridge, and stored procedures

Post Syndicated from Gagan Brahmi original https://aws.amazon.com/blogs/big-data/automating-dba-tasks-on-amazon-redshift-securely-using-aws-iam-aws-lambda-amazon-eventbridge-and-stored-procedures/

As a data warehouse administrator or data engineer, you may need to perform maintenance tasks and activities or perform some level of custom monitoring on a regular basis. You can combine these activities inside a stored procedure or invoke views to get details. These activities include tasks such as loading nightly staging tables, invoking views, stopping idle connections, dropping unused tables, and so on.

In this post, we discuss how you can automate these routine activities for an Amazon Redshift cluster running inside a secure private network. For this solution, we use the following AWS services:

  • AWS Lambda – To run a specified query and invoke views or stored procedures inside your Amazon Redshift cluster.
  • Amazon EventBridge – To schedule running these SQL statements by triggering a Lambda function. The EventBridge rule supplies the Amazon Redshift cluster details as the input parameters. This gives you the flexibility to provide multiple queries or multiple cluster details.
  • AWS Identity and Access Management (IAM) – To provide access to the Amazon Redshift cluster using temporarily generated credentials in a secure way. This avoids the need to store access credentials.
  • Amazon API Gateway – To securely connect to the Amazon Redshift API service from a private subnet that has no access to the internet.

Solution architecture

The following architecture diagram provides an overview of the solution.

This architecture has the following workflow:

  1. We create an EventBridge rule with a schedule using the default event bus to invoke a target. The target for this rule is a Lambda function that connects to an Amazon Redshift cluster and runs a SQL statement. The target is configured to provide input parameters as constants. These parameters include an Amazon Redshift cluster identifier, database name, Amazon Redshift user, and the SQL statement to run.
  2. The rule is triggered at the scheduled time and sends the data to the RedshiftExecuteSQLFunction function responsible for running the specified query.
  3. The RedshiftExecuteSQLFunction function in Step 4 is connected to the user’s Amazon Virtual Private Cloud (VPC) inside a private subnet that doesn’t have access to the internet. However, this function needs to communicate with the Amazon Redshift API service to generate temporary user credentials to securely access the Amazon Redshift cluster. Because the private subnet has no access to the internet (no NAT gateway), the solution uses Amazon API Gateway with a VPC endpoint to securely communicate with the Amazon Redshift API service. The function passes the Amazon Redshift cluster information inside the VPC through the private subnet to the API Gateway VPC endpoint, which is backed by another function, RedshiftApiLambda, which is responsible for communicating with the Amazon Redshift API service to generate temporary credentials and send them back to the RedshiftExecuteSQLFunction function securely via your VPC.
  4. The RedshiftExecuteSQLFunction function uses the Amazon Redshift cluster endpoint, port, and temporary credentials received in the previous step to communicate with the Amazon Redshift cluster running in a private subnet inside the user’s VPC. It runs the SQL statement submitted in Step 1.

The architecture is scalable to accommodate multiple rules for different DBA tasks and different Amazon Redshift clusters.

Prerequisites

To get started, you need to have an AWS account.

We have provided an AWS CloudFormation template to demonstrate the solution. You can download and use this template to easily deploy the required AWS resources. This template has been tested in the us-east-1 Region.

When you’re logged in to your AWS account, complete the following steps:

  1. You can deploy the resources by using the template to launch the stack on the AWS Management Console. Alternatively, you can launch the stack from the following link:
  2. Choose Next.
  3. On the Specify stack details page, enter the following parameters:
    1. For Lambda VPC Configuration, choose the VPC and subnets inside the VPC. The template allows you to select multiple subnets; however, it only uses the first two subnets that are selected. Make sure the selected VPC subnets have access to the target Amazon Redshift cluster.
    2. Choose if you want to create or use an existing VPC endpoint for the API Gateway. For an existing VPC endpoint for API Gateway, you need a DNS-enabled interface endpoint.
  4. Leave the remaining values at their defaults and choose Next.
  5. On the Configure stack options page, leave everything at its default and choose Next.
  6. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Create stack.

The CloudFormation template can take approximately 5 minutes to deploy the resources.

  1. When the stack status shows as CREATE_COMPLETE, choose the Outputs tab and record the values for RedshiftExecuteSQLFunction and RedshiftExecuteSQLFunctionSecurityGroup.

You need these values later to create EventBridge rules and to allow access to the Amazon Redshift cluster.

Amazon Redshift stored procedures and security definer

A stored procedure is a user-created object to perform a set of SQL queries and logical operations. Stored procedures are often used to encapsulate logic for data transformation, data validation, and business-specific logic. You can reduce round trips between your applications and the database by combining multiple SQL steps into a stored procedure.

Amazon Redshift supports stored procedures in the PL/pgSQL dialect; stored procedures can include variable declarations, control logic, loops, and error raising. The SECURITY attribute controls who has privileges to access what database objects. By default, only superusers and the owner of the stored procedure have permission to perform its actions. You can create stored procedures to perform functions without giving a user access to the underlying tables by using security definer controls. With the security definer concept, you can allow users to perform actions they otherwise don’t have permissions to run. For example, they can drop a table created by another user.

For more information about stored procedures, see Creating stored procedures in Amazon Redshift and Security and privileges for stored procedures.

For this post, we create two DBA tasks in the form of a stored procedure and views inside the Amazon Redshift cluster:

  • Drop unused tables
  • Clean up idle connections

We then schedule the running of these tasks using EventBridge and Lambda.

To make it easier to track the DBA tasks, such as which table is dropped and how many idle connections are cleaned up, we create a helper table and a stored procedure to track stored procedure run details. You can run the SQL statements against the cluster using either the query editor or SQL client tools.

Then you can call this stored procedure in other DBA task stored procedures to log task details. For example, see the following code:

CALL dba.sp_log_dba_task(CURRENT_USER_ID, user, 'Idle connections', 'Kill idle connections', 'Succeed');

Dropping unused tables

A user might create tables for short-term usage but forget to delete them. Over time, lots of leftover tables can accumulate in the data warehouse, wasting storage space. In this use case, the DBA needs to clean them up regularly.

We can collect table usage data from system tables and identify tables that haven’t been accessed for a certain period. Then we can target large tables or all unused tables and drop them automatically.

Various users could have created those tables. To drop them, we need to run the stored procedure as a superuser. Create the following stored procedure as a superuser and with SECURITY DEFINER on the Amazon Redshift cluster you need to maintain. This allows the DBA team to run the stored procedure to drop a table without being the owner of the table.

CREATE OR REPLACE PROCEDURE dba.sp_drop_table_cascade(schema_name VARCHAR, table_name VARCHAR)
AS 
…
SECURITY DEFINER;

CREATE OR REPLACE PROCEDURE dba.sp_drop_unused_tables(schema_name VARCHAR, unused_days int)
AS
…
SECURITY DEFINER;

Then you can call this stored procedure to delete all unused tables. Adjust the unused_days input parameter based on your workload pattern. For example, to delete tables that haven’t been accessed in the past two weeks, enter the following code:

CALL dba.sp_drop_unused_tables('prod', 14);

Cleaning up idle connections

An idle connection can consume system resources, or even hold a table lock if there is a pending transaction, and impact other workloads. As a DBA, keeping an eye on the idle connections and cleaning them up can help your data warehouse be more performant and stable.

First, find all open connections and identify if they’re active or not based on how long the transactions last. For this post, we use a 60-second threshold. Then you can remove those idle connections. The full script is available to download.

The following code deletes connections that have been idle for more than 30 minutes:

CALL dba.sp_delete_idle_connections(1800);

After you test and verify those stored procedures, you may want to run them regularly to clean up your data warehouse automatically. Lambda and EventBridge allow you to run those routine tasks easily.

AWS Lambda

For this post, our Lambda function uses the Python runtime environment, takes the Amazon Redshift cluster details as input, and generates temporary credentials. Amazon Redshift allows users and applications to programmatically generate temporary database user credentials for an AWS Identity and Access Management (IAM) user or role. The IAM user or role for the function is granted the redshift:GetClusterCredentials IAM permission so it can call the GetClusterCredentials operation on the Amazon Redshift API service. For more information, see Generating IAM database credentials using the Amazon Redshift CLI or API.

# Inside the RedshiftApiLambda handler: 'redshiftBoto3Client' is a boto3 Amazon Redshift
# client, and the redshift* variables come from the request payload
creds = redshiftBoto3Client.get_cluster_credentials(DbUser=redshiftClusterUser,
    DbName=redshiftDatabaseName,
    ClusterIdentifier=redshiftClusterIdentifier,
    DurationSeconds=redshiftSessionDuration)

return creds

This credential is used to make a connection with the Amazon Redshift cluster and run the SQL statement, or stored procedure:

# 'DB' is the PostgreSQL client connection class packaged with the function
# (for example, PyGreSQL's pg.DB); connection details come from the previous step
conn = DB(dbname=redshiftDatabaseName,
      user=redshift_cluster_details['tempCredentials']['DbUser'],
      passwd=redshift_cluster_details['tempCredentials']['DbPassword'],
      host=redshiftClusterAddress,
      port=redshiftClusterPort)

conn.query(redshiftExecuteQuery)

Providing the RedshiftExecuteSQLFunction function access to the Amazon Redshift cluster

You need to grant the RedshiftExecuteSQLFunction function access to the Amazon Redshift cluster where the queries are to be run. On the CloudFormation Outputs tab for the stack you created earlier, you should have the value for RedshiftExecuteSQLFunctionSecurityGroup. We use this value to grant access inside the Amazon Redshift cluster’s security group.

For information about managing the Amazon Redshift security group on the EC2-Classic platform, see Amazon Redshift cluster security groups. For instructions on managing security groups on the EC2-VPC platform, see Managing VPC security groups for a cluster.

You can manage the security group via the Amazon VPC console or the Amazon Redshift console. For this post, we use the EC2-VPC platform for our Amazon Redshift cluster and use the Amazon Redshift console to update the security group.

  1. On the Amazon Redshift console, choose Clusters.
  2. Choose the Amazon Redshift cluster identifier that you need to grant access to.
  3. On the Properties tab, in the Network and security section, under VPC security group, find the security group for the cluster.
  4. Choose the security group starting with sg-.

This opens a new window to manage the security group.

  1. In the new window, choose the security group ID that begins with sg-.
  2. On the Inbound rules tab, choose Edit inbound rules to grant access to the Lambda function.
  3. Choose Add rule.
  4. For Type, choose Redshift.

This should populate the protocol and port range. If you’re using a custom port for the cluster, choose Custom TCP for the type and manually enter the port number relevant to your cluster.

  1. Optionally, add a description for the rule.
  2. Choose Save rules.

For more information about your VPC security group, see Security groups for your VPC.

Creating event rules with EventBridge

For this post, we schedule the DBA task to drop unused tables every 12 hours. We’re using the us-east-1 Region. We start by adding an EventBridge rule with an identifiable name.

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter cluster-1-drop-table-12-hours.
  3. For Description, add an optional description.
  4. For Define pattern, select Schedule.
  5. For Fixed rate every, choose 12 hours.
  6. In the Select targets section, for Target, choose Lambda function.
  7. From the Function drop-down menu, choose the function that matches the RedshiftExecuteSQLFunction from the CloudFormation stack Outputs.
  8. In the Configure input section, select Constant (JSON text).
  9. Add the following JSON data (replace the values for Cluster, Username, Database, and ExecutionQuery as appropriate for your cluster). You must provide the cluster identifier for Cluster, not the endpoint address. The code locates the endpoint address and port for the cluster.
    {
        "Cluster": "redshift-cluster-identifier", 
        "Username": "redshift_user", 
        "Database": "dev", 
        "ExecutionQuery": "CALL sp_drop_unused_tables('dbschema', 14)"
    }

  10. Choose Create.
  11. Follow the same steps to create a second EventBridge rule.

The following rule triggers the DBA task to stop idle connections every 3 hours. The input data used for this method includes the reference for the stored procedure for stopping the idle connection.

  1. Add the following JSON data (replacing the values for Cluster, Username, Database, and ExecutionQuery as appropriate to your use case):
    {
        "Cluster": "redshift-cluster-identifier", 
        "Username": "redshift_user", 
        "Database": "dev", 
        "ExecutionQuery": "CALL dba.sp_delete_idle_connections(1800)"
    }

The preceding steps set up two different rules with the same target Lambda function. However, the two rules run two different stored procedures on separate schedules. We can scale this solution to add multiple rules to run on different Amazon Redshift clusters on different schedules, or to run multiple SQL statements against the same Amazon Redshift cluster on different schedules.
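
If you prefer to create these rules programmatically instead of on the console, the following Python sketch creates the 12-hour rule and attaches the Lambda target with the constant JSON input. The function ARN is a placeholder, and you would also need to grant EventBridge permission to invoke the function (for example, with the Lambda add_permission API).

import json

import boto3

events = boto3.client("events")
lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:RedshiftExecuteSQLFunction"  # placeholder ARN

# Scheduled rule that runs the drop-unused-tables task every 12 hours
events.put_rule(
    Name="cluster-1-drop-table-12-hours",
    ScheduleExpression="rate(12 hours)",
    State="ENABLED",
)

# Point the rule at the Lambda function with the constant JSON input
events.put_targets(
    Rule="cluster-1-drop-table-12-hours",
    Targets=[{
        "Id": "redshift-execute-sql",
        "Arn": lambda_arn,
        "Input": json.dumps({
            "Cluster": "redshift-cluster-identifier",
            "Username": "redshift_user",
            "Database": "dev",
            "ExecutionQuery": "CALL dba.sp_drop_unused_tables('dbschema', 14)",
        }),
    }],
)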

Cleaning up

Before you remove the CloudFormation stack, you should remove the EventBridge rule.

  1. On the EventBridge console, choose Rules.
  2. Select the first rule you added earlier and choose Delete.
  3. Choose Delete again to confirm.
  4. Repeat the same steps for the second rule.

Conclusion

In this post, we provided a solution to automate routine DBA tasks against Amazon Redshift clusters in a secure way. The solution is scalable to support multiple tasks on corresponding schedules on multiple Amazon Redshift clusters. You can extend this solution to handle more routine tasks and simplify your workflow.


About the Authors

Gagan Brahmi is a Specialist Solutions Architect focused on Big Data & Analytics at Amazon Web Services. Gagan has over 15 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Juan Yu is a Data Warehouse Specialist Solutions Architect at Amazon Web Services, where she helps customers adopt cloud data warehouses and solve analytic challenges at scale. Prior to AWS, she had fun building and enhancing MPP query engines to improve customer experience on Big Data workloads.

Optimizing Amazon EMR for resilience and cost with capacity-optimized Spot Instances

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/optimizing-amazon-emr-for-resilience-and-cost-with-capacity-optimized-spot-instances/

Amazon EMR now supports the capacity-optimized allocation strategy for Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances, which launches Spot Instances from the most available Spot Instance capacity pools by analyzing capacity metrics in real time. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This gives Amazon EMR more options when choosing the optimal pools to launch Spot Instances from, which decreases the chance of Spot interruptions, and it increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.

Background

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With Amazon EMR, you can run petabyte-scale analysis at less than half of the cost of traditional on-premises solutions, and over three times as fast on Amazon EMR runtime for Apache Spark compared to running without the runtime. If you have existing on-premises deployments of open-source tools such as Apache Spark and Apache Hive, you can also run Amazon EMR clusters on AWS Outposts.

Spot Instances are spare Amazon EC2 compute capacity in the AWS Cloud available to you at savings of up to 90% compared to On-Demand Instance prices. The only difference between On-Demand Instances and Spot Instances is that Amazon EC2 can interrupt Spot Instances with 2 minutes of notification when Amazon EC2 needs the capacity back. Using Spot Instances in Amazon EMR is a common pattern that allows AWS customers to achieve significant cost savings.

The capacity-optimized allocation strategy in the Amazon EC2 fleet (also available for Amazon EC2 Auto Scaling and Spot Fleet) provisions Spot Instances from the most-available Spot Instance pools by analyzing capacity metrics. By offering the possibility of fewer interruptions, the capacity-optimized strategy can lower the overall cost of your workload. For more information about how AWS customers are benefiting from decreased Spot interruptions with the capacity-optimized allocation strategy, see Capacity-Optimized Spot Instance Allocation in Action at Mobileye and Skyscanner.

Amazon EMR uses the Amazon EC2 RunInstances API to provision compute capacity. We are enhancing the way Amazon EMR provisions EC2 instances to provide more flexibility and increased cluster resilience using EC2 Fleet (CreateFleet) in Instant mode, as a drop-in replacement for RunInstances.

Optimizing capacity for greater resilience

With this launch, you can configure Amazon EMR to use allocation strategies.

The capacity-optimized allocation strategy uses real-time capacity data to allocate instances from the Spot Instance pools with the optimal capacity for the number of instances that are launching. This allocation strategy is appropriate for workloads that have a higher cost of interruption. Examples include long-running jobs and multi-tenant persistent clusters running Apache Spark, Apache Hive, and Presto. This allocation strategy lets you specify up to 15 EC2 instance types on task instance fleets to diversify your Spot requests and get steep discounts. Previously, instance fleets allowed a maximum of five instance types. You can now diversify your Spot requests across these 15 pools within each Availability Zone and prioritize deploying into a deeper capacity pool to lower the chance of interruptions. With more instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, and chooses the Spot Instances which are least likely to be interrupted.

For example, if you’re initially using EC2 memory-optimized r5.4xlarge instances (with 16 vCPUs and 128GB of RAM) for your EMR task nodes, you can configure the EMR task instance fleet with different instances types. First, explore different-sized instance types such as r5.2xlarge and r5.8xlarge. Second, add previous generation r4.4xlarge and other R4 instance sizes. After you’ve added different sizes within the same family, as well as previous generation instance types, you can add extra instance types with similar hardware characteristics and vCPU to memory ratio, such as the r5a instance family with AMD processors, r5d instance family with locally attached NVMe storage, and more.

The allocation strategy is also taken into account in case the cluster scales out after the initial provisioning phase—for example, if you manually resize the core or task fleets, or if you’re using managed scaling to automatically increase or decrease the number of instances or units in your cluster based on workload.

For more information about Spot Instance configuration if you’re using Amazon EMR to run Apache Spark workloads, see Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR. This blog emphasizes best practices that will help you build your Spark workloads with Amazon EMR to achieve deep cost savings.

Amazon EMR has made significant enhancements to improve elasticity and resilience, including graceful decommissioning of Spot Instances running Apache Spark and Apache Hadoop applications. Amazon EMR has customizations to open-source Apache Spark that make it more resilient to node loss—integrating with YARN’s decommissioning mechanism, extending Apache Spark’s blacklisting mechanism and actions on decommissioned nodes.

For example, when Spot Instances are interrupted in a running EMR cluster, stage failures don’t count towards the total number of failures that trigger a total job failure. For more information, see Spark enhancements for elasticity and resilience on Amazon EMR.

New configuration options and IAM policy requirements

To leverage the allocation strategies in your EMR clusters, you need to use a new AllocationStrategy parameter in your cluster configurations. Amazon EMR added support for an On-Demand allocation strategy: you can specify multiple On-Demand Instance types in your core or task instance fleets, and specify an allocation strategy of “lowest-price” to have Amazon EMR provision On-Demand Instances that have the lowest costs. This allows you to also be flexible with your selection of On-Demand instance types.

The following is an example snippet from an Amazon EMR JSON configuration file with the new capabilities:

{
    "Name": "Taskfleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 1,
    "TargetOnDemandCapacity": 1,
    "LaunchSpecifications": {
        "OnDemandSpecification": {
            "AllocationStrategy": "lowest-price"
        },
        "SpotSpecification": {
            "AllocationStrategy": "capacity-optimized",
            "TimeoutDurationMinutes": 120,
            "TimeoutAction": "TERMINATE_CLUSTER"
        }
    }
}

For more information about the API options, see InstanceFleetProvisioningSpecifications.

The following IAM policy shows the additional service role permissions required to create a cluster that uses the instance fleet allocation strategy option. If your clusters are using the default role EMR_DefaultRole (which has the default managed policy attached AmazonElasticMapReduceRole), the managed policy is already updated to include these new role permissions. If your clusters are using a different role or policy, make sure you add these new permissions to your policy. See the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DeleteLaunchTemplate",
                "ec2:CreateLaunchTemplate",
                "ec2:DescribeLaunchTemplates",
                "ec2:CreateFleet"
            ],
            "Resource": "*"
        }
    ]
}

Launching an EMR cluster with capacity-optimized Spot Instances and a diversified task fleet

In this section, we look at how to create an EMR cluster that includes allocation strategy configurations and a diversified task fleet. Specifying more instance types allows Amazon EMR to launch instances from the most optimal capacity pools and to replenish the cluster’s target capacity if Spot Instances are interrupted. Moreover, by using the capacity-optimized allocation strategy, Spot Instances are launched from the most available capacity pools, effectively decreasing the chances of Spot interruptions.

The following AWS Command Line Interface (AWS CLI) command launches an EMR cluster in the default AWS Region configured in your AWS CLI configuration, with the master and core nodes running on On-Demand Instances, and a task fleet running Spot Instances.

Amazon EMR uses a wide selection of instance types in the task instance fleet and uses "AllocationStrategy": "CAPACITY_OPTIMIZED" to launch instances from the most available Spot capacity pools and decrease the chances of workload interruptions. By providing a WeightedCapacity for each instance type that is equal to the number of vCPU (or YARN vCores), you can specify a TargetSpotCapacity that defines the number of vCPUs (YARN vCores) in your task fleet and be flexible around the instance sizes, effectively providing more capacity pools to choose from. You should specify a subnet ID per Availability Zone in the AWS Region. While each EMR cluster runs in a single Availability Zone, specifying multiple Availability Zones allows you to architect your workload with increased fault-tolerance.

See the following AWS CLI command for an example of launching an Amazon EMR cluster that adheres to the recommendations in this blog post (uses Allocation Strategies and a diversified set of instance types in the task fleet):

aws emr create-cluster \
--use-default-roles --release-label emr-5.30.1 \
--ec2-attributes SubnetIds=['subnet-1234567890abcdefg','subnet-1234567890abcdefg','subnet-1234567890abcdefg'] \
--name 'EMRCluster-TaskFleet' \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{WeightedCapacity=1,InstanceType=m5.xlarge}'] \
InstanceFleetType=CORE,TargetOnDemandCapacity=4,LaunchSpecifications={OnDemandSpecification='{AllocationStrategy=LOWEST_PRICE}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=4,InstanceType=r5.xlarge}'] \
InstanceFleetType=TASK,TargetSpotCapacity=64,LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=60,AllocationStrategy=CAPACITY_OPTIMIZED,TimeoutAction=TERMINATE_CLUSTER}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r5.xlarge},{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=8,InstanceType=r5.2xlarge},{WeightedCapacity=8,InstanceType=r4.2xlarge},{WeightedCapacity=16,InstanceType=r5.4xlarge},{WeightedCapacity=16,InstanceType=r4.4xlarge},{WeightedCapacity=32,InstanceType=r5.8xlarge},{WeightedCapacity=32,InstanceType=r4.8xlarge},{WeightedCapacity=64,InstanceType=r5.16xlarge},{WeightedCapacity=64,InstanceType=r4.16xlarge},{WeightedCapacity=16,InstanceType=r5d.4xlarge},{WeightedCapacity=16,InstanceType=r5a.4xlarge}']

The following screenshot shows the result on the Amazon EMR console.

Conclusion

With this new functionality in Amazon EMR, you can increase the resilience of your organization’s data-processing workloads and optimize your costs by using Spot Instances. The capacity-optimized allocation strategy works to decrease the possibility of Spot interruptions in your cluster and allows you to specify up to 15 different instance types for your task fleet, enabling Amazon EMR to find the most available Spot capacity pools for your workload. With the Amazon EMR enhancements for increased resilience and the capacity-optimized allocation strategy for Spot Instances, you can achieve deep cost savings without compromising on availability.


About the authors

Ran Sheinberg is a principal solutions architect in the EC2 Spot team with Amazon Web Services. He works with AWS customers on optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC, and others.

Get started with Amazon Redshift cross-database queries (preview)

Post Syndicated from Neeraja Rentachintala original https://aws.amazon.com/blogs/big-data/get-started-with-amazon-redshift-cross-database-queries-preview/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL, business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

We’re excited to announce the public preview of the new cross-database queries capability to query across databases in an Amazon Redshift cluster. In this post, we provide an overview of the cross-database queries and a walkthrough of the key functionality that allows you to manage data and analytics at scale in your organization.

What are cross-database queries?

With cross-database queries, you can seamlessly query data from any database in your Amazon Redshift cluster, regardless of which database you’re connected to. Cross-database queries eliminate data copies and simplify your data organization to support multiple business groups on the same cluster. Support for cross-database queries is available on Amazon Redshift RA3 node types.

Data is organized across multiple databases in Amazon Redshift clusters to support multi-tenant configurations. However, you often need to query and join across these datasets by allowing read access. For example, different business groups and teams that own and manage their datasets in a specific database in the data warehouse need to collaborate with other groups. You might want to perform common ETL staging and processing while your raw data is spread across multiple databases. Organizing data in multiple Amazon Redshift databases is also a common scenario when migrating from traditional data warehouse systems.

With cross-database queries, you can now access data from any database on the Amazon Redshift cluster without having to connect to that specific database. You can also join datasets from multiple databases in a single query. You can access database objects such as tables and views with a simple three-part notation of <database>.<schema>.<object>, and analyze the objects using business intelligence (BI) or analytics tools. You can continue to set up granular access controls for users with standard Amazon Redshift SQL commands and ensure that users can only see the relevant subsets of the data they have permissions for.

Walkthrough overview

In this post, we walk through an end-to-end use case to illustrate cross-database queries, comprising the following steps:

  1. Set up permissions on the data.
  2. Access data and perform several cross-database queries.
  3. Connect from tools.

For this walkthrough, we use SQL Workbench, a SQL query tool, to perform queries on Amazon Redshift. For more information about connecting SQL Workbench to an Amazon Redshift cluster, see Connect to your cluster by using SQL Workbench/J.

Setting up permissions for cross-database queries

You can use standard Redshift SQL GRANT and REVOKE commands to configure appropriate permissions for users and groups. To configure permissions, we connect as an administrator to a database named TPCH_100G on an Amazon Redshift cluster that we set up with an industry standard dataset, TPC-H. You can set up this dataset in your environment using the code and scripts for this dataset on GitHub and the accompanying dataset hosted in a public Amazon Simple Storage Service (Amazon S3) bucket.

The following screenshot shows the configuration for your connection profile.

The TPCH_100G database consists of eight tables loaded in the schema PUBLIC, as shown in the following screenshot.

The following screenshot shows a test query on one of the TPC-H tables, customer.

The database administrator provides read permissions on three of the tables (customer, orders, and lineitem) to an Amazon Redshift user called demouser. The user typically connects to and operates in their own team’s database TPCH_CONSUMERDB on the same Amazon Redshift cluster.
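If you prefer to script this setup rather than run the statements interactively in SQL Workbench, the following is a minimal sketch of issuing those grants through the Amazon Redshift Data API with boto3. The cluster identifier and administrator user name are placeholders, and the exact set of GRANT statements may differ in your environment.

import boto3

client = boto3.client("redshift-data")

# Grant demouser read access to three TPC-H tables in TPCH_100G.
# ClusterIdentifier and DbUser are placeholders for your environment.
for sql in [
    "GRANT USAGE ON SCHEMA public TO demouser;",
    "GRANT SELECT ON public.customer TO demouser;",
    "GRANT SELECT ON public.orders TO demouser;",
    "GRANT SELECT ON public.lineitem TO demouser;",
]:
    client.execute_statement(
        ClusterIdentifier="my-redshift-cluster",  # placeholder
        Database="TPCH_100G",
        DbUser="awsuser",                         # administrator user
        Sql=sql,
    )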

Performing cross-database queries using three-part notation

In this section, we see how cross-database queries work in action. With cross-database queries, you can connect to any database and query from all the other databases in the cluster without having to reconnect. In this use case, the user demouser connects to their database TPCH_CONSUMERDB (see the following screenshot).

While connected to TPCH_CONSUMERDB, demouser can also perform queries on the data in TPCH_100G database objects that they have permissions to, referring to them using the simple and intuitive three-part notation TPCH_100G.PUBLIC.CUSTOMER (see the following screenshot).

You can refer to and query objects in any other database in the cluster using this <database>.<schema>.<object> notation as long as you have permissions to do so. The objects can be tables or views (including regular, late binding and materialized views).
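As an illustration, here is a minimal sketch of running such a three-part-notation query programmatically with the Amazon Redshift Data API (boto3). The cluster identifier is a placeholder, and the TPC-H column names are only examples:

import time
import boto3

client = boto3.client("redshift-data")

# Run a cross-database query while connected to TPCH_CONSUMERDB,
# referring to the CUSTOMER table in TPCH_100G with three-part notation.
resp = client.execute_statement(
    ClusterIdentifier="my-redshift-cluster",   # placeholder
    Database="TPCH_CONSUMERDB",
    DbUser="demouser",
    Sql="SELECT c_custkey, c_name FROM TPCH_100G.PUBLIC.CUSTOMER LIMIT 10;",
)

# The Data API is asynchronous, so wait for the statement to finish.
while client.describe_statement(Id=resp["Id"])["Status"] not in ("FINISHED", "FAILED", "ABORTED"):
    time.sleep(1)

print(client.get_statement_result(Id=resp["Id"])["Records"])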

In addition to performing queries on objects, you can create views on top of objects in other databases and apply granular access controls as relevant.

Joining data across databases

With cross-database queries, you can join datasets across databases. In the following screenshot, demouser queries and performs joins across the customer, lineitem, and orders tables in the TPCH_100G database.

You can also span joins on objects across databases. In the following query, demouser seamlessly joins the datasets from TPCH_100G (customer, lineitem, and orders tables) with the datasets in TPCH_CONSUMERDB (nation and supplier tables).

With cross-database queries, you get a consistent view of the data irrespective of the database you’re connected to.

Securely accessing relevant datasets by connecting from tools

To support the database hierarchy navigation and exploration introduced with cross-database queries, Amazon Redshift is introducing a new set of metadata views and modified versions of JDBC and ODBC drivers.

In addition, you can create aliases from one database to schemas in any other databases on the Amazon Redshift cluster. You create the aliases using the CREATE EXTERNAL SCHEMA command, which allows you to refer to the objects in cross-database queries with the two-part notation <external schema name>.<object>. For example, in the following screenshot, the database administrator connects to TPCH_CONSUMERDB, creates an external schema alias called TPC_100G_PUBLIC for the PUBLIC schema in the TPCH_100G database, and grants usage access on the schema to demouser.

Now, when demouser connects to TPCH_CONSUMERDB, they see the external schema in the object hierarchy (as in the following screenshot) with only the relevant objects that they have permissions to: CUSTOMER, LINEITEM, and ORDERS.

Now they can perform queries using the schema alias as if the data is local rather than using a three-part notation.

Summary and next steps

We provided a glimpse into what you can accomplish with cross-database queries in Amazon Redshift. Cross-database queries allow you to organize and manage data across databases to effectively support multi-tenant data warehouse deployments for a wide variety of use cases. You can get started with your use case by trying out the cross-database queries capability in preview. For more information, see the cross-database queries documentation.


About the Authors

Neeraja Rentachintala is a Principal Product Manager with Amazon Redshift. Neeraja is a seasoned product management and GTM leader, bringing over 20 years of experience in product vision, strategy, and leadership roles in data products and platforms. Neeraja has delivered products in analytics, databases, data integration, application integration, AI/machine learning, and large-scale distributed systems across on-premises and cloud environments, serving Fortune 500 companies as part of ventures including MapR (acquired by HPE), Microsoft SQL Server, Oracle, Informatica, and Expedia.com.

Jenny Chen is a senior database engineer at Amazon Redshift focusing on all aspects of Redshift performance, such as query processing, concurrency, distributed systems, storage, and OS. She works with the development team to deliver a high-performance, scalable, and easy-to-use database for customers. Prior to her career in cloud data warehousing, she spent 10 years at IBM working on the DB2 for z/OS enterprise database, with a focus on query optimization, query performance, and system performance.

Sushim Mitra is a software development engineer on the Amazon Redshift query processing team. His interest areas are query optimization problems, SQL language features, and database security. When not at work, he enjoys reading fiction from all over the world.

Suzhen Lin is a senior software development engineer on the Amazon Redshift transaction processing and storage team. Suzhen has over 15 years of experience as a senior architect and developer in industry-leading analytical database products, including AWS Redshift, Gauss MPPDB, Azure SQL Data Warehouse, and Teradata. Her experience covers storage, transaction processing, query processing, and memory/disk caching in on-premises and cloud database management systems.

Analyzing healthcare FHIR data with Amazon Redshift PartiQL

Post Syndicated from Amir Bar Or original https://aws.amazon.com/blogs/big-data/analyzing-healthcare-fhir-data-with-amazon-redshift-partiql/

Healthcare organizations across the globe strive to provide the best possible patient care at the lowest cost. In a patient’s care journey, multiple organizations are often involved, including the healthcare provider, diagnostic labs, pharmacies, and health insurance payors. Each of these organizations needs to exchange health data efficiently with the others to ensure care continuity and reimbursement.

The healthcare industry has adopted data exchange standards, many of which are defined by Health Level Seven International (HL7), for several decades. HL7v2, a pipe-delimited data format developed three decades ago, is still in use today despite not conforming to modern best practices for communicating between systems, such as with RESTful APIs. Naturally, these shortcomings can complicate interoperability. More recently, HL7 introduced FHIR (Fast Healthcare Interoperability Resources) to help solve some of the complexity and pave the way for healthcare organizations to modernize how they exchange information. FHIR is built around resources that logically organize healthcare information in a structured but fully extensible format. FHIR is quickly becoming the standard for information exchange in the healthcare industry; for example, the United States’ Centers for Medicare & Medicaid Services (CMS) recently announced the Interoperability and Patient Access final rule (CMS-9115-F), which adopts FHIR as the standard for exchanging health data.

In addition to exchanging information with other entities, healthcare organizations are recognizing the intrinsic value of their own health data flowing within their systems. By bringing analytics to their own health and operational data, leading healthcare organizations are now improving care quality, patient experience, and cost. However, existing tooling for data visualization, statistical analysis, and machine learning often relies on relational schemas that can be easily transformed into vectorized inputs. The majority of existing analytics infrastructure relies on this “flat” storage and presentation of data assets; this can be challenging given FHIR’s heavily nested JSON structure. In addition, data scientists need to consume FHIR format from multiple sources and connect them with each other and existing relational data that resides in existing databases.

In this post, we walk through how to use JSON Schema Induction with Amazon Redshift PartiQL to simplify how you analyze your FHIR data in its native JSON format. Although this post focuses on a simple analysis of claims data, this approach can help data scientists and data analysts reduce the manual work and long cycles of data processing when analyzing patient data by querying and running statistical analysis that is required day to day.

JSON Schema Induction and PartiQL

There are multiple ways to organize and query healthcare data on AWS. One such way is to flatten and normalize the nested JSON FHIR documents so that it’s usable in traditional relational schema. However, such an exercise delivers a subpar final model that results in hundreds of tables with thousands of columns that aren’t naturally extensible the same way FHIR is designed. In addition, consumption of the data from such a relational model is time-consuming and expensive. Alternatively, you can leave the FHIR resources in their hierarchical form and eliminate the need to invest and support a complex ETL pipeline. This approach, however, presents separate challenges because the existing analytics tools data scientists are most comfortable and productive with aren’t often well suited to interrogate deeply hierarchical data structures.

In August, 2019, AWS announced PartiQL, an open source SQL-compatible query language that makes it easy to efficiently query data regardless of where or in what format it is stored. This technology simplifies how data scientists can use SQL to directly query FHIR resources. Our approach is different from using nested types to simplify and optimize query processing. We focus on native hierarchical schema that tends to be much more complex, with many nesting levels and optional structures.

Although Amazon Redshift PartiQL is an enabling technology to query and explore, analysts and scientists also require an understanding of the underlying structure they are interacting with. The FHIR standard incorporates descriptions of data elements as first-class members and presentation of this context alongside the data itself promotes a richer understanding. Despite existing tools having the ability to infer a schema from data (such as Apache Spark and AWS Glue crawlers), they can’t incorporate additional structural information about the data, as found in the FHIR standard. These tools also don’t offer immediate DDL generation, which is very useful to create schema files that define your catalog structure.

As the schema becomes more complex, it’s harder to devise the DDL for it manually, and using our schema induction library becomes necessary. Programmatically inferring the JSON schema is a mechanism that you can use to query FHIR, or any set of JSON documents, in their native structure. We present the tooling required for JSON Schema Induction and provide step-by-step examples to help you get started querying FHIR resources. In this post, we focus on healthcare data in the FHIR format, but our approach for analyzing hierarchical data in JSON or XML formats is applicable to many other data standards in other domains.

To address these challenges and overcome the shortcomings of existing approaches, we have introduced a new open-source library for inferring a schema from a set of JSON documents. As a result of this inference, we can generate table definitions that significantly streamline the ability to efficiently process FHIR data directly. This library combines the induced schema from the sample data with the descriptive information from the FHIR standard to generate a rich DDL for Amazon Redshift and a JSON tree for UI visualization.

Solution overview

The following diagram illustrates the architecture of the solution.

In this post, we demonstrate how to use our open-source library in conjunction with Amazon Redshift PartiQL queries and Amazon Redshift Spectrum, which enables you to directly query and join data across your data warehouse and data lake. Our approach allows you to design a semi-relational schema, where scientists can quickly combine relational and non-relational data from multiple resource tables.

We use PartiQL in Amazon Redshift Spectrum over data stored in Amazon Simple Storage Service (Amazon S3). Amazon S3 is an object storage built to store and retrieve any amount of data, structured or unstructured, while providing extreme durability and availability, and infinitely scalable data storage infrastructure at very low costs. Amazon Redshift Spectrum runs complex SQL queries directly over Amazon S3 storage without loading or other data preparation, and AWS Glue serves as the meta-store catalog for the Amazon S3 data. That allows us to run PartiQL queries on Amazon S3 prefixes containing FHIR resources stored as JSON or Parquet files. We use Jupyter notebooks to illustrate how to build and use the Schema Induction tool, and for test data we use the public dataset from Fhirbase.

Querying FHIR using PartiQL

To provide a short introduction to PartiQL and illustrate the power of PartiQL, we show a few examples of PartiQL queries over FHIR data. Each example shows different aspects of query functionality varying from simple to aggregate queries. Although you can use this approach for myriad FHIR resources, we focus on claims data as an illustrative example. If you’re already familiar with PartiQL language, you can skip this section.

Assume that we have millions of claim documents as illustrated in claim-example-cms1500-medical.json and in the following code. We want to process attributes regarding ID (line 3), status (line 14), patient (lines 24–26), and diagnosis (lines 58–69):

1.	{
2.	  "resourceType": "Claim",
3.	  "id": "100150",
4.	  "text": {
5.	    "status": "generated",
6.	    "div": "<div xmlns=\"http://www.w3.org/1999/xhtml\">A human-readable rendering of the Oral Health Claim</div>"
7.	  },
8.	  "identifier": [
9.	    {
10.	      "system": "http://happyvalley.com/claim",
11.	      "value": "12345"
12.	    }
13.	  ],
14.	  "status": "active",
15.	  "type": {
16.	    "coding": [
17.	      {
18.	        "system": "http://terminology.hl7.org/CodeSystem/claim-type",
19.	        "code": "oral"
20.	      }
21.	    ]
22.	  },
23.	  "use": "claim",
24.	  "patient": {
25.	    "reference": "Patient/1"
26.	  },
27.	  "created": "2014-08-16",
28.	  "insurer": {
29.	    "reference": "Organization/2"
30.	  },
31.	  "provider": {
32.	    "reference": "Organization/1"
33.	  },
34.	  "priority": {
35.	    "coding": [
36.	      {
37.	        "code": "normal"
38.	      }
39.	    ]
40.	  },
41.	  "payee": {
42.	    "type": {
43.	      "coding": [
44.	        {
45.	          "code": "provider"
46.	        }
47.	      ]
48.	    }
49.	  },
50.	  "careTeam": [
51.	    {
52.	      "sequence": 1,
53.	      "provider": {
54.	        "reference": "Practitioner/example"
55.	      }
56.	    }
57.	  ],
58.	  "diagnosis": [
59.	    {
60.	      "sequence": 1,
61.	      "diagnosisCodeableConcept": {
62.	        "coding": [
63.	          {
64.	            "code": "123456"
65.	          }
66.	        ]
67.	      }
68.	    }
69.	  ],
70.	  "insurance": [
71.	    {
72.	      "sequence": 1,
73.	      "focal": true,
74.	      "identifier": {
75.	        "system": "http://happyvalley.com/claim",
76.	        "value": "12345"
77.	      },
78.	      "coverage": {
79.	        "reference": "Coverage/9876B1"
80.	      }
81.	    }
82.	  ],
83.	  "item": [
84.	    {
85.	      "sequence": 1,
86.	      "careTeamSequence": [
87.	        1
88.	      ],
89.	      "productOrService": {
90.	        "coding": [
91.	          {
92.	            "code": "1200"
93.	          }
94.	        ]
95.	      },
96.	      "servicedDate": "2014-08-16",
97.	      "unitPrice": {
98.	        "value": 135.57,
99.	        "currency": "USD"
100.	      },
101.	      "net": {
102.	        "value": 135.57,
103.	        "currency": "USD"
104.	      }
105.	    }
106.	  ]
107.	}

Creating the claims table DDL

To run queries with Amazon Redshift Spectrum, we first need to create the external table for the claims data. The claims table DDL must use special types such as Struct or Array with a nested structure to fit the structure of the JSON documents. For the FHIR claims document, we use the following DDL to describe the documents:

1.	  create external table fhir.Claims(
2.		"resourceType" varchar(500),
3.		"id" varchar(500),
4.		"status" varchar(500),
5.		"use" varchar(500),
6.		"patient" struct<"reference": varchar(500)>,
7.		"billablePeriod" struct<"start": varchar(500),"end": varchar(500)>,
8.		"organization" struct<"reference": varchar(500)>,
9.		"diagnosis" array<struct<"sequence": double precision,"diagnosisReference": struct<"reference": varchar(500)>>>,
10.		"item" array<struct<"sequence": double precision,"encounter": array<varchar(500)>,"diagnosisLinkId": array<double precision>,"informationLinkId": array<double precision>,"net": struct<"value": double precision,"system": varchar(500),"code": varchar(500)>,"procedureLinkId": array<double precision>>>,
11.		"total" struct<"value": double precision,"system": varchar(500),"code": varchar(500)>,
12.		"information" array<struct<"sequence": double precision,"category": struct<"coding": array<struct<"system": varchar(500),"code": varchar(500)>>>,"valueReference": struct<"reference": varchar(500)>>>,
13.		"procedure" array<struct<"sequence": double precision,"procedureReference": struct<"reference": varchar(500)>>>,
14.		"prescription" struct<"reference": varchar(500)>
15.	)
16.	row format serde 'org.openx.data.jsonserde.JsonSerDe'
17.	with serdeproperties ('dots.in.keys' = 'true','mapping.requesttime' = 'requesttimestamp','strip.outer.array' = 'true')
18.	location 's3://<bucket>/folder1/Claim-1/';

Because each column might be a highly nested JSON structure, this structure requires many levels of type definitions and optional attributes, as shown in lines 9–14. The definition of the preceding table works for a certain type of document in our sample. A table definition that can satisfy any claim document as defined by the FHIR JSON schema is even more complex. This is because the FHIR schema aims to capture any possible medical information, not just the existing data you have, and therefore is more complex than what you currently need.

Traditionally, creating a DDL table is a one-time operation performed by the database administrator, but devising DDL manually isn’t intuitive when it comes to a complex hierarchical dataset. Therefore, we created a new tool: the Schema Induction Tool, which is an open-source library that infers a schema out of a collection of documents and produces rich structure information of the collection of documents scanned.

Generating DDL with the Schema Induction Tool

The Schema Induction Tool is a Java utility that reads a collection of JSON documents as a stream, learns their common schema, and generates a CREATE TABLE statement for Amazon Redshift Spectrum. In addition, if the documents adhere to a JSON standard schema, the schema file can be provided for additional metadata annotations such as attribute descriptions, concrete datatypes, enumerations, and more. The JSON standard provides a comprehensive data structure for all possible variations of the data, whereas the enriched inferred schema only considers the structure that it has seen in the documents, and doesn’t explode the DDL statement with all the theoretical possibilities that the JSON schema defines. It is therefore important to provide a rich and heterogeneous set of documents that covers all the paths that occur in your data. Because the utility uses stream processing, you can run it on a large volume of documents if time permits.

You can build the code yourself from the GitHub repo, or download a release artifact from Maven central.

The CLI syntax of the command is as follows:

java -jar aws-json-schema-induction.jar 
                        [-ah] [-c=<s3>] [-d=<outDdlFile>] -i=<inputFile> 
                        [-l=<tableLocation>] [-r=<region>] [-root=<rootDefinition>]
                        [-s=<outSchemaFile>] [--stats=<outStatsFile>]
                        [-t=<tableName>]
      -a, --array               is the document a json array
      -c, --cred=<s3>           which type of s3 credentials to use (ec2|profile)
      -d, --ddl=<outDdlFile>    An output ddl file for Redshift
      -h, --help                display a help message
      -i, --input=<inputFile>   An input json file path.
      -l, --location=<tableLocation>
                                table location to use when creating DDL
      -r, --region=<region>     s3 region to use
          -root, --root=<rootDefinition>
                                s3 region to use
      -s, --schema=<outSchemaFile>
                                An output schema file json file path
          --stats=<outStatsFile>
                                An output stats file
      -t, --table=<tableName>   table name to use when creating DDL 

The utility generates a DDL output file that contains the Create Table statement as defined by the ddl switch, and optionally a file that contains the hierarchical schema of the document collection as defined by the schema switch. Finally, a stat file is generated that contains the total number of occurrences of each element from the input documents collection.
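If you want to call the utility from a script rather than typing the command by hand, a minimal Python sketch using the switches documented above might look like the following. The input and output file names are assumptions for illustration; the table name and S3 location match the claims example in this post:

import subprocess

# Invoke the Schema Induction Tool with the CLI switches documented above.
subprocess.run(
    [
        "java", "-jar", "aws-json-schema-induction.jar",
        "-i", "claims.json",                     # input JSON documents (assumed file name)
        "-d", "claims.ddl",                      # output CREATE TABLE DDL for Redshift Spectrum
        "-s", "claims-schema.json",              # output hierarchical schema
        "--stats", "claims-stats.json",          # output element occurrence counts
        "-t", "fhir.Claims",                     # table name to use in the DDL
        "-l", "s3://<bucket>/folder1/Claim-1/",  # table location to use in the DDL
    ],
    check=True,
)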

Building the Induction Tool

We can now put it all together so we can analyze FHIR claims data. We assume you have access to an AWS account and to a Linux terminal where you can access the data on Amazon S3 and run commands. We use Amazon SageMaker to host our Jupyter notebook.

  1. Build the Schema induction tool with the following code:
    git clone https://github.com/awslabs/amazon-redshift-json-schema-induction.git
    cd amazon-redshift-json-schema-induction
    mvn package

You use Amazon SageMaker to generate the Create Table statement for the claims.

  1. Create an Amazon SageMaker notebook on the Amazon SageMaker console.
  2. Select the notebook and choose Open JupyterLab.
  3. Copy the schema induction tool JAR file schema-induction/target/schema-induction-1.0.0.jar from your code folder to the notebook by dragging the file and dropping it into the left panel.
  4. Copy the example data from your code folder data/claims.json to the notebook.
  5. Copy the notebook document from your code folder notebooks/fhir-Partiql-notebook.ipynb.
  6. Open the notebook, set up the variables in the first cell per your account, and run the first cells to download and build the schema induction tool.

Preparing the data

Now the induction tool is ready and we can perform the schema induction.

  1. Run the next cells to download and check the test data.
  2. Check that the schema induction tool is ready and review the help guide.

Creating the FHIR table on Amazon Redshift

  1. Run the tool and review the induced DDL.

You can explore the data structure using the tree control display by running the following cell:

  1. On the AWS Glue console, create a database called fhir and leave location empty.
  2. Create an AWS Identity and Access Management (IAM) role that allows your Amazon Redshift cluster to access the S3 bucket and name it fhir-role.

For more information about creating an Amazon Redshift Spectrum role that can access your S3 buckets, see Authorizing Amazon Redshift to access other AWS services on your behalf.

  1. On your Amazon Redshift cluster, make sure you added the IAM role so you can run the queries and access Amazon S3 and AWS Glue and that the status shows as in-sync.

You can now run your PartiQL queries.

  1. Open the SQL Editor and connect to your database (in this use case, we use the Amazon Redshift Query Editor).
  2. Create the external schema in Amazon Redshift by entering the following code:
    create external schema fhir
    from data catalog
    database 'fhir'
    region '<your region>'
    iam_role 'arn:aws:iam::<account-id>:role/fhir-role';

For more information about creating external schema in Amazon Redshift, see CREATE EXTERNAL SCHEMA.

  1. Create the external table by copying the Create Table DDL from your notebook into your query and running it.
  2. Run your first PartiQL query in a new query tab.

Querying claims data

Providers, payors, and insurers use claims to exchange financial information and supporting clinical information regarding the provision of healthcare services, and for reporting to regulatory bodies and firms that provide data analytics. In this section, we provide queries as examples of the analyses you can run.

The following query scans all documents in the fhir.Claims table and retrieves information from each claim.

1.	SELECT c.id,c.status,
2.	       c.patient.reference as patient_ref, 
3.	       SPLIT_PART(c.patient.reference,'/',2) as patient_key,
4.	       d.sequence as diag_seq,
5.	       d.diagnosisReference.reference as diag_ref
6.	
7.	FROM fhir.Claims as c,
8.	     c.diagnosis as d
9.	WHERE c.status = 'active'  

The claim.diagnosis is an array that might contain multiple objects; it is therefore referenced in the SQL From clause and joined with the parent document implicitly, then attributes from the diagnosis element can be retrieved in the SELECT clause. This native SQL approach to un-nest arrays is one of the cornerstones of the PartiQL language. The query also uses the dot notation to access attributes in nested structures, such as c.patient.reference, which accesses the reference attribute inside the patient structure that is in the claim document.

If there are claim documents without diagnosis information and we want to retrieve the claims even when they have no diagnosis, we need a more permissive join, which we get by turning the implicit join into a left join. See the following code:

1.	SELECT c.id,c.status,
2.	       c.patient.reference as patient_ref, 
3.	       SPLIT_PART(c.patient.reference,'/',2) as patient_key,
4.	       d.sequence as diag_seq,
5.	       d.diagnosisReference.reference as diag_ref
6.	
7.	FROM fhir.Claims as c LEFT JOIN c.diagnosis as d
8.	WHERE c.status = 'active'  

You don’t need to specify the join keys because the join key is essentially the parent-child relationship of the claim and diagnosis structures.

The following query performs a simple aggregation over the claims data:

1.	SELECT c.status,count(*) as cnt, sum(c.total.value) as total
2.	FROM fhir.Claims as c
3.	GROUP BY c.status,c.patient.reference

It uses the c.patient.reference as the grouping key and the c.total.value as the aggregated value.

The following code queries all the patients’ claims:

1.	SELECT c.id,c.status,
2.	       c.patient.reference as patient_ref, 
3.	       SPLIT_PART(c.patient.reference,'/',2) as patient_key,
4.	       d.sequence as diag_seq,
5.	       d.diagnosisReference.reference as diag_ref
6.	
7.	FROM fhir.Claims as c,
8.	     c.diagnosis as d
9.	WHERE c.status = 'active'  

The following screenshot shows the query output.

Congratulations, you have successfully queried FHIR data in Amazon Redshift Spectrum using PartiQL language.

Conclusion

The FHIR standard shows great promise for healthcare, and although it has limitations, we believe its benefits are here to stay. You can analyze the FHIR format directly without creating a massive normalized design, and leapfrog over competitors by avoiding tremendous investments in data modeling and ETL maintenance. This post shows you the power of Amazon Redshift PartiQL and how your data science and data engineering teams can handle complex data formats like FHIR easily with automation. Adopting these technologies can bring new innovation to the healthcare world and unlock new benefits and cost optimizations.

If you have any questions, please leave your thoughts in the comments section. We look forward to your feedback on our new open-source Schema Induction Tool in GitHub.


About the Authors

Amir Bar Or is a Principal Data Architect at AWS Professional Services. After 20 years leading software organizations and developing data analytics platforms and products, he is now sharing his experience with large enterprise customers and helping them scale their data analytics in the cloud.

Dr. Aaron Friedman is a Principal Solutions Architect at Amazon Web Services working with healthcare and life sciences startups to accelerate science and improve patient care. His passion is working at the intersection of science, big data, and software. Outside of work, he’s with his family outdoors or learning a new thing to cook.

Crafting serverless streaming ETL jobs with AWS Glue

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

Organizations across verticals have been building streaming-based extract, transform, and load (ETL) applications to more efficiently extract meaningful insights from their datasets. Although streaming ingest and stream processing frameworks have evolved over the past few years, there is now a surge in demand for building streaming pipelines that are completely serverless. Since 2017, AWS Glue has made it easy to prepare and load your data for analytics using ETL.

In this post, we dive into streaming ETL in AWS Glue, a feature that allows you to build continuous ETL applications on streaming data. Streaming ETL in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing with exactly-once semantics. This post walks you through an example of building a stream processing pipeline using AWS Glue that includes reading streaming data from Amazon Kinesis Data Streams, schema discovery, running streaming ETL, and writing out the results to a sink.

Serverless streaming ETL architecture

For this post, our use case is relevant to our current situation with the COVID-19 pandemic. Ventilators are in high demand and are increasingly used in different settings: hospitals, nursing homes, and even private residences. Ventilators generate data that must be monitored, and an increase in ventilator usage means there is a tremendous amount of streaming data that needs to be processed promptly, so patients can be attended to as quickly as possible as the need arises. In this post, we build a streaming ETL job on ventilator metrics and enhance the data with details to raise the alert level if the metrics fall outside of the normal range. After you enrich the data, you can use it to visualize on monitors.

In our streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams. We create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes.

Because businesses often augment their data lakes built on Amazon Simple Storage Service (Amazon S3) with streaming data, our first use case applies transformations on the streaming JSON data ingested via Kinesis Data Streams and loads the results in Parquet format to an Amazon S3 data lake. After the data is ingested into Amazon S3, you can query it with Amazon Athena and build visual dashboards using Amazon QuickSight.

For the second use case, we ingest the data from Kinesis Data Streams, join it with reference data in Amazon DynamoDB to calculate alert levels, and write the results to an Amazon DynamoDB sink. This approach allows you to build near real-time dashboards with alert notifications.

The following diagram illustrates this architecture.

AWS Glue streaming ETL jobs

With AWS Glue, you can now create ETL pipelines on streaming data using continuously running jobs. You can ingest streaming data from Kinesis Data Streams and Amazon Managed Streaming for Kafka (Amazon MSK). AWS Glue streaming jobs can perform aggregations on data in micro-batches and deliver the processed data to Amazon S3. You can read from the data stream and write to Amazon S3 using the AWS Glue DynamicFrame API. You can also write to arbitrary sinks using native Apache Spark Structured Streaming APIs.

The following sections walk you through building a streaming ETL job in AWS Glue.

Creating a Kinesis data stream

First, we need a streaming ingestion source to consume continuously generated streaming data. For this post, we create a Kinesis data stream with five shards, which allows us to push 5,000 records per second into the stream.

  1. On the Amazon Kinesis dashboard, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter ventilatorstream.
  4. For Number of open shards, choose 5.

If you prefer to use the AWS Command Line Interface (AWS CLI), you can create the stream with the following code:

aws kinesis create-stream \
    --stream-name ventilatorstream \
    --shard-count 5

Generating streaming data

We can synthetically generate ventilator data in JSON format using a simple Python application (see the GitHub repo) or the Kinesis Data Generator (KDG).

Using a Python-based data generator

To generate streaming data using a Python script, you can run the following command from your laptop or Amazon Elastic Compute Cloud (Amazon EC2) instance. Make sure you have installed the faker library on your system and set up the boto3 credentials correctly before you run the script.

python3 generate_data.py --streamname glue_ventilator_stream
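The repository’s generate_data.py handles this for you; purely as an illustration, a minimal generator might look like the following sketch, which emits records matching the ventilator schema defined later in this post (the value ranges and manufacturer names are arbitrary placeholders):

import datetime
import json
import random
import time

import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "glue_ventilator_stream"

def generate_record(ventilator_id):
    # Field names match the ventilators_table schema defined in the Data Catalog.
    return {
        "ventilatorid": ventilator_id,
        "eventtime": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
        "serialnumber": "SN-{:05d}".format(ventilator_id),
        "pressurecontrol": random.randint(10, 40),                  # arbitrary demo range
        "o2stats": random.randint(85, 100),                         # arbitrary demo range
        "minutevolume": random.randint(4, 10),                      # arbitrary demo range
        "manufacturer": random.choice(["DemoVent", "SampleMed"]),   # placeholder names
    }

while True:
    record = generate_record(random.randint(1, 100))
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=str(record["ventilatorid"]),
    )
    time.sleep(0.1)  # roughly ten records per second; stop with Ctrl+C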

Using the Kinesis Data Generator

Alternatively, you can also use the Kinesis Data Generator with the ventilator template available on the GitHub repo. The following screenshot shows the template on the KDG console.

We start pushing the data after we create our AWS Glue streaming job.

Defining the schema

We need to specify a schema for our streaming data, which we can do one of two ways:

  • Retrieve a small batch of the data (before starting your streaming job) from the streaming source, infer the schema in batch mode, and use the extracted schema for your streaming job
  • Use the AWS Glue Data Catalog to manually create a table

For this post, we use the AWS Glue Data Catalog to create a ventilator schema.

  1. On the AWS Glue console, choose Data Catalog.
  2. Choose Tables.
  3. From the Add Table drop-down menu, choose Add table manually.
  4. For the table name, enter ventilators_table.
  5. Create a database with the name ventilatordb.
  6. Choose Kinesis as the type of source.
  7. Enter the name of the stream and the Kinesis source URL, https://kinesis.<aws-region>.amazonaws.com.
  8. For the classification, choose JSON.
  9. Define the schema according to the following table.
Column name Data type
ventilatorid int
eventtime string
serialnumber string
pressurecontrol int
o2stats int
minutevolume int
manufacturer string

 

  10. Choose Finish.

Creating an AWS Glue streaming job to hydrate a data lake on Amazon S3

With the streaming source and schema prepared, we’re now ready to create our AWS Glue streaming jobs. We first create a job to ingest data from the streaming source using AWS Glue DataFrame APIs.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a UTF-8 String with no more than 255 characters.
  4. For IAM role, specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, you need to make sure that the AWS Identity and Access Management (IAM) role has permissions to read from Kinesis Data Streams, write to Amazon S3, and read from and write to Amazon DynamoDB. Refer to Managing Access Permissions for AWS Glue Resources for details.
  5. For Type, choose Spark Streaming.
  6. For Glue Version, choose Spark 2.4, Python 3.
  7. For This job runs, select A new script authored by you.

You can have AWS Glue generate the streaming ETL code for you, but for this post, we author one from scratch.

  1. For Script file name, enter GlueStreaming-S3.
  2. For S3 path where script is stored, enter your S3 path.
  3. For Job bookmark, choose Disable.

For this post, we use the checkpointing mechanism of AWS Glue to keep track of the data read instead of a job bookmark.

  1. For Monitoring options, select Job metrics and Continuous logging.
  2. For Log filtering, select Standard filter and Spark UI.
  3. For Amazon S3 prefix for Spark event logs, enter the S3 path for the event logs.
  4. For Job parameters, enter the following key-values:
    1. --output_path – The S3 path where the final aggregations are persisted
    2. --aws_region – The Region where you run the job

  5. Skip the connections part and choose Save and edit the script.

Streaming ETL to an Amazon S3 sink

We use the AWS Glue DynamicFrameReader class’s from_catalog method to read the streaming data. We specify the table name that has been associated with the data stream as the source of data (see the section Defining the schema). We add additional_options to indicate the starting position to read from in Kinesis Data Streams. TRIM_HORIZON allows us to start reading from the oldest record in the shard.

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilators_table", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

In the preceding code, sourceData represents a streaming DataFrame. We use the foreachBatch API to invoke a function (processBatch) that processes the data represented by this streaming DataFrame. The processBatch function receives a static DataFrame, which holds streaming data for a window size of 100s (default). It creates a DynamicFrame from the static DataFrame and writes out partitioned data to Amazon S3. See the following code:

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})

To transform the DynamicFrame to fix the data type for eventtime (from string to timestamp) and write the ventilator metrics to Amazon S3 in Parquet format, enter the following code:

def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

Putting it all together

In the Glue ETL code editor, enter the following code, then save and run the job:

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilators_table", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})
job.commit()

Querying with Athena

When the processed streaming data is written in Parquet format to Amazon S3, we can run queries on Athena. Run the AWS Glue crawler on the Amazon S3 location where the streaming data is written out. The following screenshot shows our query results.

For instructions on building visual dashboards with the streaming data in Amazon S3, see Quick Start: Create an Analysis with a Single Visual Using Sample Data. The following dashboards show distribution of metrics, averages, and alerts based on anomalies on an hourly basis, but you can create more advanced dashboards with more granular (minute-level) intervals.
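If you’d rather create and run the AWS Glue crawler programmatically instead of from the console, the following is a minimal boto3 sketch; the crawler name, IAM role, and S3 path are assumptions for your environment:

import boto3

glue = boto3.client("glue")

# Create a crawler over the Parquet output location and start it.
glue.create_crawler(
    Name="ventilator-metrics-crawler",             # assumed name
    Role="AWSGlueServiceRole-VentilatorDemo",      # an IAM role with access to the S3 path
    DatabaseName="ventilatordb",
    Targets={"S3Targets": [{"Path": "s3://<your output path>/ventilator_metrics/"}]},
)
glue.start_crawler(Name="ventilator-metrics-crawler")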

Streaming ETL to a DynamoDB sink

For the second use case, we transform the streaming data as it arrives without micro-batching and persist the data to a DynamoDB table. Scripts to create DynamoDB tables are available in the GitHub repo. We use Apache Spark’s Structured Streaming API to read ventilator-generated data from the data stream, join it with reference data for normal metrics range in a DynamoDB table, compute the status based on the deviation from normal metric values, and write the processed data to a DynamoDB table. See the following code:

import sys
import datetime
import base64
import decimal
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'checkpoint_location', \
                            'dynamodb_sink_table', \
                            'dynamodb_static_table'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read parameters
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# DynamoDB config
dynamodb_sink_table = args['dynamodb_sink_table']
dynamodb_static_table = args['dynamodb_static_table']

def write_to_dynamodb(row):
    '''
    Add row to DynamoDB.
    '''
    dynamodb = boto3.resource('dynamodb', region_name=aws_region)
    start = str(row['window'].start)
    end = str(row['window'].end)
    dynamodb.Table(dynamodb_sink_table).put_item(
      Item = { 'ventilatorid': row['ventilatorid'], \
                'status': str(row['status']), \
                'start': start, \
                'end': end, \
                'avg_o2stats': decimal.Decimal(str(row['avg_o2stats'])), \
                'avg_pressurecontrol': decimal.Decimal(str(row['avg_pressurecontrol'])), \
                'avg_minutevolume': decimal.Decimal(str(row['avg_minutevolume']))})

dynamodb_dynamic_frame = glueContext.create_dynamic_frame.from_options( \
    "dynamodb", \
    connection_options={
    "dynamodb.input.tableName": dynamodb_static_table,
    "dynamodb.throughput.read.percent": "1.5"
  }
)

dynamodb_lookup_df = dynamodb_dynamic_frame.toDF().cache()

# Read from Kinesis Data Stream
streaming_data = spark.readStream \
                    .format("kinesis") \
                    .option("streamName","glue_ventilator_stream") \
                    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
                    .option("startingPosition", "TRIM_HORIZON") \
                    .load()

# Retrieve Sensor columns and do a simple projection
ventilator_fields = streaming_data \
    .select(from_json(col("data") \
    .cast("string"),glueContext.get_catalog_schema_as_spark_schema("ventilatordb","ventilators_table")) \
    .alias("ventilatordata")) \
    .select("ventilatordata.*") \
    .withColumn("event_time", to_timestamp(col('eventtime'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("ts", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Stream static join, ETL to augment with status
ventilator_joined_df = ventilator_fields.join(dynamodb_lookup_df, "ventilatorid") \
    .withColumn('status', when( \
    ((ventilator_fields.o2stats < dynamodb_lookup_df.o2_stats_min) | \
    (ventilator_fields.o2stats > dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol < dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol > dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume < dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume > dynamodb_lookup_df.minute_volume_max)), "RED") \
    .when( \
    ((ventilator_fields.o2stats >= dynamodb_lookup_df.o2_stats_min) |
    (ventilator_fields.o2stats <= dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol >= dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol <= dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume >= dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume <= dynamodb_lookup_df.minute_volume_max)), "GREEN") \
    .otherwise("ORANGE"))

ventilator_joined_df.printSchema()

# Drop the normal metric values
ventilator_transformed_df = ventilator_joined_df \
                            .drop('eventtime', 'o2_stats_min', 'o2_stats_max', \
                            'pressure_control_min', 'pressure_control_max', \
                            'minute_volume_min', 'minute_volume_max')

ventilator_transformed_df.printSchema()

ventilators_df = ventilator_transformed_df \
    .groupBy(window(col('ts'), '10 minute', '5 minute'), \
    ventilator_transformed_df.status, ventilator_transformed_df.ventilatorid) \
    .agg( \
    avg(col('o2stats')).alias('avg_o2stats'), \
    avg(col('pressurecontrol')).alias('avg_pressurecontrol'), \
    avg(col('minutevolume')).alias('avg_minutevolume') \
    )

ventilators_df.printSchema()

# Write to DynamoDB sink
ventilator_query = ventilators_df \
    .writeStream \
    .foreach(write_to_dynamodb) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

ventilator_query.awaitTermination()

job.commit()

After the preceding code runs, ventilator metric aggregations are persisted in the DynamoDB table as follows. You can build custom user interface applications with the data in Amazon DynamoDB to create dashboards.
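To sketch what reading those aggregations back out might look like, for example from a dashboard backend, you could scan the sink table with boto3; the table name below is a placeholder for whatever you passed as the dynamodb_sink_table job parameter:

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ventilator-metrics-sink")   # placeholder sink table name

# Pull a small sample of the aggregated, windowed metrics written by the job.
response = table.scan(Limit=25)
for item in response["Items"]:
    print(item["ventilatorid"], item["status"], item["start"], item["avg_o2stats"])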

Conclusion

Streaming applications have become a core component of data lake architectures. With AWS Glue streaming, you can create serverless ETL jobs that run continuously, consuming data from streaming services like Kinesis Data Streams and Amazon MSK. You can load the results of streaming processing into an Amazon S3-based data lake, JDBC data stores, or arbitrary sinks using the Structured Streaming API.

For more information about streaming AWS Glue ETL jobs, see the AWS Glue documentation.

We encourage you to build a serverless streaming application using AWS Glue streaming ETL and share your experience with us. If you have any questions or suggestions, share them in the comments.


About the Author


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Automating EMR workloads using AWS Step Functions

Post Syndicated from Afsar Jahangir original https://aws.amazon.com/blogs/big-data/automating-emr-workloads-using-aws-step-functions/

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job runs only in response to certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multi-step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

At the conclusion of the second application, in some cases I may want to run both programs multiple times (with different dataset sizes, perhaps). I need a way to decide to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initiate the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To test your pipeline with the same sample programs and EMR cluster template, you need an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the key pair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.
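If you prefer to create the key pair with an SDK instead of the console, a minimal boto3 sketch looks like the following; the output file name is a placeholder:

import boto3

ec2 = boto3.client("ec2")

# Create the key pair referenced by emr-cluster-config.json.
response = ec2.create_key_pair(KeyName="emrcluster-launch")

# The private key material is returned only once; save it somewhere safe.
with open("emrcluster-launch.pem", "w") as key_file:  # placeholder file name
    key_file.write(response["KeyMaterial"])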

Inputting JSON for launching the pipeline

The simplest way for you to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you full functionality to initiate the function and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{  
    "ModelName": "PipelineTest_01",  
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",  
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive identifier used to identify the transient EMR cluster created during this process. This name shows on the Amazon EMR console for easy identification.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The S3 bucket HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster.
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – Denotes the number of EMR cluster nodes to run the job and can be changed based on the compute need. I use 4 nodes as the input value for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.
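If you prefer to start the execution programmatically rather than from the console, a minimal boto3 sketch might look like the following. The state machine ARN is a placeholder; use the ARN of the MLStateMachine deployed in your account:

import json
import boto3

sfn = boto3.client("stepfunctions")

# Same payload structure as test_input.json; replace the placeholders
# with the values for your account.
payload = {
    "ModelName": "PipelineTest_01",
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",
    "SecurityGroup": "<your security group>",
    "SubNet": "<your subnet>",
    "ClusterSize": "4",
    "ProcessingMode": ["TRAINING"],
}

response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:111122223333:stateMachine:MLStateMachine",  # placeholder ARN
    input=json.dumps(payload),
)
print(response["executionArn"])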

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipelines details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing in the remaining parameters to the inner state machine. It adds the stack id, EMR cluster id, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (kmeanswsssey.py). The code is passed through the first state machine and submitted to the second state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine asynchronously to run a high-level process of checking the cluster, adding a step, checking to see if the step is complete, and exiting back to the outer state machine when complete.

  3. Next, the outer state machine runs the model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx to engage the inner state machine for the second set of code (kmeansandey.py). The process is the same as step 2, but the code it runs is from a different file, and the output from the preprocessing step becomes the input for this step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, it moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step in the state machine looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.

  4. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources, as sketched in the following example.
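The following is a minimal sketch of the kind of logic such a deletion handler performs; the actual lambda/delete_cfn_stack.py in the repo may differ, and the event key name is an assumption:

import boto3

cloudformation = boto3.client("cloudformation")

def lambda_handler(event, context):
    # The stack ID is assumed to be passed in by the state machine under this key.
    stack_id = event["StackId"]

    # Deleting the stack removes the transient EMR cluster and its resources.
    cloudformation.delete_stack(StackName=stack_id)

    return {"StackId": stack_id, "Status": "DELETE_IN_PROGRESS"}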

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar” Jahangir Ali has been a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast who helps customers shape their data lakes and analytics journeys on AWS. In his spare time, he enjoys taking pictures, listening to music, and spending time with family.


Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH helping customers integrate and manage their data from different unrelated data sources.


Event-driven refresh of SPICE datasets in Amazon QuickSight

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/event-driven-refresh-of-spice-datasets-in-amazon-quicksight/

Businesses are increasingly harnessing data to improve their business outcomes. To enable this transformation to a data-driven business, customers are bringing together data from structured and unstructured sources into a data lake. Then they use business intelligence (BI) tools, such as Amazon QuickSight, to unlock insights from this data.

To provide fast access to datasets, QuickSight provides a fully managed calculation engine called SPICE—the Super-fast, Parallel, In-Memory Calculation Engine. At the time of writing, SPICE enables you to cache up to 250 million rows or 500 GB of data per dataset.

To extract value from the data quickly, you need access to new data as soon as it’s available. In this post, we describe how to achieve this by refreshing SPICE datasets as part of your extract, transform, and load (ETL) pipelines.

Solution architecture

In this post, you automate the refresh of SPICE datasets by implementing the following architecture.

This architecture consists of two parts: an example ETL job and a decoupled event-driven process to refresh SPICE.

For the ETL job, you use Amazon Simple Storage Service (Amazon S3) as your primary data store. Data lands in an S3 bucket, which we refer to as the raw zone. An Amazon S3 trigger configured on this bucket triggers an AWS Lambda function, which starts an AWS Glue ETL job. This job processes the raw data and outputs processed data into another S3 bucket, which we refer to as the processed zone.

This sample ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enrichment of the data. The AWS Glue Data Catalog stores the metadata, and the QuickSight datasets are created using Amazon Athena data sources.
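As a rough sketch of how such a conversion works (this is illustrative and not the exact job deployed by the CloudFormation template; the database, table, and path names are placeholders), an AWS Glue ETL script might look like this:

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the raw CSV data registered in the Glue Data Catalog (placeholder names).
raw = glue_context.create_dynamic_frame.from_catalog(
    database="taxigluedatabase",
    table_name="raw_taxi_data",
)

# Write the data to the processed zone as Parquet (placeholder path).
glue_context.write_dynamic_frame.from_options(
    frame=raw,
    connection_type="s3",
    connection_options={"path": "s3://example-processed-bucket/processed_taxi_data/"},
    format="parquet",
)

job.commit()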

To trigger the SPICE dataset refresh, after the ETL job finishes, an Amazon EventBridge rule triggers a Lambda function that initiates the refresh.

In summary, this pipeline transforms your data and updates QuickSight SPICE datasets upon completion.

Deploying the automated data pipeline using AWS CloudFormation

Before deploying the AWS CloudFormation template, make sure you have signed up for QuickSight in one of the 11 supported Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (Oregon)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Asia Pacific (Tokyo)
  • EU (Frankfurt)
  • EU (Ireland)
  • EU (London)

This post works with both Standard and Enterprise editions of QuickSight. Enterprise Edition provides richer features and higher limits compared to Standard Edition.

  1. After you sign up for QuickSight, you can use CloudFormation templates to create all the necessary resources by choosing Launch stack:
  2. Enter a stack name; for example, SpiceRefreshBlog.
  3. Acknowledge the AWS Identity and Access Management (IAM) resource creation.
  4. Choose Create stack.

The CloudFormation template creates the following resources in your AWS account:

  • Three S3 buckets to store the following:
    • AWS Glue ETL job script
    • Raw data
    • Processed data
  • Three Lambda functions to do the following:
    • Create the ETL job
    • Initiate the ETL job upon upload of new data in the raw zone
    • Initiate the SPICE dataset refresh when the ETL job is complete
  • An AWS Glue database
  • Two AWS Glue tables to store the following:
    • Raw data
    • Processed data
  • An ETL job to convert the raw data from CSV into Apache Parquet format
  • Four IAM roles: One each for the Lambda functions and one for the ETL job
  • An EventBridge rule that triggers on an AWS Glue job state change event with a state of Succeeded and invokes a Lambda function that performs the SPICE dataset refresh

Importing the dataset

For this post, you use the publicly available NYC Taxi & Limousine Commission Trip Record Data dataset. You upload monthly data in CSV format to the raw zone S3 bucket.

This data is available in Amazon S3 through Open Data on AWS, a service designed to let you spend more time on data analysis rather than data acquisition.

You start by copying the For Hire Vehicle (FHV) data for March 2020. Because the data is already available in Amazon S3 through Open Data, run the following command to copy the data into the raw zone. Make sure you replace <raw bucket name> with the name of the raw bucket created by the CloudFormation template:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-03.csv" s3://<raw bucket name>

After you copy the data into the raw zone, the Amazon S3 event trigger invokes the Lambda function that triggers the ETL job. You can see the job status on the AWS Glue console by choosing Jobs in the navigation pane. The process takes about 2 minutes.
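A minimal sketch of what an S3-triggered Lambda function like this might do is shown below; the job name and job argument are placeholders, and the function deployed by the template may differ:

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # The S3 event notification includes the bucket and key of the new object.
    record = event["Records"][0]["s3"]
    source_path = f"s3://{record['bucket']['name']}/{record['object']['key']}"

    # Start the Glue ETL job that processes the newly arrived raw data.
    response = glue.start_job_run(
        JobName="TaxiTransformationGlueJob-example",   # placeholder job name
        Arguments={"--source_path": source_path},      # illustrative argument
    )
    return response["JobRunId"]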

When the job is complete, check that you can see the Parquet files in the processed zone S3 bucket.

Creating a QuickSight analysis of the data

To visualize the taxi data, we create a QuickSight analysis.

First, you need to give QuickSight the necessary permissions to access the processed zone S3 bucket. For instructions, see I Can’t Connect to Amazon S3.

Then complete the following steps to create an analysis of the taxi data:

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.
  3. Choose Athena and provide a name for the data source (such as Athena).
  4. Choose Create data source.
  5. For Database, choose the name of the taxi AWS Glue database (starting with taxigluedatabase).
  6. For Tables, select processed_taxi_data as the table to visualize.
  7. Choose Select.
  8. Ensure Import to SPICE for quicker analytics is selected and choose Visualize.

After the data is imported into SPICE, you can create visuals to display the data. For example, the following screenshot shows a key performance indicator (KPI) of the number of taxi journeys aggregated at the month level and the number of journeys over time.

We use this dashboard to visualize the dataset again after we refresh SPICE with more data.

Automating the SPICE refresh

To refresh the SPICE dataset when the ETL job is complete, the CloudFormation template we deployed created an EventBridge rule that triggers a Lambda function each time an AWS Glue ETL job successfully completes. The following screenshot shows the code for the event pattern.
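The pattern matches AWS Glue Job State Change events with a state of SUCCEEDED. If you wanted to create an equivalent rule yourself, a hedged boto3 sketch might look like the following; the rule name and target ARN are placeholders, and the Lambda invoke permission setup is omitted:

import json
import boto3

events = boto3.client("events")

# Match successful completions of AWS Glue ETL jobs.
event_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Job State Change"],
    "detail": {"state": ["SUCCEEDED"]},
}

events.put_rule(
    Name="glue-job-succeeded-rule",    # placeholder rule name
    EventPattern=json.dumps(event_pattern),
)

events.put_targets(
    Rule="glue-job-succeeded-rule",
    Targets=[{
        "Id": "SpiceRefreshLambda",
        "Arn": "arn:aws:lambda:us-east-1:111122223333:function:QuicksightUpdateLambda",  # placeholder ARN
    }],
)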

We need to configure the Lambda function with the ETL job name and the ID of the SPICE dataset we created in QuickSight.

  1. Locate the ETL job name on the AWS Glue console, named TaxiTransformationGlueJob-<unique id>.
  2. To find the SPICE dataset ID, run the following command using the AWS Command Line Interface (AWS CLI):
    aws quicksight list-data-sets --aws-account-id <your AWS account id> 

    The following screenshot shows the output with the dataset ID.

  3. On the Lambda console, open the Lambda function named SpiceRefreshBlog-QuicksightUpdateLambda-<unique id>.
  4. Update line 9 of the code to replace ReplaceWithGlueJobName with the AWS Glue job name and ReplaceWithYourDatasetID with the dataset ID.

Once a Glue job succeeds, this Lambda function is triggered. The EventBridge event that triggers the Lambda contains the name of the job. You can access this from the event as follows, as seen on line 25 of the function:

succeededJob = event['detail']['jobName']

The Lambda function looks up the job name in the data_set_map dictionary. If the dictionary contains the job name, the dataset ID is accessed and the function calls the QuickSight Create Ingestion API to refresh the SPICE datasets.

You can extend the data_set_map dictionary to include additional job names and associated SPICE dataset IDs to be refreshed. If using this approach at scale, you might choose to move this configuration information to an Amazon DynamoDB table.
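For reference, a minimal sketch of this refresh logic is shown below. It is not the exact function deployed by the template; the job name and dataset ID in data_set_map are placeholders:

import time
import boto3

quicksight = boto3.client("quicksight")

# Map Glue job names to the SPICE dataset IDs they should refresh (placeholders).
data_set_map = {
    "TaxiTransformationGlueJob-example": "11111111-2222-3333-4444-555555555555",
}

def lambda_handler(event, context):
    succeeded_job = event["detail"]["jobName"]

    data_set_id = data_set_map.get(succeeded_job)
    if data_set_id is None:
        return  # no SPICE dataset associated with this job

    # Kick off a SPICE refresh; the ingestion ID just needs to be unique.
    quicksight.create_ingestion(
        AwsAccountId=boto3.client("sts").get_caller_identity()["Account"],
        DataSetId=data_set_id,
        IngestionId=f"refresh-{int(time.time())}",
    )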

  5. Save the Lambda function by choosing Deploy.

Testing the automated refresh

Now that you have configured the Lambda function, you can test the end-to-end ETL process and make the next month’s data available for analysis.

To add the FHV data for April, run the following AWS CLI command:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-04.csv" s3://<raw bucket name>

As before, this upload to the raw zone triggers the Lambda function that starts the ETL job. You can see the progress of the job on the AWS Glue console.

When the job is complete, navigate to QuickSight and open the taxi analysis (or, if you still have it open, refresh the window).

You can now see that both months’ data is available for analysis. This step might take 1–2 minutes to load.

To see the status of each SPICE refresh, navigate back to the dataset on the QuickSight console and choose View History.

The following screenshot shows the status of previous refreshes and the number of rows that have been ingested into SPICE.

Now that you have tested the end-to-end process, you can try copying more FHV data to the raw zone and see the data within your QuickSight analysis.

Cleaning up

To clean up the resources you created by following along with this post, complete the following steps:

  1. Delete the QuickSight analysis you created.
  2. Delete the QuickSight dataset that you created.
  3. Delete the QuickSight data source:
    1. Choose New dataset.
    2. Select the data source and choose Delete data source.
  4. On the Amazon S3 console, delete the contents of the raw and processed S3 buckets.
  5. On the AWS CloudFormation console, select the stack SpiceRefreshBlog and choose Delete.

Conclusion

Using an event-based architecture to automate the refresh of your SPICE datasets makes sure that your business analysts are always viewing the latest available data. This reduction in time to analysis can help your business unlock insights quicker without having to wait for a manual or scheduled process. Additionally, by only refreshing SPICE when new data is available, the underlying data storage resources are used efficiently, so you only pay for what you need!

Get started with QuickSight today!


About the Authors

Rob Craig is a Senior Solutions Architect with AWS. He supports customers in the UK with their cloud journey, providing them with architectural advice and guidance to help them achieve their business outcomes.


Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Using administrative dashboards for a centralized view of Amazon QuickSight objects

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/using-administrative-dashboards-for-a-centralized-view-of-amazon-quicksight-objects/

“Security is job 0” is the primary maxim of all endeavors undertaken at AWS. Amazon QuickSight, the fast-growing, cloud-native business intelligence (BI) platform from AWS, allows security controls in a variety of means, including web browsers and API calls. These controls apply to various functions, such as user management, authorization, authentication, and data governance.

This post demonstrates how to build a workflow to enable a centralized visualization of QuickSight groups and user information, as well as QuickSight objects access permission auditing information. Combined with AWS CloudTrail logs, the solution enables your security team to detect any abnormal behavior in near-real time to ensure security compliance.

Benefits of a centralized dashboard

A group in QuickSight consists of a set of users. Using groups makes it easy to manage access and security. For example, you can configure three groups, called Marketing, HR, and BI Developer, and each has specific access privileges:

  • The users in the Marketing group can only view the dashboards with marketing data
  • The users in the HR group can only view the human resources data
  • The users in the BI Developer group can edit all objects, including data sources, datasets, and dashboards

After the users and groups are configured, BI administrators can check and edit the object access permission by choosing Share for dashboards, datasets, and all other objects. The following screenshot shows the Manage dashboard sharing page on the QuickSight console.

As of this writing, individual object permission information is available on the QuickSight console, and user information is provided on the user management view on the QuickSight console. Our solution integrates QuickSight APIs with other AWS services to create an administrative dashboard that provides a centralized view of essential security information. This dashboard covers not only user lists and individual object access permission information available on the current platform, but also additional security information like group lists, user-group mapping information, and overall objects access permissions. This dashboard allows you to acquire unique security insights with its collection of comprehensive security information.

This post provides a detailed workflow that covers the data pipeline, sample Python codes, the AWS CloudFormation template, and a sample administrative dashboard. With the guidance of this post, you can configure a centralized information center in your own environment.

Solution overview

The following diagram illustrates the workflow of the solution.

The workflow involves the following steps:

  1. A new user creation event in the CloudTrail log triggers the Amazon CloudWatch Events rule CreateUser.
  2. The CreateUser rule triggers the AWS Lambda function User_Initiation. This function checks if the new user belongs to an existing group (for this post, we assume that their AWS Identity and Access Management (IAM) role equates to the group they should belong to in QuickSight). If such a group exists, the function adds the user into the group (CreateGroupMembership). Otherwise, it creates a new group (CreateGroup). The following is the process flow diagram of the Lambda function.
  3. If the CreateGroupMembership event occurs, it triggers the Lambda function Data_Prepare. This function calls QuickSight APIs to get QuickSight group, user, and object access permissions information and saves the results to an Amazon Simple Storage Service (Amazon S3) bucket.
  4. If the Lambda function User_Initiation creates a new QuickSight group in Step 2, it triggers a CloudWatch rule CreateGroup and the Lambda function Group_Initiation. The Group_Initiation function updates the QuickSight object permissions for the new group, such as granting it permission to view a dashboard.
  5. The update object permission event in Step 4 triggers the Lambda function Data_Prepare, which updates the object access permissions information and saves the updated information to an S3 bucket.
  6. The DeleteUser and DeleteGroup events also trigger the Lambda function Data_Prepare.
  7. Based on the files in S3 that contain the user-group mapping information and the QuickSight objects access permissions information, an Amazon Athena table is created.
  8. A QuickSight dataset fetches the data in the Athena table created in Step 7 through DirectQuery. Another QuickSight dataset is created based on the CloudTrail logs data. Then, based on these two datasets, a QuickSight dashboard is created.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Access to the following AWS services:
    • QuickSight
    • Athena
    • Lambda
    • Amazon S3
  • Basic knowledge of Python
  • Security Assertion Markup Language 2.0 (SAML 2.0) or OpenID Connect (OIDC) single sign-on (SSO) configured for QuickSight access

Creating resources

Create your resources by downloading the following AWS Cloud Development Kit (AWS CDK) stack from the GitHub repo.

Pull the Administrative Dashboard folder and run the command cdk deploy QuickSightStack to deploy the resources. For more information, see AWS CDK Intro Workshop: Python Workshop.

Implementing the solution

This solution assumes that the users log in to their QuickSight account with identity federation through SAML or OIDC. For instructions on setting up SAML SSO, see Single Sign-On Access to Amazon QuickSight Using SAML 2.0, Federate Amazon QuickSight access with Okta, and Enabling Amazon QuickSight federation with Azure AD. For OIDC SSO, see Use Amazon QuickSight Federated Single Sign-On with Amazon Cognito User Pools.

After you set up the IAM policy of the web identity or SAML federation role, you don’t need to invite users manually. A QuickSight user is provisioned automatically when opening QuickSight for the first time.

In this solution, one SAML federation role corresponds to a QuickSight group. There are four sample SAML roles: Marketing, HR, BI-Admin, and BI Developer.

The following code is the sample CreateUser CloudTrail event:

{
    "userIdentity": {
        "type": "AssumedRole",
        "principalId": "AROAZEAWJBC3FBJ7KDH2N:[email protected]",
        "arn": "arn:aws:sts::<aws_account_id>:assumed-role/ BI-Developer/[email protected]",
        "accountId": <aws_account_id>,
        "sessionContext": {
            "sessionIssuer": {
                "type": "Role",
                "principalId": "AROAZEAWJBC3FBJ7KDH2N",
                "arn": "arn:aws:iam:: <aws_account_id>:role/BI-Developer",
                "accountId": <aws_account_id>,
                "userName": " BI-Developer"}
        }
    },
    "eventSource": "quicksight.amazonaws.com",
    "eventName": "CreateUser",
    "awsRegion": "us-east-1",
    "eventType": "AwsServiceEvent",
…    
}

This event triggers the CloudWatch events rule CreateUser. The following screenshot shows the details of this rule.

The CreateUser rule triggers the Lambda function User_Initiation. This function gets the QuickSight group name (Marketing, HR, BI-Admin, or BI Developer) and compares the group name with the existing group list. If such a group exists, it adds this new user into that group (CreateGroupMembership). Otherwise, it creates a new group (CreateGroup).
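A simplified sketch of that logic follows; it is not the exact code in the repo, and the helper name and default namespace are assumptions:

import boto3

quicksight = boto3.client("quicksight")

def ensure_group_membership(account_id, user_name, group_name, namespace="default"):
    """Add the user to the group, creating the group first if it doesn't exist."""
    existing = quicksight.list_groups(AwsAccountId=account_id, Namespace=namespace)
    group_names = [g["GroupName"] for g in existing["GroupList"]]

    if group_name not in group_names:
        quicksight.create_group(
            GroupName=group_name, AwsAccountId=account_id, Namespace=namespace
        )

    quicksight.create_group_membership(
        MemberName=user_name,
        GroupName=group_name,
        AwsAccountId=account_id,
        Namespace=namespace,
    )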

The Data_Prepare Lambda function is triggered when a new user is added to a group (CreateGroupMembership). This function calls the QuickSight API describe_data_set_permissions, describe_dashboard_permissions, or describe_data_source_permissions to get the object access permissions. It also calls the APIs list_user_groups and list_users to get the list of users and the groups of each user. Finally, this function creates two files containing the QuickSight group, user, and object access information, and saves these files into an S3 bucket.
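The following is a simplified sketch of that pattern; the deployed function gathers more object types, and the bucket and key names here are placeholders:

import json
import boto3

quicksight = boto3.client("quicksight")
s3 = boto3.client("s3")

def export_users_and_permissions(account_id, namespace="default"):
    # Collect users and their group memberships.
    users = quicksight.list_users(AwsAccountId=account_id, Namespace=namespace)["UserList"]
    memberships = []
    for user in users:
        groups = quicksight.list_user_groups(
            UserName=user["UserName"], AwsAccountId=account_id, Namespace=namespace
        )["GroupList"]
        for group in groups:
            memberships.append({"user_name": user["UserName"], "group_name": group["GroupName"]})

    # Collect dashboard access permissions (datasets and data sources work the same way).
    permissions = []
    for dashboard in quicksight.list_dashboards(AwsAccountId=account_id)["DashboardSummaryList"]:
        response = quicksight.describe_dashboard_permissions(
            AwsAccountId=account_id, DashboardId=dashboard["DashboardId"]
        )
        permissions.append({"object_name": dashboard["Name"], "permissions": response["Permissions"]})

    # Save both files to the S3 bucket that backs the Athena tables (placeholder names).
    s3.put_object(Bucket="administrative-dashboard-example", Key="groups/groups.json",
                  Body=json.dumps(memberships))
    s3.put_object(Bucket="administrative-dashboard-example", Key="objects/objects.json",
                  Body=json.dumps(permissions))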

If a new QuickSight group is created, it triggers the Lambda function Group_Initiation to update the QuickSight dashboard, dataset, or data source permission for this new group. For example, if the HR group is created, the Group_Initiation function lets the HR group view the Employee Information dashboard.

The UpdateDashboardPermissions, UpdateDatasetPermissions, and UpdateDatasourcePermissions events trigger the Lambda function Data_Prepare to update the object access permissions information stored in the S3 bucket.

To create two Athena tables (Groups and Objects), run an AWS Glue crawler.

The following screenshot is sample data of the Groups table.

The following screenshot is sample data of the Objects table.

You can create a DirectQuery dataset in QuickSight with the two new Athena tables joined. See the following screenshot.

The Objects table contains the information about the objects (such as dashboards and datasets) belonging to each group or user. Furthermore, we can create a calculated field called Ownership based on the permissions information (the actions column in the Objects table) to distinguish object owners from viewers or users (an owner can delete the object, whereas a viewer or user can’t).

The following screenshot shows the relevant code.

For instructions on building an Athena table with CloudTrail events, see Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail. For this post, we create the table cloudtrail_logs in the default database.

After that, run the following SQL query to build an Athena view with QuickSight events for the last 24 hours:

CREATE OR REPLACE VIEW qsctlog_last_24h AS 
SELECT "useridentity"."type", "split_part"("useridentity"."sessioncontext"."sessionissuer"."arn", '/', 2) "group_name"
, COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) "user_name"
, "eventtime"
, "eventname"
, "awsregion"
, "resources"
, "eventtype"
, "recipientaccountid"
, "serviceeventdetails"
FROM default.cloudtrail_logs
WHERE (("eventsource" = 'quicksight.amazonaws.com') AND (CAST("split_part"("eventtime", 'T', 1) AS date) > "date_add"('hour', -24, "now"()))) 

Running queries in Athena

Now we have the datasets ready in Athena and can run SQL queries against them to answer some common administrative questions.

To create a QuickSight dataset to catch all orphan users that don’t belong to any group, as well as the events done by these users in the last 24 hours, run the following SQL query:

SELECT g.*
,log.group_name as role_name
,log.user_name as log_user_name
,log.type
,log.eventtime
,log.eventname
,log.awsregion
,log.eventtype
,log.recipientaccountid
,log.serviceeventdetails
FROM "default"."qsctlog_last_24h" as log 
full outer join 
"default"."groups" as g 
on log.awsregion=g.aws_region AND log.group_name=g.group_name AND log.user_name=g.user_name 
where g.group_name is null or g.group_name=''

To create a QuickSight dataset to list objects belonging to each group or user, run the following query:

SELECT group_name AS "Group/User Name"
, object_name
, object_type
, if((actions LIKE '%Delete%'), 'Owner', 'Viewer/User') AS Ownership
FROM "default"."object" full outer
JOIN "default"."groups"
    ON group_name=principal_name
WHERE principal_type='group'
UNION
SELECT user_name AS "Group/User Name"
, object_name
, object_type
, if((actions LIKE '%Delete%'), 'Owner', 'Viewer/User') AS Ownership
FROM "default"."object" full outer
JOIN "default"."groups"
    ON user_name=principal_name
WHERE principal_type='user'
ORDER BY  "Group/User Name" asc;

The following screenshot shows the sample data.

Building dashboards

In addition to running queries directly in Athena, we can build a dashboard using this same data in QuickSight. The following screenshot shows an example dashboard that you can make using our data.

You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

Cleaning up

To avoid incurring future charges, delete the resources you created by running the following command:

cdk destroy QuickSightStack 

Then, on the Amazon S3 console, delete the S3 bucket administrative-dashboard<your_aws_account_id>.

Conclusion

This post discussed how BI administrators can use the QuickSight dashboard, Lambda functions, and other AWS services to create a centralized view of groups, users, and objects access permission information and abnormal access auditing. We also presented a serverless data pipeline to support the administrative dashboard. This dashboard can provide you with unique security insights with its collection of comprehensive security information.


About the Author

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.

Analyze and improve email campaigns with Amazon Simple Email Service and Amazon QuickSight

Post Syndicated from Apoorv Gakhar original https://aws.amazon.com/blogs/messaging-and-targeting/analyze-and-improve-email-campaigns-with-amazon-simple-email-service-and-amazon-quicksight/

Email is a popular channel for applications, used in both marketing campaigns and other outbound customer communications. The challenge with email is that it can become increasingly complex to manage for companies that must send large quantities of messages per month. This complexity is especially true when companies need to measure detailed email engagement metrics to track campaign success.

As a marketer, you want to monitor several metrics, including open rates, click-through rates, bounce rates, and delivery rates. If you do not track your email results, you could potentially be wasting your campaign resources. Monitoring and interpreting your sending results can help you deliver the best content possible to your subscribers’ inboxes, and it can also ensure that your IP reputation stays high. Mailbox providers prioritize inbox placement for senders that deliver relevant content. As a business professional, tracking your emails can also help you stay on top of hot leads and important clients. For example, if someone has opened your email multiple times in one day, it might be a good idea to send out another follow-up email to touch base.

Building a large-scale email solution is a complex and expensive challenge for any business. You would need to build infrastructure, assemble your network, and warm up your IP addresses. Alternatively, working with some third-party email solutions requires contract negotiations and upfront costs.

Fortunately, Amazon Simple Email Service (SES) has a highly scalable and reliable backend infrastructure to reduce the preceding challenges. It has improved content filtering techniques, reputation management features, and a vast array of analytics and reporting functions. These features help email senders reach their audiences and make it easier to manage email channels across applications. Amazon SES also provides API operations to monitor your sending activities through simple API calls. You can publish these events to Amazon CloudWatch, Amazon Kinesis Data Firehose, or Amazon Simple Notification Service (Amazon SNS).

In this post, you learn how to build and automate a serverless architecture that analyzes email events. We explore how to track important metrics such as open and click rate of the emails.

Solution overview


The metrics that you can measure using Amazon SES are referred to as email sending events. You can use Amazon CloudWatch to retrieve Amazon SES event data. You can also use Amazon SNS to interpret Amazon SES event data. However, in this post, we are going to use Amazon Kinesis Data Firehose to monitor our user sending activity.

You enable an Amazon SES configuration set with open and click metrics and publish email sending events to Amazon Kinesis Data Firehose as JSON records. A Lambda function parses the JSON records and publishes the content to an Amazon S3 bucket.

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You create and run an AWS Glue crawler that crawls your data sources and constructs your Data Catalog. The Data Catalog uses pre-built classifiers for many popular source formats and data types, including JSON, CSV, and Parquet.

When the crawler is finished creating the table definition and schema, you analyze the data using Amazon Athena. It is an interactive query service that makes it easy to analyze data in Amazon S3 using SQL. Point to your data in Amazon S3, define the schema, and start querying using standard SQL, with most results delivered in seconds.

Now you can build visualizations, perform ad hoc analysis, and quickly get business insights from the Amazon SES event data using Amazon QuickSight. You can easily run SQL queries using Amazon Athena on data stored in Amazon S3, and build business dashboards within Amazon QuickSight.


Deploying the architecture:

Configuring Amazon Kinesis Data Firehose to write to Amazon S3:

  1. Navigate to the Amazon Kinesis in the AWS Management Console. Choose Kinesis Data Firehose and create a delivery stream.
  2. Enter delivery stream name as “SES_Firehose_Demo”.
  3. Under the source category, select “Direct Put or other sources”.
  4. On the next page, make sure to enable data transformation of source records with AWS Lambda. We use AWS Lambda to parse the notification contents so that we only process the information required for the use case.
  5. Click the “Create New” Lambda function.
  6. Click on the “General Kinesis Data Firehose Processing” Lambda blueprint, which opens the Lambda console. Enter the following values in Lambda:
    • Name: SES-Firehose-Json-Parser
    • Execution role: Create a new role with basic Lambda permissions.
  7. Click “Create Function”. Now replace the Lambda code with the following provided code and save the function.
    • 'use strict';
      console.log('Loading function');
      exports.handler = (event, context, callback) => {
         /* Process the list of records and transform them */
          const output = event.records.map((record) => {
              console.log(record.recordId);
              const payload =JSON.parse((Buffer.from(record.data, 'base64').toString()))
              console.log("payload : " + payload);
              
              if (payload.eventType == "Click") {
              const resultPayLoadClick = {
                      eventType : payload.eventType,
                      destinationEmailId : payload.mail.destination[0],
                      sourceIp : payload.click.ipAddress,
                  };
              console.log("resultPayLoad : " + resultPayLoadClick.eventType + resultPayLoadClick.destinationEmailId + resultPayLoadClick.sourceIp);
              
              //const parsed = resultPayLoad[0];
              //console.log("parsed : " + (Buffer.from(JSON.stringify(resultPayLoad))).toString('base64'));
              
              
              return{
                  recordId: record.recordId,
                  result: 'Ok',
                  data: (Buffer.from(JSON.stringify(resultPayLoadClick))).toString('base64'),
              };
              }
              else {
                  const resultPayLoadOpen = {
                      eventType : payload.eventType,
                      destinationEmailId : payload.mail.destination[0],
                      sourceIp : payload.open.ipAddress,
                  };
              console.log("resultPayLoad : " + resultPayLoadOpen.eventType + resultPayLoadOpen.destinationEmailId + resultPayLoadOpen.sourceIp);
              
              //const parsed = resultPayLoad[0];
              //console.log("parsed : " + (Buffer.from(JSON.stringify(resultPayLoad))).toString('base64'));
              
              
              return{
                  recordId: record.recordId,
                  result: 'Ok',
                  data: (Buffer.from(JSON.stringify(resultPayLoadOpen))).toString('base64'),
              };
              }
          });
          console.log("Output : " + JSON.stringify(output)); // log the transformed records array
          console.log(`Processing completed.  Successful records ${output.length}.`);
          callback(null, { records: output });
      };

      Please note:

      For this post, we extract only three fields: eventType, destinationEmailId, and sourceIp. If you want to store other parameters, you can modify the code accordingly. For the full list of information included in the notifications, see the following document.

      https://docs.aws.amazon.com/ses/latest/DeveloperGuide/event-publishing-retrieving-firehose-examples.html

  8. Now, navigate back to your Amazon Kinesis Data Firehose console and choose the newly created Lambda function.
  9. Keep the convert record format disabled and click “Next”.
  10. In the destination, choose Amazon S3 and select a target Amazon S3 bucket. Create a new bucket if you do not want to use the existing bucket.
  11. Enter the following values for the Amazon S3 prefix and error prefix, which are used when event data is published.
    • Prefix:
      fhbase/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
    • Error Prefix:
      fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/
  12. You may use the preceding values for the Amazon S3 prefix and error prefix. If you use your own prefixes, make sure to update the corresponding target path in AWS Glue accordingly, which you configure in a later step.
  13. Keep the Amazon S3 backup option disabled and click “Next”.
  14. On the next page, under the Permissions section, select create a new role. This opens up a new tab and then click “Allow” to create the role.
  15. Navigate back to the Amazon Kinesis Data Firehose console and click “Next”.
  16. Review the changes and click on “Create delivery stream”.

Configure Amazon SES to publish event data to Kinesis Data Firehose:

  1. Navigate to Amazon SES console and select “Email Addresses” from the left side.
  2. Click on “Verify a New Email Address” on the top. Enter your email address to which you send a test email.
  3. Go to your email inbox and click on the verify link. Navigate back to the Amazon SES console and you will see verified status on the email address provided.
  4. Open the Amazon SES console and select Configuration set from the left side.
  5. Create a new configuration set. Enter “SES_Firehose_Demo”  as the configuration set name and click “Create”.
  6. Choose Kinesis Data Firehose as the destination and provide the following details.
    • Name: OpenClick
    • Event Types: Open and Click
  7. In the IAM Role field, select ‘Let SES make a new role’. This allows SES to create a new role and add sufficient permissions for this use case in that role.
  8. Click “Save”.

Sending a Test email:

  1. Navigate to Amazon SES console, click on “Email Addresses” on the left side.
  2. Select your verified email address and click on “Send a Test email”.
  3. Make sure you select the raw email format. You may use the following format to send out a test email from the console (a programmatic alternative is sketched after this list). Make sure you send this email to a recipient inbox that you can access.
    • X-SES-CONFIGURATION-SET: SES_Firehose_Demo
      X-SES-MESSAGE-TAGS: Email=NULL
      From: [email protected]
      To: [email protected]
      Subject: Test email
      Content-Type: multipart/alternative;
          		boundary="----=_boundary"
      
      ------=_boundary
      Content-Type: text/html; charset=UTF-8
      Content-Transfer-Encoding: 7bit
      This is a test email.
      
      <a href="https://aws.amazon.com/">Amazon Web Services</a>
      ------=_boundary
  4. Once the email is received in the recipient’s inbox, open the email and click the link in it. This generates open and click events and sends the responses back to Amazon SES.
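If you’d rather send the test message programmatically than from the console, a minimal boto3 sketch looks like the following. The addresses are placeholders, and the recipient must be verified while your account is in the Amazon SES sandbox:

import boto3

ses = boto3.client("ses")

ses.send_email(
    Source="sender@example.com",                         # a verified sender address (placeholder)
    Destination={"ToAddresses": ["recipient@example.com"]},  # placeholder recipient
    Message={
        "Subject": {"Data": "Test email"},
        "Body": {
            "Html": {
                "Data": 'This is a test email. <a href="https://aws.amazon.com/">Amazon Web Services</a>'
            }
        },
    },
    # Routes open and click events to the Kinesis Data Firehose destination.
    ConfigurationSetName="SES_Firehose_Demo",
)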

Creating Glue Crawler:

  1. Navigate to the AWS Glue console, select “crawler” from the left side, and then click on “Add crawler” on the top.
  2. Enter the crawler name as “SES_Firehose_Crawler” and click “Next”.
  3. Under Crawler source type, select “Data stores” and click “Next”.
  4. Select Amazon S3 as the data source and provide the required path. Include the path up to the “fhbase” folder.
  5. Select “no” under Add another data source section.
  6. In the IAM role, select the option to ‘Create an IAM role’. Enter the name as “SES_Firehose-Crawler”. This provides the necessary permissions automatically to the newly created role.
  7. In the frequency section, select run on demand and click “Next”. You may choose this value as per your use case.
  8. Click on add Database and provide the name as “ses_firehose_glue_db”. Click on create and then click “Next”.
  9. Review your Glue crawler setting and click on “Finish”.
  10. Run the crawler you just created. This crawls the data from the specified Amazon S3 bucket and creates a catalog and table definition.
  11. Now navigate to “tables” on the left, and verify a “fhbase” table is created after you run the crawler.

If you want to analyze the data stored so far, you can use Amazon Athena and test the queries. If not, you can move on to Amazon QuickSight directly.

Analyzing the data using Amazon Athena:

  1. Open the Athena console and select the database that was created using AWS Glue.
  2. Click on “set up a query result location in Amazon S3” as shown in the following screenshot.
  3. Navigate to the Amazon S3 bucket created in earlier steps and create a folder called “AthenaQueryResult”. We store our Athena query result in this bucket.
  4. Now navigate back to Amazon Athena and select the Amazon S3 bucket with the folder location as shown in the following screenshot and click “Save”.
  5. Run the following query to test the sample output and accordingly modify your SQL query to get the desired output.
    • Select * from “ses_firehose_glue_db”.”fhbase”

Note: If you want to track opened emails by unique IP addresses, you can modify your SQL query accordingly. This is because every time an email is opened, you receive a notification, even if the same email was opened previously.


Visualizing the data in Amazon QuickSight dashboards:

  1. Now, let’s analyze this data using Amazon Athena via Amazon Quicksight.
  2. Log into Amazon Quicksight and choose Manage data, New dataset. Choose Amazon Athena as a new data source.
  3. Enter the data source name as “SES-Demo” and click on “Create the data source”.
  4. Select your database from the drop-down as “ses_firehose_glue_db” and table “fhbase” that you have created in AWS Glue.
  5. And add a custom SQL based on your use case and click on “Confirm query”. Refer to the example below.
  6. You can perform ad hoc analysis and modify your query according to your business needs as shown in the following image. Click “Save & Visualize”.
  7. You can now visualize your event data on Amazon Quicksight dashboard. You can use various graphs to represent your data. For this demo, the default graph is used and two fields are selected to populate on the graph, as shown below.

 
Conclusion:

This architecture shows how to track your email sending activity at a granular level. You set up Amazon SES to publish event data to Amazon Kinesis Data Firehose based on fine-grained email characteristics that you define. You can also track several types of email sending events, including sends, deliveries, bounces, complaints, rejections, rendering failures, and delivery delays. This information can be useful for operational and analytical purposes.

To get started with Amazon SES, follow this quick start guide and you can learn more about monitoring sending activity here.

About the Authors

Chirag Oswal is a solutions architect and AR/VR specialist working with the public sector India. He works with AWS customers to help them adopt the cloud operating model on a large scale.

Apoorv Gakhar is a Cloud Support Engineer and an Amazon SES expert. He works with AWS customers to help them integrate their applications with various AWS services.


Additional Resources:

Amazon SES Dedicated IP Pools

Amazon Personalize optimizer using Amazon Pinpoint events

Template Personalization using Amazon Pinpoint