Tag Archives: AWS Glue

Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/build-and-automate-a-serverless-data-lake-using-an-aws-glue-trigger-for-the-data-catalog-and-etl-jobs/

Today, data is flowing from everywhere, whether it is unstructured data from resources like IoT sensors, application logs, and clickstreams, or structured data from transaction applications, relational databases, and spreadsheets. Data has become a crucial part of every business. This has resulted in a need to maintain a single source of truth and automate the entire pipeline—from data ingestion to transformation and analytics—to extract value from the data quickly.

There is a growing concern over the complexity of data analysis as the data volume, velocity, and variety increases. The concern stems from the number and complexity of steps it takes to get data to a state that is usable by business users. Often data engineering teams spend most of their time on building and optimizing extract, transform, and load (ETL) pipelines. Automating the entire process can reduce the time to value and cost of operations. In this post, we describe how to create a fully automated data cataloging and ETL pipeline to transform your data.

Architecture

In this post, you learn how to build and automate the following architecture.

You build your serverless data lake with Amazon Simple Storage Service (Amazon S3) as the primary data store. Given the scalability and high availability of Amazon S3, it is best suited as the single source of truth for your data.

You can use various techniques to ingest and store data in Amazon S3. For example, you can use Amazon Kinesis Data Firehose to ingest streaming data. You can use AWS Database Migration Service (AWS DMS) to ingest relational data from existing databases. And you can use AWS DataSync to ingest files from an on-premises Network File System (NFS).

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You can do this using an AWS Lambda function invoked by an Amazon S3 trigger to start an AWS Glue crawler that catalogs the data. When the crawler is finished creating the table definition, you invoke a second Lambda function using an Amazon CloudWatch Events rule. This step starts an AWS Glue ETL job to process and output the data into another Amazon S3 bucket that we refer to as the processed zone.

The AWS Glue ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. Monitoring and notification is an integral part of the automation process. So as soon as the ETL job finishes, another CloudWatch rule sends you an email notification using an Amazon Simple Notification Service (Amazon SNS) topic. This notification indicates that your data was successfully processed.

In summary, this pipeline classifies and transforms your data, sending you an email notification upon completion.

Deploy the automated data pipeline using AWS CloudFormation

First, you use AWS CloudFormation templates to create all of the necessary resources. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Launch the AWS CloudFormation template with the following Launch stack button.

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then enter the appropriate stack name, email address, and AWS Glue crawler name to create the Data Catalog. Add the AWS Glue database name to save the metadata tables. Acknowledge the IAM resource creation as shown in the following screenshot, and choose Create.

Note: It is important to enter your valid email address so that you get a notification when the ETL job is finished.

This AWS CloudFormation template creates the following resources in your AWS account:

  • Two Amazon S3 buckets to store both the raw data and processed Parquet data.
  • Two AWS Lambda functions: one to create the AWS Glue Data Catalog and another function to publish topics to Amazon SNS.
  • An Amazon Simple Queue Service (Amazon SQS) queue for maintaining the retry logic.
  • An Amazon SNS topic to inform you that your data has been successfully processed.
  • Two CloudWatch Events rules: one rule on the AWS Glue crawler and another on the AWS Glue ETL job.
  • AWS Identity and Access Management (IAM) roles for accessing AWS Glue, Amazon SNS, Amazon SQS, and Amazon S3.

When the AWS CloudFormation stack is ready, check your email and confirm the SNS subscription. Choose the Resources tab and find the details.

Follow these steps to verify your email subscription so that you receive an email alert as soon as your ETL job finishes.

  1. On the Amazon SNS console, in the navigation pane, choose Topics. An SNS topic named SNSProcessedEvent appears in the display.

  2. Choose the ARN. The topic details page appears, listing the email subscription as Pending confirmation. Be sure to confirm the subscription for your email address as provided in the Endpoint column.

If you don’t see an email address, or the link is showing as not valid in the email, choose the corresponding subscription endpoint. Then choose Request confirmation to confirm your subscription. Be sure to check your email junk folder for the request confirmation link.

Configure an Amazon S3 bucket event trigger

In this section, you configure a trigger on a raw S3 bucket. So when new data lands in the bucket, you trigger GlueTriggerLambda, which was created in the AWS CloudFormation deployment.

To configure notifications:

  1. Open the Amazon S3 console.
  2. Choose the source bucket. In this case, the bucket name contains raws3bucket, for example, <stackname>-raws3bucket-1k331rduk5aph.
  3. Go to the Properties tab, and under Advanced settings, choose Events.

  4. Choose Add notification and configure a notification with the following settings:
  • Name – Enter a name of your choice. In this example, it is crawlerlambdaTrigger.
  • Events – Select the All object create events check box to create the AWS Glue Data Catalog when you upload the file.
  • Send to – Choose Lambda function.
  • Lambda – Choose the Lambda function that was created in the deployment section. Your Lambda function should contain the string GlueTriggerLambda.

See the following screenshot for all the settings. When you’re finished, choose Save.

For more details on configuring events, see How Do I Enable and Configure Event Notifications for an S3 Bucket? in the Amazon S3 Console User Guide.
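If you prefer to script this configuration rather than use the console, a minimal boto3 sketch might look like the following. The bucket name and function ARN are placeholders for the values that your CloudFormation stack created, and note that Amazon S3 must also be granted permission to invoke the function (the console adds this permission for you).

import boto3

s3 = boto3.client("s3")

# Placeholder values: use the raw bucket name and the GlueTriggerLambda ARN
# that your CloudFormation stack created.
BUCKET = "<stackname>-raws3bucket-1k331rduk5aph"
LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:<stackname>-GlueTriggerLambda"

# Register GlueTriggerLambda as the target for all object-create events.
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "crawlerlambdaTrigger",
                "LambdaFunctionArn": LAMBDA_ARN,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)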

Download the dataset

For this post, you use a publicly available New York green taxi dataset in CSV format. You upload monthly data to your raw zone and perform automated data cataloging using an AWS Glue crawler. After cataloging, an automated AWS Glue ETL job triggers to transform the monthly green taxi data to Parquet format and store it in the processed zone.

You can download the raw dataset from the NYC Taxi & Limousine Commission trip record data site. Download the monthly green taxi dataset and upload only one month of data. For example, first upload only the green taxi January 2018 data to the raw S3 bucket.

Automate the Data Catalog with an AWS Glue crawler

One of the important aspects of a modern data lake is to catalog the available data so that it’s easily discoverable. To run ETL jobs or ad hoc queries against your data lake, you must first determine the schema of the data along with other metadata information like location, format, and size. An AWS Glue crawler makes this process easy.

After you upload the data into the raw zone, the Amazon S3 trigger that you created earlier in the post invokes the GlueTriggerLambda function. This function starts the AWS Glue crawler, which populates the AWS Glue Data Catalog with metadata information inferred from the crawled data.
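The CloudFormation template supplies the actual function code, but a minimal sketch of a Lambda handler that starts a named crawler could look like this; the CRAWLER_NAME environment variable is an assumption for illustration.

import os

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    """Start the AWS Glue crawler whenever a new object lands in the raw bucket."""
    crawler_name = os.environ["CRAWLER_NAME"]  # assumed environment variable
    try:
        glue.start_crawler(Name=crawler_name)
    except glue.exceptions.CrawlerRunningException:
        # The crawler is already running for an earlier upload; nothing to do.
        pass
    return {"crawler": crawler_name}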

Open the AWS Glue console. You should see the database, table, and crawler that were created using the AWS CloudFormation template. Your AWS Glue crawler should appear as follows.

Browse to the table using the left navigation, and you will see the table in the database that you created earlier.

Choose the table name, and further explore the metadata discovered by the crawler, as shown following.

You can also view the columns, data types, and other details. In the following screenshot, the AWS Glue crawler has created a schema from the files available in Amazon S3 by determining the column names and their respective data types. You can use this schema to create an external table.

Author ETL jobs with AWS Glue

AWS Glue provides a managed Apache Spark environment to run your ETL jobs without maintaining any infrastructure, using a pay-as-you-go model.

Open the AWS Glue console and choose Jobs under the ETL section to start authoring an AWS Glue ETL job. Give the job a name of your choice, and note the name because you’ll need it later. Choose the already created IAM role with the name containing <stackname>-GlueLabRole, as shown following. Keep the other default options.

AWS Glue generates the required Python or Scala code, which you can customize as per your data transformation needs. In the Advanced properties section, choose Enable in the Job bookmark list to avoid reprocessing old data.

On the next page, choose your raw Amazon S3 bucket as the data source, and choose Next. On the Data target page, choose the processed Amazon S3 bucket as the data target path, and choose Parquet as the Format.

On the next page, you can make schema changes as required, such as changing column names, dropping ones that you’re less interested in, or even changing data types. AWS Glue generates the ETL code accordingly.

Lastly, review your job parameters, and choose Save Job and Edit Script, as shown following.

On the next page, you can modify the script further as per your data transformation requirements. For this post, you can leave the script as is. In the next section, you automate the execution of this ETL job.
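For example, if you later want more granular partitioning, the write step of the generated script could be adapted along these lines. This is only a sketch: the frame name, output path, and partition column are placeholders, not the exact code that AWS Glue generates.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

def write_partitioned(mapped_frame: DynamicFrame) -> None:
    # 'mapped_frame' stands in for the DynamicFrame produced by the ApplyMapping
    # step earlier in the generated script.
    glue_context.write_dynamic_frame.from_options(
        frame=mapped_frame,
        connection_type="s3",
        connection_options={
            "path": "s3://<processed-bucket>/green_taxi/",   # placeholder path
            "partitionKeys": ["vendorid"],                   # hypothetical partition column
        },
        format="parquet",
    )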

Automate ETL job execution

As the frequency of data ingestion increases, you will want to automate the ETL job to transform the data. Automating this process helps reduce operational overhead and frees your data engineering team to focus on more critical tasks.

AWS Glue is optimized for processing data in batches. You can configure it to process data in batches on a set time interval. How often you run a job is determined by how recent the end user expects the data to be and the cost of processing. For information about the different methods, see Triggering Jobs in AWS Glue in the AWS Glue Developer Guide.

First, you need to make one-time changes and configure your ETL job name in the Lambda function and the CloudWatch Events rule. On the console, open the ETLJobLambda Lambda function, which was created using the AWS CloudFormation stack.

Choose the Lambda function link that appears, and explore the code. Change the JobName value to the ETL job name that you created in the previous step, and then choose Save.

As shown in the following screenshot, the CloudWatch Events rule CrawlerEventRule is associated with an AWS Lambda function. When the rule receives a success status from the AWS Glue crawler, it triggers the ETLJobLambda Lambda function.
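Conceptually, ETLJobLambda only needs to start the job run. A minimal sketch of such a handler might look like the following, with the job name exposed as an editable value just as in the deployed function.

import boto3

glue = boto3.client("glue")

# Replace with the ETL job name that you noted when authoring the job.
JOB_NAME = "Your-ETL-jobName"

def lambda_handler(event, context):
    """Invoked by CrawlerEventRule when the crawler succeeds; starts the ETL job."""
    response = glue.start_job_run(JobName=JOB_NAME)
    return {"JobRunId": response["JobRunId"]}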

Now you are all set to trigger your AWS Glue ETL job as soon as you upload a file in the raw S3 bucket. Before testing your data pipeline, set up the monitoring and alerts.

Monitoring and notification with Amazon CloudWatch Events

Suppose that you want to receive a notification over email when your AWS Glue ETL job is completed. To achieve that, the CloudWatch Events rule OpsEventRule was deployed from the AWS CloudFormation template in the data pipeline deployment section. This CloudWatch Events rule monitors the status of the AWS Glue ETL job and sends an email notification using an SNS topic upon successful completion of the job.

As the following image shows, you configure your AWS Glue job name in the Event pattern section in CloudWatch. The event triggers an SNS topic configured as a target when the AWS Glue job state changes to SUCCEEDED. This SNS topic sends an email notification to the address that you provided in the deployment section.
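If you ever need to recreate this rule outside of CloudFormation, a boto3 sketch of an equivalent rule and SNS target might look like the following; the rule name, topic ARN, and job name are placeholders.

import json

import boto3

events = boto3.client("events")

RULE_NAME = "OpsEventRule"                                               # placeholder name
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:SNSProcessedEvent"   # placeholder ARN

# Match successful runs of your AWS Glue ETL job.
events.put_rule(
    Name=RULE_NAME,
    EventPattern=json.dumps({
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {
            "jobName": ["Your-ETL-jobName"],   # replace with your ETL job name
            "state": ["SUCCEEDED"],
        },
    }),
)

# Send matching events to the SNS topic that emails you.
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{"Id": "SnsNotification", "Arn": SNS_TOPIC_ARN}],
)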

Let’s make one-time configuration changes in the CloudWatch Events rule OpsEventRule to capture the status of the AWS Glue ETL job.

  1. Open the CloudWatch console.
  2. In the navigation pane, under Events, choose Rules. Choose the rule name that contains OpsEventRule, as shown following.

  3. In the upper-right corner, choose Actions, Edit.

  4. Replace Your-ETL-jobName with the ETL job name that you created in the previous step.

  5. Scroll down and choose Configure details. Then choose Update rule.

Now that you have set up an entire data pipeline in an automated way with the appropriate notifications and alerts, it’s time to test your pipeline. If you upload new monthly data to the raw Amazon S3 bucket (for example, upload the NY green taxi February 2018 CSV), it triggers the GlueTriggerLambda AWS Lambda function. You can navigate to the AWS Glue console, where you can see that the AWS Glue crawler is running.

Upon completion of the crawler, the CloudWatch Events rule CrawlerEventRule triggers your ETLJobLambda Lambda function. You can now see that the AWS Glue ETL job is running.

When the ETL job is successful, the CloudWatch Events rule OpsEventRule sends an email notification to you using an Amazon SNS topic, as shown following, completing the automation cycle.

Be sure to check your processed Amazon S3 bucket, where you will find transformed data processed by your automated ETL pipeline. Now that the processed data is ready in Amazon S3, you need to run the AWS Glue crawler on this Amazon S3 location. The crawler creates a metadata table with the relevant schema in the AWS Glue Data Catalog.
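You can run this second crawler from the AWS Glue console, or script it with boto3 as in the following sketch; the crawler name, role, database, and S3 path are placeholders that you should replace with the values from your stack.

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="processed-zone-crawler",                    # hypothetical crawler name
    Role="<stackname>-GlueLabRole",                   # IAM role created by the stack
    DatabaseName="your-glue-database",                # database you chose at deployment
    Targets={"S3Targets": [{"Path": "s3://<stackname>-processeds3bucket/"}]},
)

glue.start_crawler(Name="processed-zone-crawler")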

After the Data Catalog table is created, you can execute standard SQL queries using Amazon Athena and visualize the data using Amazon QuickSight. To learn more, see the blog post Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.

Conclusion

Having an automated serverless data lake architecture lessens the burden of managing data from its source to destination—including discovery, audit, monitoring, and data quality. With an automated data pipeline across organizations, you can identify relevant datasets and extract value much faster than before. The advantage of reducing the time to analysis is that businesses can analyze the data as it becomes available in real time. From the BI tools, queries return results much faster for a single dataset than for multiple databases.

Business analysts can now get their job done faster, and data engineering teams can free themselves from repetitive tasks. You can extend this solution further by loading your data into a data warehouse like Amazon Redshift or making it available for machine learning via Amazon SageMaker.

Additional resources

See the following resources for more information:

 


About the Author

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

 

Luis Lopez Soria is a partner solutions architect and serverless specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys doing sports in addition to traveling around the world exploring new foods and cultures.

 

 

 

Chirag Oswal is a partner solutions architect and AR/VR specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys video games and travel.

How to visualize Amazon GuardDuty findings: serverless edition

Post Syndicated from Ben Romano original https://aws.amazon.com/blogs/security/how-to-visualize-amazon-guardduty-findings-serverless-edition/

Note: This blog provides an alternate solution to Visualizing Amazon GuardDuty Findings, in which the authors describe how to build an Amazon Elasticsearch Service-powered Kibana dashboard to ingest and visualize Amazon GuardDuty findings.

Amazon GuardDuty is a managed threat detection service powered by machine learning that can monitor your AWS environment with just a few clicks. GuardDuty can identify threats such as unusual API calls or potentially unauthorized users attempting to access your servers. Many customers also like to visualize their findings in order to generate additional meaningful insights. For example, you might track resources affected by security threats to see how they evolve over time.

In this post, we provide a solution to ingest, process, and visualize your GuardDuty finding logs in a completely serverless fashion. Serverless applications automatically run and scale in response to events you define, rather than requiring you to provision, scale, and manage servers. Our solution covers how to build a pipeline that ingests findings into Amazon Simple Storage Service (Amazon S3), transforms their nested JSON structure into tabular form using Amazon Athena and AWS Glue, and creates visualizations using Amazon QuickSight. We aim to provide both an easy-to-implement and cost-effective solution for consuming and analyzing your GuardDuty findings, and to more generally showcase a repeatable example for processing and visualizing many types of complex JSON logs.

Many customers already maintain centralized logging solutions using Amazon Elasticsearch Service (Amazon ES). If you want to incorporate GuardDuty findings with an existing solution, we recommend referencing this blog post to get started. If you don’t have an existing solution or previous experience with Amazon ES, if you prefer to use serverless technologies, or if you’re familiar with more traditional business intelligence tools, read on!

Before you get started

To follow along with this post, you’ll need to enable GuardDuty in order to start generating findings. See Setting Up Amazon GuardDuty for details if you haven’t already done so. Once enabled, GuardDuty will automatically generate findings as events occur. If you have public-facing compute resources in the same region in which you’ve enabled GuardDuty, you may soon find that they are being scanned quite often. All the more reason to continue reading!

You’ll also need Amazon QuickSight enabled in your account for the visualization sections of this post. You can find instructions in Setting Up Amazon QuickSight.

Architecture from end to end

 

Figure 1: Complete architecture from findings to visualization

Figure 1 highlights the solution architecture, from finding generation all the way through final visualization. The steps are as follows:

  1. Deliver GuardDuty findings to Amazon CloudWatch Events
  2. Push GuardDuty Events to S3 using Amazon Kinesis Data Firehose
  3. Use AWS Lambda to reorganize S3 folder structure
  4. Catalog your GuardDuty findings using AWS Glue
  5. Configure Views with Amazon Athena
  6. Build a GuardDuty findings dashboard in Amazon QuickSight

Below, we’ve included an AWS CloudFormation template to launch a complete ingest pipeline (Steps 1-4) so that we can focus this post on the steps dedicated to building the actual visualizations (Steps 5-6). We cover steps 1-4 briefly in the next section to provide context, and we provide links to the pertinent pages in the documentation for those of you interested in building your own pipeline.
 
Select this image to open a link that starts building the CloudFormation stack

Ingest (Steps 1-4): Get Amazon GuardDuty findings into Amazon S3 and AWS Glue Data Catalog

 

Figure 2: In this section, we'll cover the services highlighted in blue

Step 1: Deliver GuardDuty findings to Amazon CloudWatch Events

GuardDuty integrates with and can deliver findings to Amazon CloudWatch Events. To perform this manually, follow the instructions in Creating a CloudWatch Events Rule and Target for GuardDuty.

Step 2: Push GuardDuty events to Amazon S3 using Kinesis Data Firehose

Amazon CloudWatch Events can write to a Kinesis Data Firehose delivery stream to store your GuardDuty events in S3, where you can use AWS Lambda, AWS Glue, and Amazon Athena to build the queries you’ll need to visualize the data. You can create your own delivery stream by following the instructions in Creating a Kinesis Data Firehose Delivery Stream and then adding it as a target for CloudWatch Events.

Step 3: Use AWS Lambda to reorganize Amazon S3 folder structure

Kinesis Data Firehose will automatically create a datetime-based file hierarchy to organize the findings as they come in. Due to the variability of the GuardDuty finding types, we recommend reorganizing the file hierarchy with a folder for each finding type, with separate datetime subfolders for each. This will make it easier to target findings that you want to focus on in your visualization. The provided AWS CloudFormation template utilizes an AWS Lambda function to rewrite the files in a new hierarchy as new files are written to S3. You can use the code provided in it along with Using AWS Lambda with S3 to trigger your own function that reorganizes the data. Once the Lambda function has run, the S3 bucket structure should look similar to the structure we show in figure 3.
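The template ships its own implementation, but a simplified sketch of the idea (read each newly delivered object, group its findings by type, and rewrite them under a per-type prefix) might look like the following. The prefix layout and the assumption of newline-delimited JSON records are illustrative rather than the exact code from the template.

import json
import urllib.parse

import boto3

s3 = boto3.client("s3")

def lambda_handler(event, context):
    """Re-file newly delivered GuardDuty findings under a per-finding-type prefix."""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

        # Assumes newline-delimited JSON events; group them by finding type.
        by_type = {}
        for line in body.splitlines():
            if not line.strip():
                continue
            finding = json.loads(line)
            finding_type = finding["detail"]["type"].replace(":", "_").replace("/", "_")
            by_type.setdefault(finding_type, []).append(line)

        # Write one object per finding type, keeping the original datetime-based key
        # as a suffix so the downstream crawler still sees time-ordered folders.
        for finding_type, lines in by_type.items():
            s3.put_object(
                Bucket=bucket,
                Key=f"{finding_type}/{key}",
                Body=("\n".join(lines)).encode("utf-8"),
            )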
 

Figure 3: Sample S3 bucket structure

Step 4: Catalog the GuardDuty findings using AWS Glue

With the reorganized findings stored in S3, use an AWS Glue crawler to scan and catalog each finding type. The CloudFormation template we provided schedules the crawler to run once a day. You can also run it on demand as needed. To build your own crawler, refer to Cataloging Tables with a Crawler. Assuming GuardDuty has generated findings in your account, you can navigate to the GuardDuty findings database in the AWS Glue Data Catalog. It should look something like figure 4:
 

Figure 4: List of finding type tables in the AWS Glue Catalog

Note: Because AWS Glue crawlers will attempt to combine similar data into one table, you might need to generate sample findings to ensure enough variability for each finding type to have its own table. If you only intend to build your dashboard from a small subset of finding types, you can opt to just edit the crawler to have multiple data sources and specify the folder path for each desired finding type.

Explore the table structure

Before moving on to the next step, take some time to explore the schema structure of the tables. Selecting one of the tables will bring you to a page that looks like what’s shown in figure 5.
 

Figure 5: Schema information for a single finding table

You should see that most of the columns contain basic information about each finding, but there’s a column named detail that is of type struct. Select it to expand, as shown in figure 6.
 

Figure 6: The "detail" column expanded

Ah, this is where the interesting information is tucked away! The tables for each finding may differ slightly, but in all cases the detail column will hold the bulk of the information you’ll want to visualize. See GuardDuty Active Finding Types for information on what you should expect to find in the logs for each finding type. In the next step, we’ll focus on unpacking detail to prepare it for visualization!

Process (Step 5): Unpack nested JSON and configure views with Amazon Athena

 

Figure 7: In this section, we'll cover the services highlighted in blue

Note: This step picks up where the CloudFormation template finishes.

Explore the table structure (again) in the Amazon Athena console

Begin by navigating to Athena from the AWS Management Console. Once there, you should see a drop-down menu with a list of databases. These are the same databases that are available in the AWS Glue Data Catalog. Choose the database with your GuardDuty findings and expand a table.
 

Figure 8: Expanded table in the Athena console

This should look very similar to the table information you explored in step 4, including the detail struct!

You’ll need a method to unpack the struct in order to effectively visualize the data. There are many methods and tools to approach this problem. One that we recommend (and will show) is to use SQL queries within Athena to construct tabular views. This approach will allow you to push the bulk of the processing work to Athena. It will also allow you to simplify building visualizations when using Amazon QuickSight by providing a more conventional tabular format.

Extract details for use in visualization using SQL

The following SQL statements provide everything you need to extract the relevant fields from the detail struct of the Recon:EC2/PortProbeUnprotectedPort finding and build the Amazon QuickSight dashboard we showcase in the next section. They also cover most of the operations you’ll need to work with the elements found in GuardDuty findings (such as deeply nested data with lists), and they serve as a good starting point for constructing your own custom queries. In general, you’ll want to traverse the nested layers (for example, root.detail.service.count) and use the UNNEST function to create new records for each item in an embedded list that you want to target. See this blog for even more examples of constructing queries on complex JSON data using Amazon Athena.

Simply copy the SQL statements that you want into the Athena query field to build the port_probe_geo and affected_instances views.

Note: If your account has yet to generate Recon:EC2/PortProbeUnprotectedPort findings, you can generate sample findings to follow along.


CREATE OR REPLACE VIEW "port_probe_geo" AS

WITH getportdetails AS (
    SELECT id, portdetails
    FROM by_finding_type
    CROSS JOIN UNNEST(detail.service.action.portProbeAction.portProbeDetails) WITH ORDINALITY AS p (portdetails, portdetailsindex)
)

SELECT 
    root.id AS id,
    root.region AS region,
    root.time AS time,
    root.detail.type AS type,
    root.detail.service.count AS count, 
    portdetails.localportdetails.port AS localport, 
    portdetails.localportdetails.portname AS localportname, 
    portdetails.remoteipdetails.geolocation.lon AS longitude, 
    portdetails.remoteipdetails.geolocation.lat AS latitude, 
    portdetails.remoteipdetails.country.countryname AS country, 
    portdetails.remoteipdetails.city.cityname AS city 

FROM 
    by_finding_type  as root, getPortDetails
    
WHERE 
    root.id = getportdetails.id

CREATE OR REPLACE VIEW "affected_instances" AS

SELECT 
    max(root.detail.service.count) AS count,
    date_parse(root.time,'%Y-%m-%dT%H:%i:%sZ') as time,
    root.detail.resource.instancedetails.instanceid

FROM 
    recon_ec2_portprobeunprotectedport  AS root

GROUP BY  
    root.detail.resource.instancedetails.instanceid, 
    time

Visualize (Step 6): Build a GuardDuty findings dashboard in Amazon QuickSight

 

Figure 9: In this section we will cover the services highlighted in blue

Now that you’ve created tabular views using Athena, you can jump into Amazon QuickSight from the AWS Management Console and begin visualizing! If you haven’t already done so, enable Amazon QuickSight in your account by following the instructions for Setting Up Amazon QuickSight.

For this example, we’ll leverage the port_probe_geo view to build a geographic visualization and see the locations from which nefarious actors are launching port probes.

Creating an analysis

In the upper left-hand corner of the Amazon QuickSight console, select New analysis and then New data set.
 

Figure 10: Create a new analysis

To utilize the views you built in the previous step, select Athena as the data source. Give your data source a name (in our example, we use “port probe geo”), and select the database that contains the views you created in the previous section. Then select Visualize.
 

Figure 11: Available data sources in Amazon QuickSight. Be sure to choose Athena!

 

Figure 12: Select the "port_probe_geo" view you created in step 5

Viz time!

From the Visual types menu in the bottom left corner, select the globe icon to create a map. Then select the latitude and longitude geospatial coordinates. Choose count (with a max aggregation) for size. Finally, select localportname to break the data down by color.
 

Figure 13: A visual containing a map of port probe scans in Amazon QuickSight

Voila! A detailed map of your environment’s attackers!

Build out a dashboard

Once you like how everything looks, you can move on to adding more visuals to create a full monitoring dashboard.

To add another visual to the analysis, select Add and then Add visual.
 

Figure 14: Add another visual using the 'Add' option from the Amazon QuickSight menu bar

If the new visual will use the same dataset, then you can immediately start selecting fields to build it. If you want to create a visual from a different data set (our example dashboard below adds the affected_instances view), follow the Creating Data Sets guide to add a new data set. Then return to the current analysis and associate the data set with the analysis by selecting the pencil icon shown below and selecting Add data set.
 

Figure 15: Adding a new data set to your Amazon QuickSight analysis

Repeat this process until you’ve built out everything you need in your monitoring dashboard. Once it’s completed, you can publish the dashboard by selecting Share and then Publish dashboard.
 

Figure 16: Publish your dashboard using the "Share" option of the Amazon QuickSight menu

Here’s an example of a dashboard we created using the port_probe_geo and affected_instances views:
 

Figure 17: An example dashboard created using the "port_probe_geo" and "affected_instances" views

What does something like this cost?

To get an idea of the scale of the cost, we’ve provided a small pricing example (accurate as of the writing of this blog) that assumes 10,000 GuardDuty findings per month with an average payload size of 5KB.

Service | Pricing structure | Amount consumed | Total cost
Amazon CloudWatch Events | $1 per million events | 10,000 events | $0.01
Amazon Kinesis Data Firehose | $0.029 per GB ingested | 0.05 GB ingested | $0.00145
Amazon S3 | $0.023 per GB stored per month | 0.1 GB stored | $0.00230
AWS Lambda | First million invocations free | ~200 invocations | $0
Amazon Athena | $5 per TB scanned | 0.003 TB scanned (assume 2 full data scans per day to refresh views) | $0.015
AWS Glue | $0.44 per DPU-hour (2 DPU minimum and 10 minute minimum) = $0.15 per crawler run | 30 crawler runs | $4.50
Total processing cost | | | $4.53
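The arithmetic behind those line items is simple to reproduce; the following sketch recalculates the estimate from the same assumptions and prices listed above.

glue_per_run = round(0.44 * 2 * (10 / 60), 2)   # 2 DPUs for 10 minutes, about $0.15 per run

costs = {
    "Amazon CloudWatch Events": 10_000 / 1_000_000 * 1.00,   # $1 per million events
    "Amazon Kinesis Data Firehose": 0.05 * 0.029,            # $0.029 per GB ingested
    "Amazon S3": 0.1 * 0.023,                                # $0.023 per GB-month stored
    "AWS Lambda": 0.0,                                       # within the free tier
    "Amazon Athena": 0.003 * 5.00,                           # $5 per TB scanned
    "AWS Glue": 30 * glue_per_run,                           # 30 crawler runs per month
}

for service, cost in costs.items():
    print(f"{service:30s} ${cost:.5f}")
print(f"{'Total processing cost':30s} ${sum(costs.values()):.2f}")   # about $4.53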

Oh, the joys of a consumption-based model: Less than five dollars per month for all of that processing!

From here, all that remains are your visualization costs using Amazon QuickSight. This pricing is highly dependent upon your number of users and their respective usage patterns. See the Amazon QuickSight pricing page for more specific details.

Summary

In this post, we demonstrated how you can ingest your GuardDuty findings into S3, process them with AWS Glue and Amazon Athena, and visualize with Amazon QuickSight. All serverless! Each portion of what we showed can be used in tandem or on its own for this or many other data sets. Go launch the template and get started monitoring your AWS environment!

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Ben Romano

Ben is a Solutions Architect in AWS supporting customers in their journey to the cloud with a focus on big data solutions. Ben loves to delight customers by diving deep on AWS technologies and helping them achieve their business and technology objectives.

Author

Jimmy Boyle

Jimmy is a Solutions Architect in AWS with a background in software development. He enjoys working with all things serverless because he doesn’t have to maintain infrastructure. Jimmy enjoys delighting customers to drive their business forward and design solutions that will scale as their business grows.

Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing data clickstream events in real time can be difficult. As the number of users and web and mobile assets you have increases, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of new event messages per second. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze them in real time so that you can take action faster.

To perform the sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules of queries and aggregations, it can take more resources and time, because each aggregation works over large amounts of data. Performing sessionization in Kinesis Data Analytics takes less time and gives you lower latency between session generations. You can also trigger real-time alerts with AWS Lambda functions based on conditions, such as a session shorter than 20 seconds, or invoke a machine learning endpoint.

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions result in a series of events that occur in sequence, with a start and an end: a session. The start and end of a session can be difficult to determine, and are often defined by a time period without a relevant event associated with a user or device. A session starts when a new event arrives after a specified "lag" time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.
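As a quick illustration of the lag idea before moving on to streaming SQL, the following Python sketch groups a handful of example events into sessions using a 30-second lag; the timestamps and lag value are arbitrary example values.

from datetime import datetime, timedelta

LAG_SECONDS = 30   # example lag period

events = [
    ("user_20", datetime(2018, 11, 29, 23, 35, 10)),
    ("user_20", datetime(2018, 11, 29, 23, 35, 25)),   # 15 s later: same session
    ("user_20", datetime(2018, 11, 29, 23, 37, 0)),    # 95 s gap: starts a new session
]

sessions = []
for user_id, ts in sorted(events, key=lambda e: (e[0], e[1])):
    last = sessions[-1] if sessions else None
    if last and last["user"] == user_id and ts - last["end"] <= timedelta(seconds=LAG_SECONDS):
        last["end"] = ts   # event arrived within the lag period: extend the session
    else:
        sessions.append({"user": user_id, "start": ts, "end": ts})   # new session

for s in sessions:
    print(s["user"], s["start"], s["end"], (s["end"] - s["start"]).seconds, "seconds")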

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user as described in the following simplified example. In this example, I use distinct navigation patterns from three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from data, as shown following:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This gives a 34-second-long session, starting with action “B_10” and ending with action “A_02.” These “actions” identify the application’s buttons in this example.

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them part of the same session? A user can abort a navigation or start a new one. Also, applications often have timeouts. You have to decide on the maximum session length beyond which new events count as a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics are used to trigger real-time events on Lambda and then send them to Kinesis Data Firehose. Kinesis Data Firehose sends data to an Amazon S3 bucket, where it is ingested to a table by an AWS Glue crawler and made available for running queries with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics by an AWS Lambda function. The function triggers two actions: updating a real-time dashboard in Amazon CloudWatch, and persisting the data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as send SMS alerts or roll back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch, and build a real-time dashboard.
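The CloudFormation template in the next section provisions this function for you, but a simplified sketch of a Lambda that fans the aggregated session records out to a CloudWatch metric and to Kinesis Data Firehose might look like the following; the namespace, metric name, and delivery stream name are placeholders.

import base64
import json

import boto3

cloudwatch = boto3.client("cloudwatch")
firehose = boto3.client("firehose")

DELIVERY_STREAM = "sessions-delivery-stream"   # placeholder Firehose stream name

def lambda_handler(event, context):
    """Process aggregated session records emitted by Kinesis Data Analytics."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))

        # Publish the session duration as a custom CloudWatch metric
        # (column names assumed to follow the DESTINATION_SQL_STREAM definition).
        cloudwatch.put_metric_data(
            Namespace="Sessionization",        # hypothetical namespace
            MetricData=[{
                "MetricName": "SessionDurationSec",
                "Value": float(payload.get("DURATION_SEC", 0)),
                "Unit": "Seconds",
            }],
        )

        # Persist the aggregated record to Amazon S3 through Kinesis Data Firehose.
        firehose.put_record(
            DeliveryStreamName=DELIVERY_STREAM,
            Record={"Data": (json.dumps(payload) + "\n").encode("utf-8")},
        )

        output.append({"recordId": record["recordId"], "result": "Ok"})

    return {"records": output}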

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

The following is the code for the Lambda function payload generator, which runs on a schedule using CloudWatch Events:

# Imports and Kinesis client needed by the snippets below.
import datetime
import json
import random

import boto3

kinesis = boto3.client('kinesis')
def getReferrer():
    x = random.randint(1,5)
    x = x*50 
    y = x+30 
    data = {}
    data['user_id'] = random.randint(x,y)
    data['device_id'] = random.choice(['mobile','computer', 'tablet', 'mobile','computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav','beer_checkout','beer_product_detail',
    'beer_products','beer_selection','beer_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

def lambda_handler(event, context):
...
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, the following payloads are sent to Kinesis Data Analytics:

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three available options for windowed query functions in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger window because it has some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event that matches a partition key condition arrives. So for each key, it evaluates its particular window as opposed to the other window functions that evaluate one unique window for all the partition keys matched.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream, but rather on when they were generated. Stagger windows handle the arrival of out-of-order events well. The times when the window opens and closes are based on the specified age, which is measured from the time the window opened.

To partition by the timestamp, I chose to write two distinct SQL functions.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following function creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following function creates the PUMP and inserts it as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- begining minute and second  
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates the PUMP and inserts as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- begining minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the resulting data transformed by the SQL, with the session identification and information. The Session_ID is calculated from the User_ID, the first three characters of the Device_ID, and the rounded Unix timestamp without the milliseconds.

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code on your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign into the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 (1 MB/s per shard).
  • Bucket Name:  Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds as the buffering hint for Kinesis Data Firehose before the data is sent to Amazon S3.
  • Buffer Size: 1–128 MB per file, if the interval is not reached first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

 

Step 8: Check the Destination tab to view the AWS Lambda function as the destination to your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and the “session duration” behavior from a timeframe. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query data with the sessions grouped by the session duration ordered by sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.

 

 

 

 

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. More than that, it was about a shift in mindset toward using cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, only switching to another relational database would mean that our customers would have the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. The following considerations factored into that choice:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. After doing this, it converges with the other process and outputs the data to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.
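
For illustration, here is a minimal sketch of what such a stream-handling Lambda function could look like. This is not Woot's actual code; the delivery stream name and the simplistic type-descriptor flattening are assumptions.

import json
import os

import boto3

firehose = boto3.client("firehose")
# Hypothetical delivery stream name, supplied through an environment variable.
DELIVERY_STREAM = os.environ.get("DELIVERY_STREAM", "cleaned-events")


def handler(event, context):
    """Flatten DynamoDB stream records and forward them to Kinesis Data Firehose."""
    records = []
    for record in event.get("Records", []):
        new_image = record.get("dynamodb", {}).get("NewImage")
        if not new_image:
            continue  # REMOVE events carry no new image, so skip them
        # Strip the DynamoDB type descriptors ({"S": ..., "N": ...}) into plain values.
        cleaned = {key: list(value.values())[0] for key, value in new_image.items()}
        records.append({"Data": (json.dumps(cleaned) + "\n").encode("utf-8")})

    if records:
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)
    return {"forwarded": len(records)}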

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.
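
A rough sketch of that mover function follows. The bucket name, the date-based partition scheme, and the cross-account access arrangement are illustrative assumptions, not Woot's actual implementation.

import json
import os
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")
# Hypothetical destination bucket in the data warehouse account.
DESTINATION_BUCKET = os.environ.get("DESTINATION_BUCKET", "data-warehouse-raw")


def handler(event, context):
    """Copy Firehose output announced through SNS into the data warehouse bucket."""
    for sns_record in event["Records"]:
        s3_event = json.loads(sns_record["Sns"]["Message"])
        for record in s3_event.get("Records", []):
            source_bucket = record["s3"]["bucket"]["name"]
            source_key = record["s3"]["object"]["key"]
            # Example partition structure: dt=YYYY-MM-DD/<original key>.
            partition = datetime.now(timezone.utc).strftime("dt=%Y-%m-%d")
            s3.copy_object(
                Bucket=DESTINATION_BUCKET,
                Key=f"{partition}/{source_key}",
                CopySource={"Bucket": source_bucket, "Key": source_key},
            )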

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;"

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these as CSV, which conflicted with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.
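
A minimal sketch of such a conversion job follows; the S3 paths are placeholders, and your column mappings and partitioning will differ.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read raw JSON from the raw zone and write it back out to the processed zone as Parquet.
raw = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://example-raw-bucket/events/"]},
    format="json",
)
glueContext.write_dynamic_frame.from_options(
    frame=raw,
    connection_type="s3",
    connection_options={"path": "s3://example-processed-bucket/events/"},
    format="parquet",
)
job.commit()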

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.
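
To give a flavor of that shared library, here is a minimal sketch of the kind of helpers it might contain. The function names and deduplication keys are illustrative, not our actual code.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame


def deduplicate(glue_context: GlueContext, frame: DynamicFrame, key_columns):
    """Drop duplicate rows by key, keeping one arbitrary row per key."""
    deduped = frame.toDF().dropDuplicates(key_columns)
    return DynamicFrame.fromDF(deduped, glue_context, "deduplicated")


def write_parquet(glue_context: GlueContext, frame: DynamicFrame, path: str):
    """Write a DynamicFrame to S3 as Parquet."""
    glue_context.write_dynamic_frame.from_options(
        frame=frame,
        connection_type="s3",
        connection_options={"path": path},
        format="parquet",
    )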

We also migrated complex jobs used to create reporting tables with business metrics:

  • AWS Glue's use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.
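
As a sketch of what embedding SparkSQL in an AWS Glue job looks like, consider the following. The database, table, and column names are made up for illustration.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Expose a Data Catalog table to SparkSQL, run the reporting query, and keep the result.
orders = glueContext.create_dynamic_frame.from_catalog(database="curated", table_name="orders")
orders.toDF().createOrReplaceTempView("orders")

daily_revenue = spark.sql(
    "SELECT order_date, SUM(total) AS revenue FROM orders GROUP BY order_date"
)
result = DynamicFrame.fromDF(daily_revenue, glueContext, "daily_revenue")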

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.
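
The temporary-directory pattern looks roughly like the following sketch. The bucket and prefixes are placeholders, and the Spark write is shown only as a comment because it runs earlier in the job.

import boto3

s3 = boto3.client("s3")
BUCKET = "example-curated-bucket"
TEMP_PREFIX = "tmp/daily_orders/"
FINAL_PREFIX = "curated/daily_orders/"

# 1. Earlier in the job, Spark writes the new output to the temporary prefix:
#    dataframe.write.mode("overwrite").parquet(f"s3://{BUCKET}/{TEMP_PREFIX}")

# 2. boto3 then moves the finished files into the location that Athena queries.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET, Prefix=TEMP_PREFIX):
    for obj in page.get("Contents", []):
        final_key = obj["Key"].replace(TEMP_PREFIX, FINAL_PREFIX, 1)
        s3.copy_object(
            Bucket=BUCKET,
            Key=final_key,
            CopySource={"Bucket": BUCKET, "Key": obj["Key"]},
        )
        s3.delete_object(Bucket=BUCKET, Key=obj["Key"])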

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

For additional information, see these similar AWS Big Data blog posts:


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.


Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.


Building Simpler Genomics Workflows on AWS Step Functions

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/building-simpler-genomics-workflows-on-aws-step-functions/

This post is courtesy of Ryan Ulaszek, AWS Genomics Partner Solutions Architect and Aaron Friedman, AWS Healthcare and Life Sciences Partner Solutions Architect

In 2017, we published a four part blog series on how to build a genomics workflow on AWS. In part 1, we introduced a general architecture highlighting three common layers: job, batch and workflow.  In part 2, we described building the job layer with Docker and Amazon Elastic Container Registry (Amazon ECR).  In part 3, we tackled the batch layer and built a batch engine using AWS Batch.  In part 4, we built out the workflow layer using AWS Step Functions and AWS Lambda.

Since then, we’ve worked with many AWS customers and APN partners to implement this solution in genomics as well as in other workloads-of-interest. Today, we wanted to highlight a new feature in Step Functions that simplifies how customers and partners can build high-throughput genomics workflows on AWS.

Step Functions now supports native integration with AWS Batch, which simplifies how you can create an AWS Batch state that submits an asynchronous job and waits for that job to finish.

Before, you needed to build a state machine building block that submitted a job to AWS Batch, and then polled and checked its execution. Now, you can just submit the job to AWS Batch using the new AWS Batch task type.  Step Functions waits to proceed until the job is completed. This reduces the complexity of your state machine and makes it easier to build a genomics workflow with asynchronous AWS Batch steps.

The new integrations include support for the following API actions:

  • AWS Batch SubmitJob
  • Amazon SNS Publish
  • Amazon SQS SendMessage
  • Amazon ECS RunTask
  • AWS Fargate RunTask
  • Amazon DynamoDB
    • PutItem
    • GetItem
    • UpdateItem
    • DeleteItem
  • Amazon SageMaker
    • CreateTrainingJob
    • CreateTransformJob
  • AWS Glue
    • StartJobRun

You can also pass parameters to the service API.  To use the new integrations, the role that you assume when running a state machine needs to have the appropriate permissions.  For more information, see the AWS Step Functions Developer Guide.

Using a job status poller

In our 2017 post series, we created a job poller “pattern” with two separate Lambda functions. When the job finishes, the state machine proceeds to the next step and operates according to the necessary business logic.  This is a useful pattern to manage asynchronous jobs when a direct integration is unavailable.

The steps in this building block state machine are as follows:

  1. A job is submitted through a Lambda function.
  2. The state machine queries the AWS Batch API for the job status in another Lambda function.
  3. The job status is checked to see if the job has completed. If the job status equals SUCCEEDED, the final job status is logged. If the job status equals FAILED, the execution of the state machine ends. In all other cases, wait 30 seconds and go back to Step 2.

Both of the Submit Job and Get Job Lambda functions are available as example Lambda functions in the console.  The job status poller is available in the Step Functions console as a sample project.
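
For orientation, the two functions do roughly the following. This sketch is not the exact console sample code, and the event field names are assumptions.

import boto3

batch = boto3.client("batch")


def batch_submit_job(event, context):
    """Submit the AWS Batch job and pass its ID along to the next state."""
    response = batch.submit_job(
        jobName=event["jobName"],
        jobQueue=event["jobQueue"],
        jobDefinition=event["jobDefinition"],
        parameters=event.get("parameters", {}),
    )
    event["jobId"] = response["jobId"]
    return event


def batch_get_job_status(event, context):
    """Return the job's current status, for example SUCCEEDED or FAILED."""
    jobs = batch.describe_jobs(jobs=[event["jobId"]])
    return jobs["jobs"][0]["status"]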

Here is the JSON representing this state machine.

{
  "Comment": "A simple example that submits a job to AWS Batch",
  "StartAt": "SubmitJob",
  "States": {
    "SubmitJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchSubmitJob",
      "Next": "GetJobStatus"
    },
    "GetJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "Next": "CheckJobStatus",
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckJobStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "End": true
        },
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "GetFinalJobStatus"
        }
      ],
      "Default": "Wait30Seconds"
    },
    "Wait30Seconds": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "GetJobStatus"
    },
    "GetFinalJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "End": true
    }
  }
}

With Step Functions Service Integrations

With Step Functions service integrations, it is now simpler to submit and wait for an AWS Batch job, or any other supported service.

The following code block is the JSON representing the new state machine for an asynchronous batch job. If you are familiar with the AWS Batch SubmitJob API action, you may notice that the parameters are consistent with what you would see in that API call. You can also use the optional AWS Batch parameters in addition to JobDefinition, JobName, and JobQueue.

{
  "StartAt": "RunIsaacJob",
  "States": {
    "RunIsaacJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobDefinition": "Isaac",
        "JobName.$": "$.isaac.JobName",
        "JobQueue": "HighPriority",
        "Parameters.$": "$.isaac"
      },
      "TimeoutSeconds": 900,
      "HeartbeatSeconds": 60,
      "End": true,
      "InputPath": "$",
      "ResultPath": "$.status",
      "Retry": [
        {
          "ErrorEquals": [ "States.Timeout" ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 1.5
        }
      ]
    }
  }
}

Here is an example of the workflow input JSON.  Pass all of the container parameters that were being constructed in the submit job Lambda function.

{
  "isaac": {
    "WorkingDir": "/scratch",
    "JobName": "isaac-1",
    "FastQ1S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz",
    "FastQ2S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz",
    "BAMS3FolderPath": "s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam",
    "ReferenceS3Path": "s3://aws-batch-genomics-resources/reference/isaac/"
  }
}

When you deploy the job definition, add the command attribute that was previously being constructed in the Lambda function launching the AWS Batch job.

IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

The key-value parameters passed into the workflow are mapped by Parameters.$ onto the matching keys in the job definition, and the value substitutions are applied. The resulting Docker run looks like the following:

docker run <isaac_container_uri> --bam_s3_folder_path s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam
                                 --fastq1_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz
                                 --fastq2_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz 
                                 --reference_s3_path s3://aws-batch-genomics-resources/reference/isaac/ 
                                 --working_dir /scratch

Genomics workflow: Before and after

Overall, connectors dramatically simplify your genomics workflow.  The following workflow is a simple genomics secondary analysis pipeline, which we highlighted in our original post series.

The first step aligns the sample against a reference genome.  When alignment is complete, variant calling and QA metrics are calculated in two parallel steps.  When variant calling is complete, variant annotation is performed.  Before, our genomics workflow looked like this:

Now it looks like this:

Here is the new workflow JSON:

{
   "Comment":"A simple genomics secondary-analysis workflow",
   "StartAt":"RunIsaacJob",
   "States":{
      "RunIsaacJob":{
         "Type":"Task",
         "Resource":"arn:aws:states:::batch:submitJob.sync",
         "Parameters":{
            "JobDefinition":"Isaac",
            "JobName.$":"$.isaac.JobName",
            "JobQueue":"HighPriority",
            "Parameters.$": "$.isaac"
         },
         "TimeoutSeconds": 900,
         "HeartbeatSeconds": 60,
         "Next":"Parallel",
         "InputPath":"$",
         "ResultPath":"$.status",
         "Retry" : [
            {
              "ErrorEquals": [ "States.Timeout" ],
              "IntervalSeconds": 3,
              "MaxAttempts": 2,
              "BackoffRate": 1.5
            }
         ]
      },
      "Parallel":{
         "Type":"Parallel",
         "Next":"FinalState",
         "Branches":[
            {
               "StartAt":"RunStrelkaJob",
               "States":{
                  "RunStrelkaJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"Strelka",
                        "JobName.$":"$.strelka.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.strelka"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Next":"RunSnpEffJob",
                     "InputPath":"$",
                     "ResultPath":"$.status",
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  },
                  "RunSnpEffJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SNPEff",
                        "JobName.$":"$.snpeff.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.snpeff"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ],
                     "End":true
                  }
               }
            },
            {
               "StartAt":"RunSamtoolsStatsJob",
               "States":{
                  "RunSamtoolsStatsJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SamtoolsStats",
                        "JobName.$":"$.samtools.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.samtools"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "End":true,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  }
               }
            }
         ]
      },
      "FinalState":{
         "Type":"Pass",
         "End":true
      }
   }
}

Here is the new Amazon CloudFormation template for deploying the AWS Batch job definitions for each tool:

AWSTemplateFormatVersion: 2010-09-09

Description: Batch job definitions for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys the VPC for the genomic workflow"
    Type: String
  JobResultsBucket:
    Description: "Bucket that holds workflow job results"
    Type: String

Resources:
  IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  StrelkaJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Strelka"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAIS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.bai"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        ReferenceIndexS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa.fai"
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/strelka"
        Vcpus: 32
        Memory: 32000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bai_s3_path"
          - "Ref::BAIS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--reference_index_s3_path"
          - "Ref::ReferenceIndexS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SnpEffJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SNPEff"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/variants/genome.vcf.gz"
        AnnotatedVCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/genome.anno.vcf"
        CommandArgs: " -t hg38 "
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/snpeff"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--annotated_vcf_s3_path"
          - "Ref::AnnotatedVCFS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--cmd_args"
          - "Ref::CommandArgs"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SamtoolsStatsJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SamtoolsStats"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAMStatsS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.stats"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/samtools-stats"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bam_stats_s3_path"
          - "Ref::BAMStatsS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

Here is the new CloudFormation script that deploys the new workflow:

AWSTemplateFormatVersion: 2010-09-09

Description: State Machine for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys the VPC for the genomic workflow"
    Type: String

Resources:
  # S3
  GenomicWorkflow:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn:
        Fn::ImportValue: !Sub "${RoleStackName}:StatesExecutionRole"
      DefinitionString: !Sub |-
        {
           "Comment":"A simple example that submits a job to AWS Batch",
           "StartAt":"RunIsaacJob",
           "States":{
              "RunIsaacJob":{
                 "Type":"Task",
                 "Resource":"arn:aws:states:::batch:submitJob.sync",
                 "Parameters":{
                    "JobDefinition":"Isaac",
                    "JobName.$":"$.isaac.JobName",
                    "JobQueue":"HighPriority",
                    "Parameters.$": "$.isaac"
                 },
                 "TimeoutSeconds": 900,
                 "HeartbeatSeconds": 60,
                 "Next":"Parallel",
                 "InputPath":"$",
                 "ResultPath":"$.status",
                 "Retry" : [
                    {
                      "ErrorEquals": [ "States.Timeout" ],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 2,
                      "BackoffRate": 1.5
                    }
                 ]
              },
              "Parallel":{
                 "Type":"Parallel",
                 "Next":"FinalState",
                 "Branches":[
                    {
                       "StartAt":"RunStrelkaJob",
                       "States":{
                          "RunStrelkaJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"Strelka",
                                "JobName.$":"$.strelka.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.strelka"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Next":"RunSnpEffJob",
                             "InputPath":"$",
                             "ResultPath":"$.status",
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          },
                          "RunSnpEffJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SNPEff",
                                "JobName.$":"$.snpeff.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.snpeff"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ],
                             "End":true
                          }
                       }
                    },
                    {
                       "StartAt":"RunSamtoolsStatsJob",
                       "States":{
                          "RunSamtoolsStatsJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SamtoolsStats",
                                "JobName.$":"$.samtools.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.samtools"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "End":true,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          }
                       }
                    }
                 ]
              },
              "FinalState":{
                 "Type":"Pass",
                 "End":true
              }
           }
        }

Outputs:
  GenomicsWorkflowArn:
    Description: GenomicWorkflow ARN
    Value: !Ref GenomicWorkflow
  StackName:
    Description: StackName
    Value: !Sub ${AWS::StackName}

Conclusion

AWS Step Functions service integrations are a great way to simplify creating complex workflows with asynchronous steps. While we highlighted the use case with AWS Batch today, there are many other ways that healthcare and life sciences customers can use this new feature, such as with message processing.

For more information about how AWS can enable your genomics workloads, be sure to check out the AWS Genomics page.

We’ve updated the open-source project to take advantage of the new AWS Batch integration in Step Functions. You can find the changes in the aws-batch-genomics/tree/v2.0.0 folder.

Original posts in this four-part series:

Happy coding!

Create cross-account and cross-region AWS Glue connections

Post Syndicated from Pankaj Malhotra original https://aws.amazon.com/blogs/big-data/create-cross-account-and-cross-region-aws-glue-connections/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. AWS Glue uses connections to access certain types of source and target data stores, as described in the AWS Glue documentation.

By default, you can use AWS Glue to create connections to data stores in the same AWS account and AWS Region as the one where you have AWS Glue resources. In this blog post, we describe how to access data stores in an account or AWS Region different from the one where you have AWS Glue resources.

AWS Glue connections

AWS Glue uses a connection to crawl and catalog a data store’s metadata in the AWS Glue Data Catalog, as the documentation describes. AWS Glue ETL jobs also use connections to connect to source and target data stores. AWS Glue supports connections to Amazon Redshift, Amazon RDS, and JDBC data stores.

A connection contains the properties needed by AWS Glue to access a data store. These properties might include connection information such as user name and password, data store subnet IDs, and security groups.

If the data store is located inside an Amazon VPC, AWS Glue uses the VPC subnet ID and security group ID connection properties to set up elastic network interfaces in the VPC containing the data store. Doing this enables ETL jobs and crawlers to connect securely to the data store in the VPC.

AWS Glue can create this elastic network interface setup if the VPC containing the data store is in the same account and AWS Region as the AWS Glue resources. The security groups specified in a connection’s properties are applied on each of the network interfaces. The security group rules and network ACLs associated with the subnet control network traffic through the subnet. Correct rules for allowing outbound traffic through the subnet ensure that AWS Glue can establish network connectivity with all subnets in the VPC containing the data store, and therefore access the source or target data store.

VPC components can be interlinked only if they are present in the same AWS Region. Therefore, AWS Glue cannot create the elastic network interfaces inside a VPC in another region. If the VPC containing the data store is in another region, you have to add the network routes and create additional network interfaces which allow network interfaces set up by AWS Glue to establish network connectivity with the data store.

In this blog post, we describe how to configure the networking routes and interfaces to give AWS Glue access to a data store in an AWS Region different from the one with your AWS Glue resources. In our example, we connect AWS Glue, located in Region A, to an Amazon Redshift data warehouse located in Region B.

Note: The examples here assume that the Amazon Redshift cluster is in a different AWS Region, but belongs to the same account. The same setup and instructions are also valid for an Amazon Redshift cluster in a different account.

Setting up VPC components for AWS Glue

AWS Glue requires a VPC with networking routes to the data stores to which it connects. In our solution, the security groups and route tables are configured to enable elastic network interfaces set up by AWS Glue in a private subnet to reach the internet or connect to data stores outside the VPC. The following diagram shows the necessary components and the network traffic flow.

Required components for VPC setup:

  • AWS Glue resources in a private subnet in Region A.
  • A NAT gateway with an Elastic IP address attached to it in a public subnet in Region A.
  • A private route table containing a route allowing outbound network traffic from the private subnet to pass through the NAT gateway.
  • An internet gateway in Region A.
  • A public route table with a route allowing outbound network traffic from the public subnet to pass through the internet gateway.

Note: We must update the default security group of the VPC to include a self-referencing inbound rule and an outbound rule to allow all traffic from all ports. Later in the example, we attach this security group to an AWS Glue connection to let network interfaces set up by AWS Glue communicate with each other within a private subnet.

Network traffic flow through the components:

Outbound network traffic from AWS Glue resources in the private subnet to any destination or data store outside the private subnet is routed through the NAT gateway.

The NAT gateway is present in a public subnet and has an associated Elastic IP address. It forwards network traffic from AWS Glue resources to the internet by using an internet gateway.

When AWS Glue tries to establish a connection with a data store outside of the private subnet, the incoming network traffic on the data store side appears to come from the NAT Gateway.

On the data store side, you allow the data store or its security group to accept incoming network traffic from the Elastic IP address attached to the NAT gateway. This is shown in the section “Allow Amazon Redshift to accept network traffic from AWS Glue,” following.

Creating VPC components using AWS CloudFormation

You can automate the creation of a VPC and all the components described preceding using the vpc_setup.yaml CloudFormation template, hosted on GitHub. Follow these step-by-step instructions to create the stack in your AWS account:

  1. Deploy the stack in the US Oregon (us-west-2) Region:

Note: In this example, we create the AWS Glue resources and connection in the us-west-2 Region. You can change this to the AWS Region where you have your AWS Glue connection and resources.

You are directed to the AWS CloudFormation console, with the stack name and URL template fields pre-filled.

  2. Choose Next.
  3. Use the default IP ranges and choose Next.
  4. Skip this step and choose Next.
  5. Review and choose Create.
  6. Wait for stack creation to complete. After completion, all of the required VPC components are created.
  7. Navigate to the VPC console and copy the Elastic IP address for the newly created NAT gateway.
    Note: This IP address is used for outbound network flow from AWS Glue resources and so should be whitelisted on the data store side. For more detail, see “Allow Amazon Redshift to accept network traffic from AWS Glue,” following.

Before creating and testing an AWS Glue connection to your data store, you need an IAM role that lets AWS Glue access the VPC components that you just created.

Creating an IAM role to let AWS Glue access Amazon VPC components

For this example, we create a role called TestAWSGlueConnectionIAMRole with a managed IAM policy AWSGlueServiceRole attached to it.

  1. Choose the Roles tab from the AWS Identity and Access Management (IAM) console.
  2. Choose Create role and select AWS Glue as a trusted entity.

  3. Attach an IAM policy to the role that allows AWS Glue to access the VPC components. In this example, we are using the default AWSGlueServiceRole policy, which contains all the required permissions for the setup.

  4. Name the role TestAWSGlueConnectionIAMRole.

Note: The default GlueServiceRole policy that we attached to our custom role TestAWSGlueConnectionIAMRole has permissions for accessing VPC components. If you are using a custom policy instead of the default one, it should also contain the same permissions to be able to access VPC components.

Creating an Amazon Redshift cluster using AWS CloudFormation

For this example, we create a sample Amazon Redshift cluster in a VPC in the US N. Virginia (us-east-1) Region. Follow these step-by-step instructions to create the stack in your AWS account:

  1. Navigate to the CloudFormation console in region us-east-1 and create a new stack using this CloudFormation template, described in the documentation.
  2. Provide the configuration for the cluster, including MasterUsername and MasterUserPassword. MasterUserPassword must meet the following constraints:
  • It must be 8–64 characters in length.
  • It must contain at least one uppercase letter, one lowercase letter, and one number.
  • It can use any printable ASCII characters (ASCII code 33–126) except ‘ (single quote), ” (double quote), :, \, /, @, or space.

  3. Choose Next and proceed with the stack creation.
  4. Review the configuration and choose Create.

  5. Wait for stack creation to complete, which can take a few minutes.

  6. Navigate to the Amazon Redshift console and choose the cluster name to see the cluster properties.

  7. Note the JDBC URL for the cluster and the attached security group for later use.

Note: We created a sample Amazon Redshift cluster in a public subnet present inside a VPC in Region B. We recommend that you follow the best practices for increased security and availability while setting up a new Amazon Redshift cluster, as shown in our samples on GitHub.

Creating an AWS Glue connection

Now you have the required VPC setup, Amazon Redshift cluster, and IAM role in place. Next, you can create an AWS Glue connection and test it as follows:

  1. Choose Add Connection under the Connections tab in the AWS Glue console. The AWS Region in which we are creating this connection is the same as for our VPC setup, that is, US Oregon (us-west-2).

  2. Choose a JDBC connection type. You can choose to enforce JDBC SSL or not, depending on the configuration for your data store.

  3. Add the connection-specific configuration. Note the URL for our Amazon Redshift cluster. It shows that the Amazon Redshift cluster is present in us-east-1.

Note: We use the VPC (VPCForGlue) and the private subnet (GluePrivateSubnet) we created for this connection. For security groups, we use the default security group for the VPC. This security group has a self-referencing inbound rule and an outbound rule that allows all traffic.

  4. Review the configuration and choose Finish.

The AWS Glue console should now show that the connection was created successfully.

Note: Completing this step just means that an AWS Glue connection was created. It doesn’t guarantee that AWS Glue can actually connect to your data store. Before we test the connection, we also need to allow Amazon Redshift to accept network traffic coming from AWS Glue.

Allow Amazon Redshift to accept network traffic from AWS Glue

The Amazon Redshift cluster in a different AWS Region (us-east-1) from AWS Glue must allow incoming network traffic from AWS Glue.

For this, we update the security group attached to the Amazon Redshift cluster, and whitelist the Elastic IP address attached to the NAT gateway for the AWS Glue VPC.
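
If you prefer to script this step, a minimal sketch with boto3 follows. The security group ID and the Elastic IP address are placeholders for your own values.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # Region of the Amazon Redshift cluster

ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",  # security group attached to the Amazon Redshift cluster
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 5439,  # default Amazon Redshift port
            "ToPort": 5439,
            "IpRanges": [
                {
                    "CidrIp": "203.0.113.10/32",  # Elastic IP of the NAT gateway in the AWS Glue VPC
                    "Description": "AWS Glue traffic via NAT gateway",
                }
            ],
        }
    ],
)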

Testing the AWS Glue connection

As a best practice, before you use a data store connection in an ETL job, choose Test connection. AWS Glue uses the parameters in your connection to confirm that it can access your data store and reports back any errors.

  1. Select the connection TestAWSGlueConnection that we just created and choose Test Connection.
  2. Select the TestAWSGlueConnectionIAMRole that we created for allowing AWS Glue resources to access VPC components.

  3. After you choose the Test connection button in the previous step, it can take a few seconds for AWS Glue to successfully connect to the data store. When it does, the console shows a message saying it “connected successfully to your instance.”

Conclusion

By creating a VPC setup similar to the one we describe, you can let AWS Glue connect to a data store in a different account or AWS Region. By doing this, you establish network connectivity between AWS Glue resources and your data store. You can now use this AWS Glue connection in ETL jobs and AWS Glue crawlers to connect with the data store.

If you have questions or suggestions, please leave a comment following.


Additional Reading

If you found this post helpful, be sure to check out Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC, and How to access and analyze on-premises data stores using AWS Glue.

 


About the Author

Pankaj Malhotra is a Software Development Engineer at Amazon Web Services. He enjoys solving problems related to cloud infrastructure and distributed systems. He specializes in developing multi-regional, resilient services using serverless technologies.

 

Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/connecting-to-and-running-etl-jobs-across-multiple-vpcs-using-a-dedicated-aws-glue-vpc/

Many organizations use a setup that includes multiple VPCs based on the Amazon VPC service, with databases isolated in separate VPCs for security, auditing, and compliance purposes. This blog post shows how you can use AWS Glue to perform extract, transform, load (ETL) and crawler operations for databases located in multiple VPCs.

The solution presented here uses a dedicated AWS Glue VPC and subnet to perform the following operations on databases located in different VPCs:

  • Scenario 1: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon Redshift data warehouse.
  • Scenario 2: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon RDS for PostgreSQL database.

In this blog post, we’ll go through the steps needed to build an ETL pipeline that consumes from one source in one VPC and outputs it to another source in a different VPC. We’ll set up in multiple VPCs to reproduce a situation where your database instances are in multiple VPCs for isolation related to security, audit, or other purposes.

For this solution, we create a VPC dedicated to AWS Glue. Next, we set up VPC peering between the AWS Glue VPC and all of the other database VPCs. Then we configure an Amazon S3 endpoint, route tables, security groups, and IAM so that AWS Glue can function properly. Lastly, we create AWS Glue connections and an AWS Glue job to perform the task at hand.

Step 1: Set up a VPC

To simulate these scenarios, we create four VPCs with their respective IPv4 CIDR ranges. (Note: CIDR ranges can’t overlap when you use VPC peering.)

  • VPC 1 (Amazon Redshift): 172.31.0.0/16
  • VPC 2 (Amazon RDS for MySQL): 172.32.0.0/16
  • VPC 3 (Amazon RDS for PostgreSQL): 172.33.0.0/16
  • VPC 4 (AWS Glue): 172.30.0.0/16

Key configuration notes:

  1. The AWS Glue VPC needs at least one private subnet for AWS Glue to use.
  2. Ensure that DNS hostnames are enabled for all of your VPCs (unless you plan to refer to your databases by IP address later on, which isn’t recommended).

Step 2: Set up a VPC peering connection

Next, we peer our VPCs together to ensure that AWS Glue can communicate with all of the database targets and sources. This approach is necessary because AWS Glue resources are created with private addresses only. Thus, they can’t use an internet gateway to communicate with public addresses, such as public database endpoints. If your database endpoints are public, you can alternatively use a network address translation (NAT) gateway with AWS Glue rather than peer across VPCs.

Create the following peering connections.

  • Peer 1: requester 172.30.0.0/16 (VPC 4), accepter 172.31.0.0/16 (VPC 1)
  • Peer 2: requester 172.30.0.0/16 (VPC 4), accepter 172.32.0.0/16 (VPC 2)
  • Peer 3: requester 172.30.0.0/16 (VPC 4), accepter 172.33.0.0/16 (VPC 3)

These peering connections can be across separate AWS Regions if needed. The database VPCs are not peered together; they are all peered with the AWS Glue VPC instead. We do this because AWS Glue connects to each database from its own VPC. The databases don’t connect to each other.

Key configuration notes:

  1. Create a VPC peering connection, as described in the Amazon VPC documentation. Select the AWS Glue VPC as the requester and the VPC for your database as the accepter.
  2. Accept the VPC peering request. If you are peering to a different AWS Region, switch to that AWS Region to accept the request.

Important: Enable Domain Name Service (DNS) settings for each of the peering connections. Doing this ensures that AWS Glue can retrieve the private IP address of your database endpoints. Otherwise, AWS Glue resolves your database endpoints to public IP addresses. AWS Glue can’t connect to public IP addresses without a NAT gateway.
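
If you script the peering setup, you can enable the DNS resolution options with boto3, as in the following sketch. The peering connection ID is a placeholder, and for cross-Region peering each side's option may need to be set from its own Region.

import boto3

ec2 = boto3.client("ec2")  # use the Region of the side you are modifying

ec2.modify_vpc_peering_connection_options(
    VpcPeeringConnectionId="pcx-0123456789abcdef0",
    RequesterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True},
    AccepterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True},
)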

Step 3: Create an Amazon S3 endpoint for the AWS Glue subnet

We need to add an Amazon S3 endpoint to the AWS Glue VPC (VPC 4). During setup, associate the endpoint with the route table that your private subnet uses. For more details on creating an S3 endpoint for AWS Glue, see Amazon VPC Endpoints for Amazon S3 in the AWS Glue documentation.

AWS Glue uses S3 to store your scripts and temporary data to load into Amazon Redshift.
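
If you create the endpoint with boto3 instead of the console, it looks roughly like the following sketch; the VPC and route table IDs are placeholders.

import boto3

ec2 = boto3.client("ec2")  # use the Region of the AWS Glue VPC

ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId="vpc-0aaa1111bbb2222cc",  # VPC 4, the dedicated AWS Glue VPC
    ServiceName="com.amazonaws.us-west-2.s3",  # substitute your Region's S3 service name
    RouteTableIds=["rtb-0333ddd4444eee555"],  # route table used by the private AWS Glue subnet
)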

Step 4: Create a route table configuration

Add the following routes to the route tables used by the respective services’ subnets. These routes are configured along with existing settings.

VPC 4 (AWS Glue) route table:

  • Destination 172.31.0.0/16 (VPC 1), target Peer 1
  • Destination 172.32.0.0/16 (VPC 2), target Peer 2
  • Destination 172.33.0.0/16 (VPC 3), target Peer 3

VPC 1 (Amazon Redshift) route table:

  • Destination 172.30.0.0/16 (VPC 4), target Peer 1

VPC 2 (Amazon RDS for MySQL) route table:

  • Destination 172.30.0.0/16 (VPC 4), target Peer 2

VPC 3 (Amazon RDS for PostgreSQL) route table:

  • Destination 172.30.0.0/16 (VPC 4), target Peer 3

Key configuration notes:

  • The route table for the AWS Glue VPC has peering connections to all VPCs. It has these so that AWS Glue can initiate connections to all of the databases.
  • All of the database VPCs have a peering connection back to the AWS Glue VPC. They have these connections to allow return traffic to reach AWS Glue.
  • Ensure that your S3 endpoint is present in the route table for the AWS Glue VPC.

Step 5: Update the database security groups

Each database’s security group must allow traffic to its listening port (3306, 5432, 5439, and so on) from the AWS Glue VPC for AWS Glue to be able to connect to it. It’s also a good idea to restrict the range of source IP addresses as much as possible.

There are two ways to accomplish this. If your AWS Glue job will be in the same AWS Region as the resource, you can define the source as the security group that you use for AWS Glue. If you are using AWS Glue to connect across AWS Regions, specify the IP range from the private subnet in the AWS Glue VPC instead. In the examples following, we use a security group as the source, because our AWS Glue job and data sources are all in the same AWS Region.

In addition to configuring the database’s security groups, AWS Glue requires a special security group that allows all inbound traffic from itself. Because it isn’t secure to allow traffic from 0.0.0.0/0, we create a self-referencing rule that simply allows all traffic originating from the security group. You can create a new security group for this purpose, or you can modify an existing security group. In the example following, we create a new security group to use later when AWS Glue connections are created.
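As a rough illustration (not part of the original walkthrough), the self-referencing rule can be added with boto3 along the following lines. The security group ID is a placeholder for the group you create for AWS Glue.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Placeholder: the security group created for AWS Glue
glue_sg_id = "sg-0123456789abcdef0"

# Self-referencing rule: allow all inbound traffic whose source is this same security group
ec2.authorize_security_group_ingress(
    GroupId=glue_sg_id,
    IpPermissions=[
        {
            "IpProtocol": "-1",  # all protocols and ports
            "UserIdGroupPairs": [{"GroupId": glue_sg_id}],
        }
    ],
)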

The security group for Amazon RDS for MySQL needs to allow traffic from AWS Glue:

The security group for Amazon RDS for PostgreSQL allows traffic to its listening port from the same source:

The Amazon Redshift security group is configured in the same way:

So is the AWS Glue security group:

Step 6: Set up IAM

Make sure that you have an AWS Glue IAM role with access to Amazon S3. You might want to provide your own policy for access to specific Amazon S3 resources. Data sources require s3:ListBucket and s3:GetObject permissions. Data targets require s3:ListBucket, s3:PutObject, and s3:DeleteObject permissions. For more information on creating an Amazon S3 policy for your resources, see Policies and Permissions in the IAM documentation.

The role should look like this:

Or you can create an S3 policy that’s more restricted to suit your use case.
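For illustration, a more restricted inline policy might be attached to the AWS Glue role with a sketch like the following. The role name, policy name, and bucket names are placeholders, not values used elsewhere in this walkthrough.

import json
import boto3

iam = boto3.client("iam")

# Placeholder buckets: read-only access to the source data, write access to the target
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SourceBucketRead",
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetObject"],
            "Resource": [
                "arn:aws:s3:::my-glue-source-bucket",
                "arn:aws:s3:::my-glue-source-bucket/*",
            ],
        },
        {
            "Sid": "TargetBucketWrite",
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:PutObject", "s3:DeleteObject"],
            "Resource": [
                "arn:aws:s3:::my-glue-target-bucket",
                "arn:aws:s3:::my-glue-target-bucket/*",
            ],
        },
    ],
}

iam.put_role_policy(
    RoleName="MyGlueServiceRole",  # placeholder role name
    PolicyName="GlueS3Access",
    PolicyDocument=json.dumps(policy_document),
)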

Step 7: Set up an AWS Glue connection

The Amazon RDS for MySQL connection in AWS Glue should look like this:

The Amazon Redshift connection should look like this:

The Amazon RDS for PostgreSQL connection should look like this:

Step 8: Set up an AWS Glue job

Key configuration notes:

  1. Create a crawler to import table metadata from the source database (Amazon RDS for MySQL) into the AWS Glue Data Catalog. The scenario includes a database in the catalog named gluedb, to which the crawler adds the sample tables from the source Amazon RDS for MySQL database.
  2. Use either the source connection or destination connection to create a sample job as shown following. (This step is required for the AWS Glue job to establish a network connection and create the necessary elastic network interfaces with the databases’ VPCs and peered connections.)
  3. This scenario uses PySpark code to perform the load operation from Amazon RDS for MySQL to Amazon Redshift. A similar job performs the ingest from Amazon RDS for MySQL to Amazon RDS for PostgreSQL.
  4. After running the job, verify that the table exists in the target database and that the counts match.

The following screenshots show the steps to create a job in the AWS Glue Management Console.

Following are some examples of loading data from source tables to target instances. These are simple one-to-one mappings, with no transformations applied. Notice that the data source and data sink (target) connection configurations access multiple VPCs from a single AWS Glue job.

Sample script 1 (Amazon RDS for MySQL to Amazon Redshift)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# getResolvedOptions populates args with the job parameters referenced below
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")


datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "Redshift", connection_options = {"dbtable": "mysqldb_events", "database": "dmartblog"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

job.commit()

Sample script 2: Amazon RDS for MySQL to Amazon RDS for PostgreSQL (you can also substitute another RDS endpoint)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# getResolvedOptions populates args with the job parameters referenced below
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "vpc-pgsql", connection_options = {"dbtable": "mysqldb_events", "database": "mypgsql"}, transformation_ctx = "datasink")

job.commit()

Summary

In this blog post, you learn how to configure AWS Glue to run in a separate VPC so that it can execute jobs for databases located in multiple VPCs.

The benefits of doing this include the following:

  • A separate VPC and a dedicated pool of resources for running AWS Glue jobs, isolated from your database and compute nodes.
  • Dedicated ETL developer access to a single VPC for better security control and provisioning.

 


Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, and Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He works closely with enterprise customers building big data applications on the AWS platform. He holds a master's degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

Ian Eberhart is a Cloud Support Engineer on the Big Data team for AWS Premium Support. He works with customers on a daily basis to find solutions for moving and sorting their data on the AWS platform. In his spare time, Ian enjoys seeing independent and weird movies, riding his bike, and hiking in the mountains.

 

Chasing earthquakes: How to prepare an unstructured dataset for visualization via ETL processing with Amazon Redshift

Post Syndicated from Ian Funnell original https://aws.amazon.com/blogs/big-data/chasing-earthquakes-how-to-prepare-an-unstructured-dataset-for-visualization-via-etl-processing-with-amazon-redshift/

As organizations expand analytics practices and hire data scientists and other specialized roles, big data pipelines are growing increasingly complex. Sophisticated models are being built using the troves of data being collected every second.

The bottleneck today is often not the know-how of analytical techniques. Rather, it’s the difficulty of building and maintaining ETL (extract, transform, and load) jobs using tools that might be unsuitable for the cloud.

In this post, I demonstrate a solution to this challenge. I start with a noisy semistructured dataset of seismic events, spanning several years and recorded at different locations across the globe. I set out to obtain broad insights about the nature of the rocks forming the Earth’s surface itself—the tectonic plate structure—to be visualized using the mapping capability in Amazon QuickSight.

To accomplish this, I use several AWS services, orchestrated together using Matillion ETL for Amazon Redshift:

Tectonic plate structure context

An earthquake is caused by a buildup of pressure that gets suddenly released. Earthquakes tend to be more severe at the boundaries of destructive tectonic plates. These boundaries are formed when a heavier and denser oceanic plate collides with a lighter continental plate, or when two oceanic plates collide. Due to the difference in density, the oceanic lithosphere is pushed underneath the continental plate, forming what is called a subduction zone. (See the following diagram.) In subduction zones, earthquakes can occur at depths as great as 700 kilometers.

Photography by KDS4444 [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)], from Wikimedia Commons

For our analysis, we ultimately want to visualize the depth of an earthquake focus to locate subduction zones, and therefore find places on earth with the most severe earthquakes.

Seismographic data source

The data source is from the International Federation of Digital Seismograph Networks (FDSN). The event data is in JSON format (from the European Mediterranean Seismological Centre, or EMSC). An external process accumulates files daily into an Amazon S3 bucket, as shown following.

Each individual file contains all the seismic events for one day—usually several hundred—in an embedded array named “features,” as shown in the following example:

{
  "type": "FeatureCollection",
  "metadata": {
    "totalCount": 103
  },
  "features": [
    {
      "geometry": {
        "type": "Point",
        "coordinates": [26.76, 45.77, -140]
      },
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    },
    {
      "geometry": {
        "type": "Point",

Architecture overview

Athena reads and flattens the S3 data and makes it available for Matillion ETL to load into Amazon Redshift via JDBC. Matillion orchestrates this data movement, and it also provides a graphical framework to design and build the more complex data enrichment and aggregation steps to be performed by Amazon Redshift. Finally, the prepared data is queried by Amazon QuickSight for visualization.

Amazon Athena setup

You can use Athena to query data in S3 using standard SQL, via a serverless infrastructure that is managed entirely by AWS on your behalf. Before you can query your data, start by creating an external table. By doing this, you are defining the schema to apply to the data when it is being queried.

You can choose to use an AWS Glue crawler to assist in automatically discovering the schema and format of your source data files.

The following is the CREATE TABLE statement that you can copy and paste into the Athena console to create the schema needed to query the seismic data. Make sure that you substitute the correct S3 path to your seismic data in the LOCATION field of the statement.

CREATE EXTERNAL TABLE `sp_events`(
  `type` string COMMENT 'from deserializer', 
  `metadata` struct<totalcount:int> COMMENT 'from deserializer', 
  `features` array<struct<geometry:struct<type:string,coordinates:array<double>>,type:string,id:string,properties:struct<lastupdate:string,magtype:string,evtype:string,lon:double,auth:string,lat:double,depth:double,unid:string,mag:double,time:string,source_id:string,source_catalog:string,flynn_region:string>>> COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket/SeismicPortal'

After the table is created, you are ready to query it. Athena uses the in-memory, distributed SQL engine called Apache Presto. It provides the ability to unnest arrays, which you use next.

Transfer to Amazon Redshift

The embedded array in every source record gets flattened out and converted to individual records during the JDBC export (download the .jar file) into Amazon Redshift. You use a Matillion ETL Database Query component to assist with the data transfer during this step, as shown in the following image.

This component simplifies ETL by automating the following steps:

  1. Runs the SQL SELECT statement (shown in the following example).
  2. Streams query results across the network from Athena and into temporary storage in S3.
  3. Performs a bulk data load into Amazon Redshift.

Athena executes the following SQL statement:

SELECT f.id,
       f.properties.time AS event_time,
       f.properties.lastupdate,
       f.properties.lon,
       f.properties.lat,
       f.properties.depth,
       f.properties.mag,
       f.properties.flynn_region
FROM "seismic"."sp_events"
CROSS JOIN UNNEST (features) as t(f)

The CROSS JOIN UNNEST syntax flattens the embedded array, generating hundreds of individual event records per day.

Now that the data has been copied and flattened into individual event records (shown in the following image), it’s ready for enrichment and aggregation.

Data enrichment

Earthquakes occur along a continuous range of spatial coordinates. In order to aggregate them, as we’ll be doing very soon, it’s necessary to first group them together. A convenient method is to assign every event into a Universal Transverse Mercator (UTM) zone. These zones are six-degree bands of longitudes that convert the spherical latitude/longitude coordinates into a 2D representation. Performing this conversion provides good granularity for visualization later.
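As a quick illustration of the banding (a simplification that ignores the special zones around Norway and the poles), the zone number alone can be derived from longitude as follows; the full conversion in the UDF below also produces easting and northing values.

def utm_zone_number(lon: float) -> int:
    """Return the UTM zone number (1-60): six-degree longitude bands starting at 180 degrees west."""
    return int((lon + 180.0) // 6.0) + 1

# The Romanian event shown earlier (longitude 26.76) falls in zone 35
print(utm_zone_number(26.76))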

The calculation to convert a spherical longitude/latitude coordinate into a two-dimensional UTM coordinate is complex. It can be performed ideally using an Amazon Redshift user-defined function (UDF). I chose a UDF for the ability to invoke it, via a Matillion component, in the next step.

CREATE OR REPLACE FUNCTION f_ll_utm (Lat float, Long float)
      RETURNS VARCHAR
STABLE
AS $$
from math import pi, sin, cos, tan, sqrt

_deg2rad = pi / 180.0
_rad2deg = 180.0 / pi

_EquatorialRadius = 1
_eccentricitySquared = 2
_ellipsoid = ["WGS-84", 6378137, 0.00669438]

The UDF has to return three pieces of information:

  • UTM Zone code
  • Easting (x-axis measurement in meters)
  • Northing (ditto, for the y-axis)

A scalar UDF can only return a single value. Therefore, the three results are returned as a single pipe-delimited string:

To bring the values out into individual fields, the UDF is first invoked using a Matillion ETL Calculator component, followed by a field splitter and a Calculator to perform data type conversion and rounding.
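For illustration only (this is not the author's UDF code), the packing and unpacking of the pipe-delimited value works roughly as follows; the sample values are hypothetical.

# Inside the UDF, the three computed values are packed into one string,
# because a scalar UDF can return only a single value.
utm_zone_code, easting_m, northing_m = "35T", 636084.0, 5070563.0  # hypothetical values
packed = "{0}|{1}|{2}".format(utm_zone_code, easting_m, northing_m)

# Downstream, the field splitter recovers the three fields, and the final
# Calculator converts the data types and rounds the measurements.
zone, easting, northing = packed.split("|")
easting, northing = round(float(easting)), round(float(northing))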

Data aggregation

To reiterate, we’re interested in the depth of earthquake focus specifically on destructive plate boundaries. Knowing the depth helps us estimate the potential severity of earthquakes.

We need to find the average event depth within each UTM zone, in the expectation that a spatial pattern will appear that will highlight the subduction zones.

The last three steps in the Matillion transformation (shown in the following diagram) perform the necessary aggregation, add a depth quartile, and create an output table from the resulting data.

The "Aggregate to UTM ref" step gets Amazon Redshift to perform a GROUP BY in SQL, which assigns every event to the appropriate UTM zone. While doing this aggregation, you simultaneously do the following:

  • Count the events (which determines the size of the visual representation).
  • Find the average depth (which determines the color of the visual representation).
  • Determine the average latitude and longitude (which approximates to the center of the UTM zone, and determines the position of the visual representation).

The following image shows the aggregation type for each column:

Average depth is a useful measure, but to maximize the visual impact of the final presentation, we also take the opportunity to rank the results into quartiles. This allows the zones with the deepest quartile to stand out consistently on the map.

NTILE(4) OVER (ORDER BY "avg_depth")

Amazon Redshift is great at performing this type of analytics, which is delivered inside another Matillion ETL Calculator component.

The Recreate Output step materializes the dataset into an Amazon Redshift table, ready for Amazon QuickSight to visualize.

Amazon QuickSight visualization

The Amazon QuickSight “points on map” visualization is perfect for this 2D rendering. The values for the field wells come straight from the aggregated data in Amazon Redshift:

  • Geospatial — the average lat/long per UTM grid.
  • Size — the record count, in other words, the number of events in that UTM zone.
  • Color — the Depth Ntile, with the fourth quartile in pink.

The resulting map shows the global subduction zones highlighted clearly in pink, these being the areas with the deepest average earthquake focus.

Recap and summary

In this post, I used seismological data as an example to explore challenges around the visualization of unstructured data and to provide best practices. I suggested a way to overcome these challenges with an architecture that is also applicable for datasets from a wide array of sources, beyond geology. I then explored how to orchestrate activities of different data processing tasks between S3, Athena, and Amazon Redshift using Matillion ETL for Amazon Redshift.

If you’re trying to solve similar analytics challenges and want to see how Amazon Redshift and Matillion can help, launch a 14 day free trial of Matillion ETL for Amazon Redshift on the AWS Marketplace or schedule a demo today. If you have questions or suggestions, please comment below.


Additional Reading

If you found this post helpful, be sure to check out Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift, and Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda.

 


About the Author

Ian Funnell, a lead solution architect at Matillion, is a seasoned cloud and data warehousing specialist. Ian is dedicated to supporting customers in driving their data transformation forward and solving their deepest technical challenges. Ian has more than 25 years of experience in the tech industry.


Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies

Post Syndicated from Ben Snively original https://aws.amazon.com/blogs/big-data/restrict-access-to-your-aws-glue-data-catalog-with-resource-level-iam-permissions-and-resource-based-policies/

A data lake provides a centralized repository that you can use to store all your structured and unstructured data at any scale. A data lake can include both raw datasets and curated, query-optimized datasets. Raw datasets can be quickly ingested, in their original form, without having to force-fit them into a predefined schema. Using data lakes, you can run different types of analytics on both raw and curated datasets. By using Amazon S3 as the storage layer of your data lakes, you can have a set of rich controls at both the bucket and object level. You can use these to define access control policies for the datasets in your lake.

The AWS Glue Data Catalog is a persistent, fully managed metadata store for your data lake on AWS. Using the Glue Data Catalog, you can store, annotate, and share metadata in the AWS Cloud in the same way you do in an Apache Hive Metastore. The Glue Data Catalog also has seamless out-of-the-box integration with Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.

Using AWS Glue, you can also create policies to restrict access to different portions of the catalog based on users, roles, or applied at a resource level. With these policies, you can provide granular control over which users can access the various metadata definitions in your data lake.

Important: The S3 and the AWS Glue Data Catalog policies define the access permissions to the data and metadata definitions respectively. In other words, the AWS Glue Data Catalog policies define the access to the metadata, and the S3 policies define the access to the content itself.

You can restrict which metadata operations can be performed, such as GetDatabases, GetTables, and CreateTable, using identity-based (IAM) policies. You can also restrict which data catalog objects those operations are performed on. Additionally, you can limit which catalog objects get returned in the resulting call. A Glue Data Catalog “object” here refers to a database, a table, a user-defined function, or a connection stored in the Glue Data Catalog.

Suppose that you have users that require read access to your production databases and tables in your data lake, and others have additional permissions to dev resources. Suppose also that you have a data lake storing both raw data feeds and curated datasets used by business intelligence, analytics, and machine learning applications. You can set these configurations easily, and many others, using the access control mechanisms in the AWS Glue Data Catalog.

Note: The following example shows how to set up a policy on the AWS Glue Data Catalog. It doesn’t set up the related S3 bucket or object level policies. This means the metadata isn’t discoverable when using Athena, EMR, and tools integrating with the AWS Glue Data Catalog. At the point when someone tries to access an S3 object directly, S3 policy enforcement is important. You should use Data Catalog and S3 bucket or object level policies together.

Fine-grained access control

You can define the access to the metadata using both resource-based and identity-based policies, depending on your organization’s needs. Resource-based policies list the principals that are allowed or denied access to your resources, allowing you to set up policies such as cross-account access. Identity policies are specifically attached to users, groups, and roles within IAM.

The fine-grained access portion of the policy is defined within the Resource clause. This portion defines both the AWS Glue Data Catalog object that the action can be performed on, and what resulting objects get returned by that operation.

Let’s run through an example. Suppose that you want to define a policy that allows a set of users to access only the finegrainacces database. The policy also allows users to return all the tables listed within the database. For the GetTables actions, the resource definitions include these resource statements:

"arn:aws:glue:us-east-1:123456789012:catalog",
"arn:aws:glue:us-east-1:123456789012:database/finegrainaccess",
"arn:aws:glue:us-east-1:123456789012:table/finegrainaccess/*"

The first resource statement at the database Amazon Resource Name (ARN) allows the user to call the operation on the finegrainaccess database. The second ARN allows all the tables within that database to be returned.

Now, what if we want to return only the tables that started with “dev_” from the “finegrainaccess” database? If so, this is how the policy changes:

"arn:aws:glue:us-east-1:123456789012:catalog",
"arn:aws:glue:us-east-1:123456789012:db/finegrainaccess",           
"arn:aws:glue:us-east-1:123456789012:tables/finegrainaccess/dev_*"  

Now, we are specifying dev_ as part of the table’s ARN in the second resource definition. This approach also works with actions for getting the list of databases, partitions for a table, connections, and other operations in the catalog.
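Putting this together, a minimal identity-based policy statement for the dev_ example might look like the following sketch, written here as a Python dictionary. The account ID, Region, and statement ID are placeholders.

# A sketch of the identity-based policy statement described above (placeholders only)
get_dev_tables_statement = {
    "Sid": "GetDevTablesOnly",
    "Effect": "Allow",
    "Action": ["glue:GetTables"],
    "Resource": [
        "arn:aws:glue:us-east-1:123456789012:catalog",
        "arn:aws:glue:us-east-1:123456789012:database/finegrainaccess",
        "arn:aws:glue:us-east-1:123456789012:table/finegrainaccess/dev_*",
    ],
}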

Taking it for a spin

Note: This post focuses on the policies for AWS Glue Data Catalog. If you look closely, all of these datasets are pointing to the same S3 locations, which are world-readable. In a full example, you should also set the necessary S3 bucket or object level permissions, or both.

Next, we show an example you can do yourself. The next example creates the following in a Data Catalog.

We set up two users in the example, as shown following.

In the AWS Management Console, launch the AWS CloudFormation template.

Choose Next.

Important: Enter a password for the IAM users to be created. These users will have permissions to run Athena queries, access to your Athena S3 results bucket, and see the AWS Glue databases and tables that the CloudFormation script creates. These permissions must match the minimum requirements of the IAM password policy on the account that you run this example from.

Choose Next, and then on the next page choose Next again.

Lastly, acknowledge that the template will create IAM users and policies.

Then choose Create.

When you refresh your CloudFormation page, you can see the script creating the example resources.

Wait until it’s complete.

This script creates the necessary IAM users and the policies attached to them, along with the necessary databases and tables listed preceding.

After the CloudFormation script completes, you should see these tables if using an administrator user.

If you look on the Outputs tab, you can see the two IAM users that were created along with your IAM sign-in URL.

Note:

If you click the sign-in link in the same browser, the system logs you out. A nice trick is to right-click and open a private or incognito window.

If the provided IAM password doesn’t meet the minimum requirements, you see this message in the CloudFormation script event log:

The specified password is invalid … <why it was invalid>

Looking in the AWS Glue Data Catalog, you can see the tables that just got created by the script.

We can see the script created the structure that we outlined preceding.

Let’s check the two user profiles. If you go into IAM and users, they are set as inline policies. You should see the following for each user.

For the AWS Glue dev user, this section gives us full access to anything in the dev databases:

This section gives us the ability to query and see the prod database:

Lastly, this section gives us access to get tables and partitions from the prod database. You can structure this section so that it explicitly lists the blog_prod database in the resource and only allows that. The following lets someone query for database/* and return only the blog_prod tables. This, in fact, is the default behavior of the console.

Without this, you could still query those two databases explicitly, but the policy would not allow a wildcard query such as the following.

In contrast, the QA user doesn't have access to the dev database and can only see the tables that start with prod_ in the prod database. So the following is what the QA user's policy looks like.

The query for the prod database is as follows:

Only GetTables and GetPartitions are available for the tables starting with prod_.

Notice the “prod_*” in the resource definition following.

Querying based on the different users

Logging in as the two different IAM users created by the AWS CloudFormation output tab and the password you provided, you can see some differences.

Notice that the QA user can’t see any metadata definitions for the blog_dev database, or the staging_yellow table in the blog_prod database.

Next, sign in as blog_dev_user and go to the Athena console. Notice that the blog user only sees the databases and tables listed that this user is permitted to.

The dev user can create a table under blog_dev, but not the blog_prod database.

Now let’s look under dev_qa_user. Notice that we only see the blog_prod and prod_* tables in Athena.

The QA user can query the datasets that user can see, but the policy doesn’t let that user create a database or tables.

If the QA user tries to query through Athena, and manually pull the metadata outside the console, that user can’t see any of the information. You can test this by running the following.

select * from blog_dev.yellow limit 10;

Conclusion

Data cataloging is an important part of many analytical systems. The AWS Glue Data Catalog provides integration with a wide number of tools. Using the Data Catalog, you also can specify a policy that grants permissions to objects in the Data Catalog. Data lakes require detailed access control at both the content level and the level of the metadata describing the content. In this example, we show how you can define the access policies for the metadata in the catalog.


Additional Reading

Learn how to harmonize, search, and analyze loosely coupled datasets on AWS.

 


About the Author

Ben Snively is a Public Sector Specialist Solutions Architect. He works with government, non-profit and education customers on big data and analytical projects, helping them build solutions using AWS. In his spare time, he adds IoT sensors throughout his house and runs analytics on it.


Store, Protect, Optimize Your Healthcare Data with AWS: Part 2

Post Syndicated from Stephen Jepsen original https://aws.amazon.com/blogs/architecture/store-protect-optimize-your-healthcare-data-with-aws-part-2/

Leveraging Analytics and Machine Learning Tools for Readmissions Prediction

This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.

In Part 1, we looked at various options to ingest and store sensitive healthcare data using AWS. The post described our shared responsibility model and provided a reference architecture that healthcare organizations could use as a foundation to build a robust platform on AWS to store and protect their sensitive data, including protected health information (PHI). In Part 2, we will dive deeper into how customers can optimize their healthcare datasets for analytics and machine learning (ML) to address clinical and operational challenges.

There are a number of factors creating pressures for healthcare organizations, both providers and payers, to adopt analytic tools to better understand their data: regulatory requirements, changing reimbursement models from volume- to value-based care, population health management for risk-bearing organizations, and movement toward personalized medicine. As organizations deploy new solutions to address these areas, the availability of large and complex datasets from electronic health records, genomics, images (for example, CAT, PET, MRI, ultrasound, X-ray), and IoT has been increasing. With these data assets growing in size, healthcare organizations want to leverage analytic and ML tools to derive new actionable insights across their departments.

One example of the use of ML in healthcare is diagnostic image analysis, including digital pathology. Pathology is extremely important in diagnosing and treating patients, but it is also extremely time-consuming and largely a manual process. While the complexity and quantity of workloads are increasing, the number of pathologists is decreasing. According to one study, the number of active pathologists could drop by 30 percent by 2030 compared to 2010 levels. (1) A cloud architecture and solution can automate part of the workflow, including sample management, analysis, storing, sharing, and comparison with previous samples to complement existing provider workflows effectively. A recent study using deep learning to analyze metastatic breast cancer tissue samples resulted in an approximately 85% reduction in human error rate. (2)

ML is also being used to assist radiologists in examining other diagnostic images such as X-rays, MRIs, and CAT scans. Having large quantities of images and metadata to train the algorithms that are the key to ML is one of the main challenges for ML adoption. To help address this problem, the National Institutes of Health recently released 90,000 X-ray plates tagged either with one of 14 diseases or tagged as being normal. Leading academic medical centers are using these images to build their neural networks and train their algorithms. With advanced analytics and ML, we can answer the hard questions such as “what is the next best action for my patient, the expected outcome, and the cost.”

The foundations for a great analytical layer

Let’s pick up from where we left off in Part 1. We have seen how providers can ingest data into AWS from their data centers and store it securely into different services depending on the type of data. For example:

  1. All object data is stored in Amazon S3, Amazon S3 Infrequent Access, or Amazon Glacier depending on how often they are used.
  2. Data from the provider’s database is either processed and stored as objects in Amazon S3 or aggregated into data marts on Amazon Redshift.
  3. Metadata of the objects on Amazon S3 is maintained in an Amazon DynamoDB table.
  4. Amazon Athena is used to query the objects directly stored on Amazon S3 to address ad hoc requirements.

We will now look at two best practices that are key to building a robust analytical layer using these datasets.

  1. Separating storage and compute: You should not be compelled to scale compute resources just to store more data. The scaling rules of the two layers should be separate.
  2. Leverage the vast array of AWS big data services when it comes to building the analytical platforms instead of concentrating on just a few of them. Remember, one size does not fit all.

Technical overview

In this overview, we will demonstrate how we can leverage AWS big data and ML services to build a scalable analytical layer for our healthcare data. We will use a single source of data stored in Amazon S3 for performing ad hoc analysis using Amazon Athena, integrate it with a data warehouse on Amazon Redshift, build a visual dashboard for some metrics using Amazon QuickSight, and finally build an ML model to predict readmissions using Amazon SageMaker. By not moving the data around and instead connecting to it using different services, we avoid building redundant copies of the same data. There are multiple advantages to this approach:

  1. We optimize our storage. Not having redundant copies reduces the amount of storage required.
  2. We keep the data secure with only authorized services having access to it. Keeping multiple copies of the data can result in higher security risk.
  3. We are able to scale the storage and compute separately as needed.
  4. It becomes easier to manage the data and monitor usage metrics centrally such as how often the data has been accessed, who has been accessing it, and what has been the growth pattern of the data over a period of time. These metrics can be difficult to aggregate if the data is duplicated multiple times.

Let’s build out this architecture using the following steps:

  1. Create a database in AWS Glue Data Catalog

We will do this using a Glue crawler. First create a JSON file that contains the parameters for the Glue crawler.

{
  "Name": "readmissions",
  "Role": "arn of the role for Glue",
  "DatabaseName": "readmissions",
  "Description": "glue data catalog for storing readmission data",
  "Targets": {
    "S3Targets": [
      {
        "Path": "s3://<bucket>/<prefix>"
      },
      {
        "Path": "s3://<bucket>/<prefix>"
      }
    ]
  }
}

As you can see, the crawler will crawl two locations in Amazon S3 and save the resulting tables in a new database called “readmissions.” Replace the role ARN and Amazon S3 locations with your corresponding details. Save this in a file create_crawler.json. Then from the AWS CLI, call the following command to create the crawler:

aws glue create-crawler --cli-input-json file://create_crawler.json

Once the crawler is created, run it by calling the following command:

aws glue start-crawler --name readmissions

Log on to the AWS Glue console, navigate to the crawlers, and wait until the crawler completes running.

This will create two tables — phi and non-phi — in a database named “readmissions” in the AWS Glue Data Catalog as shown below.

  2. Query the data using Athena

The AWS Glue Data Catalog is seamlessly integrated with Amazon Athena. For details on how to enable this integration, see Integration with AWS Glue.

As a result of this integration, the tables created using the Glue crawler can now be queried using Amazon Athena. Amazon Athena allows you to do ad hoc analysis on the dataset. You can do exploratory analysis on the data and also determine its structure and quality. This type of upfront ad hoc analysis is invaluable for ensuring the data quality in your downstream data warehouse or your ML algorithms that will make use of this data for training models. In the next few sections, we will explore these aspects in greater detail.

To query the data using Amazon Athena, navigate to the Amazon Athena console.

NOTE: Make sure the region is the same as the region you chose in the previous step. If it’s not the same, switch the region by using the drop-down menu on the top right-hand corner of the screen.

Once you arrive in the Amazon Athena console, you should already see the tables and databases you created previously, and you should be able to see the data in the two tables by writing Amazon Athena queries. Here is a list of the top 10 rows from the table readmissions.nonphi:

Now that we are able to query the dataset, we can run some queries for exploratory analysis. Here are just a few examples:

Analysis: How many patients have been discharged to home?
Query:    SELECT count(*) from nonphi where discharge_disposition = 'Discharged to home'

Analysis: What's the minimum and the maximum number of procedures carried out on a patient?
Query:    SELECT min(num_procedures), max(num_procedures) from nonphi

Analysis: How many patients were referred to this hospital by another physician?
Query:    SELECT count(*) FROM nonphi group by admission_source having admission_source = 'Physician Referral'

Analysis: What were the top 5 specialties with positive readmissions?
Query:    SELECT count(readmission_result) as num_readmissions, medical_specialty from
          (select readmission_result, medical_specialty from nonphi where readmission_result = 'Yes')
          group by medical_specialty order by num_readmissions desc limit 5

Analysis: Which payer was responsible for paying for treatments that involved more than 5 procedures?
Query:    SELECT distinct payer_code from nonphi where num_procedures > 5 and payer_code != '(null)'

While this information is valuable, you typically do not want to invest too much time and effort into building an ad hoc query platform like this because at this stage, you are not even sure if the data is of any value for your business-critical analytical applications. One benefit of using Amazon Athena for ad hoc analysis is that it requires little effort or time. It uses Schema-On-Read instead of schema on write, allowing you to work with various source data formats without worrying about the underlying structures. You can put the data on Amazon S3 and start querying immediately.
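If you prefer to run these exploratory queries programmatically rather than in the console, a boto3 sketch like the following works against the same catalog; the results bucket is a placeholder.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Placeholder output location; Athena writes query results to S3
response = athena.start_query_execution(
    QueryString="SELECT count(*) FROM nonphi WHERE readmission_result = 'Yes'",
    QueryExecutionContext={"Database": "readmissions"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},
)
print(response["QueryExecutionId"])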

  3. Create an external table in Amazon Redshift Spectrum with the same data

Now that we are satisfied with the data quality and understand the structure of the data, we would like to integrate this with a data warehouse. We’ll use Amazon Redshift Spectrum to create external tables on the files in S3 and then integrate these external tables with a physical table in Amazon Redshift.

Amazon Redshift Spectrum allows you to run Amazon Redshift SQL queries against data on Amazon S3, extending the capabilities of your data warehouse beyond the physical Amazon Redshift clusters. You don’t need to do any elaborate ETL or move the data around. The data exists in one place in Amazon S3 and you interface with it using different services (Athena and Redshift Spectrum) to satisfy different requirements.

Before beginning, please look at this step by step guide to set up Redshift Spectrum.

After you have set up Amazon Redshift Spectrum, you can begin executing the steps below:

  1. Create an external schema called “readmissions.” Amazon Redshift Spectrum integrates with the AWS Glue Data Catalog and allows you to create Spectrum tables by referring to the catalog. This feature allows you to build the external table on the same data that you analyzed with Amazon Athena in the previous step without the need for ETL. This can be achieved by the following:
create external schema readmissions
from data catalog
database 'readmissions'
iam_role 'arn for your redshift spectrum role'
region 'region where the S3 data exists';

NOTE: Make sure you select the appropriate role arn and region.

  2. Once the command executes successfully, you can confirm the schema was created by running the following:
select * from svv_external_schemas;

You should see a row similar to the one above with your corresponding region and role.

You can also see the external tables that were created by running the following command:

select * from SVV_EXTERNAL_TABLES;

  3. Let's confirm we can see all the rows in the external table by counting the number of rows:
select count(*) from readmissions.phi;
select count(*) from readmissions.nonphi;

You should see 101,766 rows in both the tables, confirming that your external tables contain all the records that you read using the AWS Glue data crawler and analyzed using Athena.

  4. Now that we have all the external tables created, let's create an aggregate fact table in the physical Redshift data warehouse. We can use the "As Select" clause of the Redshift create table query to do this:
create table readmissions_aggregate_fact as
select
readmission_result,admission_type,discharge_disposition,diabetesmed,
avg(time_in_hospital) as avg_time_in_hospital,
min(num_procedures) as min_procedures,
max(num_procedures) as max_procedures,
avg(num_procedures) as avg_num_procedures,
avg(num_medications) as avg_num_medications,
avg(number_outpatient) as avg_number_outpatient,
avg(number_emergency) as avg_number_emergency,
avg(number_inpatient) as avg_number_inpatient,
avg(number_diagnoses) as avg_number_diagnoses
from readmissions.nonphi
group by readmission_result,admission_type,discharge_disposition,diabetesmed

Once this query executes successfully, you can see a new table created in the physical public schema of your Amazon Redshift cluster. You can confirm this by executing the following query:

select distinct(tablename) from pg_table_def where schemaname = 'public'

  4. Build a QuickSight Dashboard from the aggregate fact

We can now create dashboards to visualize the data in our readmissions aggregate fact table using Amazon QuickSight. Here are some examples of reports you can generate using Amazon QuickSight on the readmission data.

For more details on Amazon QuickSight, refer to the service documentation.

  5. Build an ML model in Amazon SageMaker to predict readmissions

As a final step, we will create an ML model to predict the attribute readmission_result, which denotes whether a patient was readmitted, using the non-PHI dataset.

  1. Create a notebook instance in Amazon SageMaker that is used to develop our code.
  2. The code reads non-PHI data from the Amazon S3 bucket as a data frame in Python. This is achieved using the pandas.read_csv function.

  3. Use the pandas.get_dummies function to encode categorical values into numeric values for use with the model.

  4. Split the data into two, 80% for training and 20% for testing, using the numpy.random.rand function.

  5. Form train_X, train_y and test_X, test_y corresponding to training features, training labels, testing features, and testing labels respectively.

  6. Use the Amazon SageMaker Linear Learner algorithm to train our model. The implementation of the algorithm uses dense tensor format to optimize the training job. Use the function write_numpy_to_dense_tensor from the Amazon SageMaker library to convert the numpy array into the dense tensor format.

  7. Create the training job in Amazon SageMaker with appropriate configurations and run it.

  8. Once the training job completes, create an endpoint in Amazon SageMaker to host our model, using the linear.deploy function to deploy the endpoint.

  9. Finally, run a prediction by invoking the endpoint using the linear_predictor.predict function.

You can view the complete notebook here.
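The complete notebook contains the working code; the following is only a rough sketch of the data preparation in steps 2 through 5 above. The S3 path is a placeholder, reading it directly assumes the s3fs package is installed, and the label column name produced by get_dummies is an assumption.

import numpy as np
import pandas as pd

# Placeholder path to the non-PHI dataset
df = pd.read_csv("s3://my-healthcare-data-lake/nonphi/readmissions.csv")

# Encode categorical values into numeric indicator columns
df = pd.get_dummies(df)

# 80/20 train/test split using a random mask
mask = np.random.rand(len(df)) < 0.8
train, test = df[mask], df[~mask]

# Assumed label column name after one-hot encoding; drop all label-derived columns from the features
label_cols = [c for c in df.columns if c.startswith("readmission_result")]
train_y = train["readmission_result_Yes"].values.astype("float32")
train_X = train.drop(columns=label_cols).values.astype("float32")
test_y = test["readmission_result_Yes"].values.astype("float32")
test_X = test.drop(columns=label_cols).values.astype("float32")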

Data, analytics, and ML are strategic assets to help you manage your patients, staff, equipment, and supplies more efficiently. These technologies can also help you be more proactive in treating and preventing disease. Industry luminaries share this opinion: “By leveraging big data and scientific advancements while maintaining the important doctor-patient bond, we believe we can create a health system that will go beyond curing disease after the fact to preventing disease before it strikes by focusing on health and wellness,” writes Lloyd B. Minor, MD, dean of the Stanford School of Medicine.

ML and analytics offer huge value in helping achieve the quadruple aim: improved patient satisfaction, improved population health, improved provider satisfaction, and reduced costs. Technology should never replace the clinician but instead become an extension of the clinician and allow them to be more efficient by removing some of the mundane, repetitive tasks involved in prevention, diagnostics, and treatment of patients.

(1) “The Digital Future of Pathology.” The Medical Futurist, 28 May 2018, medicalfuturist.com/digital-future-pathology.

(2) Wang, Dayong, et al. “Deep Learning for Identifying Metastatic Breast Cancer.” Deep Learning for Identifying Metastatic Breast Cancer, 18 June 2016, arxiv.org/abs/1606.05718.

About the Author

Stephen Jepsen is a Global HCLS Practice Manager in AWS Professional Services.

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can store heart rate data in an Amazon S3 bucket using Kinesis Data Streams. However, storing data in a repository is not enough. You also need to be able to catalog and store the associated metadata related to your repository so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. As you get your AWS Glue Data Catalog data partitioned and compressed for optimal performance, you can use Amazon Athena for the direct query to S3 data. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans through the raw data available in an S3 bucket and creates a data table with a Data Catalog. You can add a scheduler to the crawler to run periodically and scan new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawler in the navigation bar. Select the crawler that you created, and choose Run crawler.

The crawler process can take 20–60 seconds to initiate. It depends on the Data Catalog, and it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the time stamp as value_time, the person's ID as id, and the heart rate as colvalue. These attributes are identified and listed by the AWS Glue crawler. You can see other information such as the data format (text) and the record count (approx. 15,000, with each record 61 bytes in size).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown following:

As noted, the data is currently in a JSON format and we haven’t partitioned it. This means that Athena continues to scan more data, which increases the query cost. The best practice is to always partition data and to convert the data into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scans while running a query. Having fewer data scans means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.
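The generated script will vary with your job settings, but the core of the conversion is a write similar to the sketch below. The catalog database, table name, destination path, and partition key are placeholders, and glueContext is assumed to be initialized by the standard AWS Glue job boilerplate.

# Sketch of the Parquet conversion step in a Glue ETL script (placeholder names)
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="heartbeat_db", table_name="raw_heartbeat", transformation_ctx="datasource"
)

glueContext.write_dynamic_frame.from_options(
    frame=datasource,
    connection_type="s3",
    connection_options={"path": "s3://my-processed-bucket/heartbeat/", "partitionKeys": ["id"]},
    format="parquet",
    transformation_ctx="datasink",
)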

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.
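For example, a scheduled trigger that runs the conversion job hourly can be created with a boto3 call along these lines; the trigger and job names are placeholders.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Placeholder names; runs the Parquet conversion job at the top of every hour
glue.create_trigger(
    Name="hourly-heartbeat-etl-trigger",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",
    Actions=[{"JobName": "heartbeat-parquet-job"}],
    StartOnCreation=True,
)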

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, you will find the AWS Glue catalog already available in the Athena console, along with the table definitions. Fetch 10 rows from the new Parquet table in Athena, as you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. This same Athena query scanned only 4.99 KB of data compared to 205 KB of data that was scanned in a raw format. Also, there was a significant improvement in query performance in terms of run time.

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can create a visual dashboard easily. The following figure shows graphs displaying the occurrence of heart rate records per device. The first graph is a horizontally stacked bar chart, which shows the percentage of heart rate occurrences per device. In the second graph, you can visualize the heart rate count grouped by device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in such a way to create actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue, and 10 visualizations to try in Amazon QuickSight with sample data.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

 

 

 

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

 

 

 

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

 

 

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

 

 

 

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

 

 

 

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

 

 

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related minor traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting community. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and in most cases, are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of minor traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injuries (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement device and a smartphone-based assessment tool would streamline the process of identifying potential head injuries, assessing them, and applying return-to-play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system for measuring head trauma within the sports environment, subject to the absence of trained medical personnel, is required.

Given the proliferation of smartphones, we felt that this was a practical device to investigate for providing this type of monitoring. All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges. For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method because it is simple, based on a widely accepted standard, and compatible with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This approach can be extended to many other use cases across myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages out the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heart rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format into the compressed Apache Parquet columnar format and partitions the transformed data for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The IoT device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. This device uses the AWS IoT Device SDK for Python, including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. As such, you need to add a single subscription in the Greengrass group to facilitate this message route:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.
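If you prefer to script this subscription rather than create it in the console, the following is a minimal Python (boto3) sketch; the definition name and thing ARN are placeholders, and in practice the subscription definition must still be attached to a Greengrass group version and deployed.

import boto3

greengrass = boto3.client("greengrass")

# Route messages that the Heartrate_Sensor device publishes on iot/heartrate to AWS IoT Core.
greengrass.create_subscription_definition(
    Name="heartrate-subscriptions",  # hypothetical definition name
    InitialVersion={
        "Subscriptions": [
            {
                "Id": "heartrate-to-cloud",
                "Source": "arn:aws:iot:us-east-1:123456789012:thing/Heartrate_Sensor",  # placeholder thing ARN
                "Subject": "iot/heartrate",
                "Target": "cloud",  # "cloud" targets the AWS IoT Core service
            }
        ]
    },
)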

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that can trigger a reaction if the detected heart rate is reaching a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis.
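You can create the rule in the AWS IoT console or script it. The following is a hedged Python (boto3) sketch; the rule name, stream name, and IAM role ARN are placeholders for illustration.

import boto3

iot = boto3.client("iot")

# Forward every message published on iot/heartrate to the Kinesis data stream.
iot.create_topic_rule(
    ruleName="heartrate_to_kinesis",  # hypothetical rule name
    topicRulePayload={
        "sql": "SELECT * FROM 'iot/heartrate'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kinesis": {
                    "streamName": "heartrate_stream",
                    "partitionKey": "${topic()}",  # any expression that spreads records across shards
                    "roleArn": "arn:aws:iam::123456789012:role/iot-to-kinesis",  # placeholder role with kinesis:PutRecord
                }
            }
        ],
    },
)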

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script kinesis_client_HeartRate.sh, and copy the provided code to start writing some records into the Kinesis data stream. Be sure to create your Kinesis data stream and replace the variable <your_stream_name> in the following script.

#!/bin/bash
# Continuously write simulated "deviceID,heartRate" records to the Kinesis data stream.
# Replace <your_stream_name> with your stream name. With AWS CLI v2, also add
# --cli-binary-format raw-in-base64-out so the record data is treated as raw text.
while true
do
  deviceID=$(( ( RANDOM % 10 ) + 1 ))      # device IDs 1-10
  heartRate=$(( ( RANDOM % 81 ) + 60 ))    # heart rates 60-140 BPM (bash RANDOM; jot is not installed by default on Amazon Linux)
  echo "$deviceID,$heartRate"
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' --partition-key "$deviceID" --region us-east-1
done

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
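For example, the following is a minimal Python (boto3) sketch of scaling the stream with the UpdateShardCount API; the stream name and target count are illustrative.

import boto3

kinesis = boto3.client("kinesis")

# Double the write capacity to 2 MB of data or 2,000 records per second.
kinesis.update_shard_count(
    StreamName="heartrate_stream",      # placeholder stream name
    TargetShardCount=2,
    ScalingType="UNIFORM_SCALING",      # uniform scaling splits or merges shards evenly
)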

You can configure your data stream by using the following AWS CLI command (and then enable server-side encryption on the stream, for example with the StartStreamEncryption API).

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching an AWS CloudFormation template, be sure to enter your email address or mobile phone number with the appropriate endpoint protocol (“Email” or “SMS”) as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Kinesis Data Analytics to perform analytics on streaming data in real time. Kinesis Data Analytics consists of three configurable components: source, real-time analytics, and destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. In the source configuration process, if filtering or masking of records is required, you can preprocess records using AWS Lambda. The data in this particular case is relatively simple, so you don’t need to preprocess the records.

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we transformed the second column to Value instead of COL_Value.

The SQL code to perform the real-time analysis of the data has to be copied to the SQL Editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    VALUEROWTIME TIMESTAMP,
    ID INTEGER,
    COLVALUE INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ROWTIME,
              ID,
              AVG("Value") AS HEARTRATE
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ID,
         STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code generates DESTINATION_SQL_STREAM. It inserts values into the stream only when the average heart rate received from SOURCE_SQL_STREAM_001 over the 60-second tumbling window is greater than 120 or less than 40.

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the stream name selected is the DESTINATION_SQL_STREAM. You only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heart rate registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, email, firstname, lastname, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.

The Lambda function is created to process the record passed from the Kinesis Data Analytics application. The Node.js Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a cellular text message via the Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account limit for Amazon SNS for mobile messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

You now create a new Lambda function with a runtime of Node.js 6.10 and choose the Create a custom role option for IAM permissions.  If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.
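The function in this post is written in Node.js; the following is a hedged Python sketch of the same flow, assuming the table name, output field names, and message text shown here. Kinesis Data Analytics delivers each record base64-encoded and expects a per-record delivery status in the response.

import base64
import json

import boto3

dynamodb = boto3.client("dynamodb")
sns = boto3.client("sns")

def handler(event, context):
    results = []
    for record in event["records"]:
        # The Kinesis Data Analytics destination delivers base64-encoded JSON.
        payload = json.loads(base64.b64decode(record["data"]))
        device_id = str(payload["ID"])        # assumed field names from the output stream
        avg_hr = payload["COLVALUE"]

        # Look up the athlete and athletic trainer registered for this device (hypothetical table name).
        item = dynamodb.get_item(
            TableName="heartrate_registrations",
            Key={"deviceid": {"S": device_id}},
        ).get("Item")

        if item:
            message = (
                f"Abnormal average heart rate of {avg_hr} BPM detected for "
                f"{item['firstname']['S']} {item['lastname']['S']}."
            )
            sns.publish(PhoneNumber=item["mobile"]["S"], Message=message)

        results.append({"recordId": record["recordId"], "result": "Ok"})

    return {"records": results}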

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage.  To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data in intervals to the destination S3 bucket. The intervals are defined using an S3 buffer size and an S3 buffer interval; delivery occurs as soon as either threshold is reached. The data in the Data Firehose delivery stream can be transformed, and you can also back up the source records before applying any transformation. The data can be encrypted and compressed with GZIP, ZIP, or Snappy, or converted to a columnar format such as Apache Parquet or Apache ORC, which improves query performance and reduces the storage footprint. You should enable error logging for operational and production troubleshooting.
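For reference, here is a hedged Python (boto3) sketch of creating such a delivery stream that reads from the Kinesis data stream and writes GZIP-compressed objects to S3. All ARNs, names, and buffer values are placeholders; in this post, the stack is deployed with the provided AWS CloudFormation template.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="heartrate-raw-to-s3",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/heartrate_stream",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-read-kinesis",       # placeholder roles
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-write-s3",
        "BucketARN": "arn:aws:s3:::heartrate-raw-zone",                          # placeholder bucket
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 300},           # deliver on size or time, whichever comes first
        "CompressionFormat": "GZIP",
        "CloudWatchLoggingOptions": {                                            # error logging for troubleshooting
            "Enabled": True,
            "LogGroupName": "/aws/kinesisfirehose/heartrate-raw-to-s3",
            "LogStreamName": "S3Delivery",
        },
    },
)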

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional resources

Langlois, J.A., Rutland-Brown, W. & Wald, M., “The epidemiology and impact of traumatic brain injury: a brief overview,” Journal of Head Trauma Rehabilitation, Vol. 21, No. 5, 2006, pp. 375-378.

Echlin, S. E., Tator, C. H., Cusimano, M. D., Cantu, R. C., Taunton, J. E., Upshur E. G., Hall, C. R., Johnson, A. M., Forwell, L. A., Skopelja, E. N., “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates,” Neurosurg Focus, 29 (5):E4, 2010

Daniel, R. W., Rowson, S., Duma, S. M., “Head Impact Exposure in Youth Football,” Annals of Biomedical Engineering., Vol. 10, 2012, 1007.

Greenwald, R. M., Gwin, J. T., Chu, J. J., Crisco, J. J., “Head impact severity measures for evaluating mild traumatic brain injury risk exposure,” Neurosurgery Vol. 62, 2008, pp. 789–79


Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core, and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

 

 

 

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

 

 

 

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team.  His passion is leveraging the cloud to transform the carrier industry.  He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University.  As such, he has no spare money and no spare time to work a second job.

 

 

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

 

 

 

Josh Ragsdale is an enterprise solutions architect at AWS.  His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

 

 

 

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

 

 

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

 

How Pagely implemented a serverless data lake in AWS to facilitate customer support analytics

Post Syndicated from Joshua Eichorn original https://aws.amazon.com/blogs/big-data/how-pagely-implemented-a-serverless-data-lake-in-aws-to-facilitate-customer-support-analytics/

Pagely is an AWS Advanced Technology Partner providing managed WordPress hosting services. Our customers continuously push us to improve visibility into usage, billing, and service performance. To better serve these customers, the service team requires an efficient way to access the logs created by the application servers.

Historically, we relied on a shell script that gathered basic statistics on demand. When processing the logs for our largest customer, it took more than 8 hours to produce one report using an unoptimized process running on an Amazon EC2 instance—sometimes crashing due to resource limitations. Instead of putting more effort into fixing a legacy process, we decided it was time to implement a proper analytics platform.

All of our customer logs are stored in Amazon S3 as compressed JSON files. We use Amazon Athena to run SQL queries directly against these logs. This approach is great because there is no need for us to prepare the data. We simply define the table and query away. Although JSON is a supported format for Amazon Athena, it is not the most efficient format with regard to performance and cost. JSON files must be read in their entirety, even if you are only returning one or two fields from each row of data, resulting in scanning more data than is required. Additionally, the inefficiencies of processing JSON cause longer query times.

Querying the logs of our largest customers was not ideal with Athena, as we ran into the 30-minute query timeout limit. This limit can be increased, but the query was already taking longer than we wanted.

In this post, we discuss how Pagely worked with Beyondsoft, an AWS Advanced Consulting Partner, to use ConvergDB, an open-source tool developed by Beyondsoft, to build a DevOps-centric data pipeline. This pipeline uses AWS Glue to transform application logs into optimized tables that can be queried quickly and cost effectively using Amazon Athena.

Engaging Beyondsoft

We knew that we needed to do something to make our data easily accessible to our engineers with as little overhead as possible. We wanted to get the data into a more optimal file format to reduce query times. Being a lean shop, we didn’t have the bandwidth to dive into the technologies. To bridge the gap, we engaged Beyondsoft to determine the best solution to optimize and better manage our data lake.

What is ConvergDB?

ConvergDB is open-source software for the creation and management of data lakes. Users define the structure of the source and target tables and map them to concrete cloud resources. Then ConvergDB creates all of the scripts used to build and manage the necessary resources in AWS. The scripts created by ConvergDB are deployed through the use of HashiCorp Terraform, an open-source tool for managing infrastructure as code.

With ConvergDB, we can define our data lake with metadata to drive the table-creation and ETL (extract, transform, and load) processes. The schema files are used to define tables, including field-level SQL expressions that are used to transform the incoming data as it is being loaded. These expressions are used to derive calculated fields, in addition to the fields that are used for data partitioning.

After the schema is defined, the deployment file allows us to place the tables into an ETL job that is used to manage them. The deployment file also specifies the ETL job schedule and optional fields such as the target S3 bucket and the number of AWS Glue DPUs to use at runtime.

ConvergDB is a command line binary and does not need to be installed on a server. All of the artifacts are files that can be managed using source control. The ConvergDB binary takes in all the configuration files and then outputs a Terraform configuration containing all the artifacts necessary to deploy the data lake. These artifacts can be ETL scripts, table and database definitions, and IAM policies necessary to run the jobs. They can also include Amazon SNS notification topics, and even an Amazon CloudWatch dashboard showing the volume of data processed by ConvergDB ETL jobs.

Speed bumps

No implementation goes perfectly. In the following sections, Jeremy Winters, a Beyondsoft engineer, explains the problems we ran into and how they were addressed.

Partitioning and columnar formats

Two of the key best practices for structuring data for SQL analytic queries in Amazon S3 are partitioning and columnar file structure.

Partitioning is the process of splitting data into different prefixes or folders on S3 with a naming convention that’s most suitable to efficient retrieval of data. This allows Athena to skip over data that is not relevant to the particular query that is being executed.

Apache Parquet is a columnar format popular with tools in the Hadoop ecosystem. Parquet stores the columns of the data in separate, contiguous regions in the file. Directed by metadata footers, tools like Athena read only the sections of the file that are needed to fulfill the query. This process helps eliminate a large portion of I/O and network data transfer.

Reducing I/O through partitioning and Parquet files not only increases query performance, but it dramatically reduces the cost of using Athena.

For more information about best practices for data lake optimization, see the blog post Top 10 Performance Tuning Tips for Amazon Athena.

Small files problem

A classic issue encountered with Hadoop ecosystem tools is known as the “small files problem.” Processing a large number of small files creates a lot of overhead for the system, causing job execution times to skyrocket and potentially fail. Pagely had approximately 4 TB of history across 30 million files. Of these files, 29.5 million represented only 1.2 TB of the data in S3.

To analyze this issue, we enabled S3 inventory reporting on the source data bucket. The report is delivered daily in an ORC (optimized row columnar) format. From there, it is very easy to create an Athena table to analyze the bucket contents using SQL.

We used Athena to identify S3 prefixes that were “hot spots,” that is, those having a large number of small files. We identified 14,000 prefixes with less than 1 GB of data that we could consolidate. So… 29.5 million files consolidated into 14,000 files.

The following query is a way to identify small-file hot spots. The GROUP BY expression can be suited to your data. The example shows a way of grouping by the first “folder” in the bucket.

SELECT
  -- we are looking at the first string in a / delimited path
  -- if the key is path/2017-10-10.json it will group on path_to_data
  split_part(key,'/',1) AS prefix
  
  -- calculate the total size in mb for all files in prefix
  ,SUM(size)/CAST(1024*1024 AS DOUBLE) AS mb
  
  -- count of objects in the prefix
  ,COUNT(*) AS object_count
FROM
  pagely_gateway_logs
WHERE
  -- assumes that versioning is disabled
  -- you should use the latest date after
  -- refreshing all partitions
  dt = '2018-03-28-08-00'
GROUP BY
  1
HAVING
  -- only return prefixes with a total size of less than 1 gb
  -- and a file count greater than 8
  SUM(size)/CAST(1024*1024 AS DOUBLE) < 1024
  AND
  COUNT(*) >= 8

 

The results show prefixes in your object paths that can and should be consolidated. Anything less than 1 GB with eight or more files can then be consolidated into a single object, replacing the originals.

After the hot spots are identified, the small files must be consolidated. The results in the preceding image show that files with a prefix of 14184 total 33.7 MB spread across 502 files. To reduce the overhead of this many small files, we combine all of the files into a single file. Our files use gzip compression, which allows for combining them through simple concatenation, as opposed to decompressing, concatenating the raw JSON data, and then recompressing. There are many ways to achieve this, such as the Linux cat command, which is shown in the following example:

$ ls -1
14184_file1.gz
14184_file2.gz
14184_file3.gz

$ cat *.gz > combined.gz
$ gzip -d combined.gz

You can run these commands on any gzip files, and then test that the resulting file is a valid gzip by using the -d flag, which decompresses the file for you. For some use cases, you can use the Amazon S3 Multipart Copy API, but this approach requires that your small files be at least 5 MB in size.

For the Pagely dataset, we wrote a shell script to pull down all the files with a given prefix, concatenate them into a single gzip, upload the concatenated file, validate the upload, and then delete the smaller files. This script was run using AWS Fargate containers, each of which would handle a single prefix. The details of this process would be a blog post on its own, but using a service like AWS Batch can make a job like this easier to manage. The total cost of concatenating all of the small historical files was $27.
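For reference, here is a simplified Python (boto3) sketch of the per-prefix consolidation logic described above; the bucket and prefix names are placeholders, and the actual job was a shell script run in Fargate with additional validation and retry handling. It assumes each hot-spot prefix is small enough (under 1 GB) to assemble in memory.

import gzip
import io

import boto3

s3 = boto3.client("s3")
bucket = "pagely-gateway-logs"   # placeholder bucket
prefix = "14184/"                # one "hot spot" prefix identified with Athena

# Gzip members can be concatenated byte-for-byte into a single valid gzip stream.
keys = [obj["Key"]
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])]
combined = b"".join(s3.get_object(Bucket=bucket, Key=key)["Body"].read() for key in keys)

# Validate the combined object before replacing the originals.
with gzip.GzipFile(fileobj=io.BytesIO(combined)) as check:
    check.read()

s3.put_object(Bucket=bucket, Key=prefix + "combined.gz", Body=combined)
for key in keys:
    s3.delete_object(Bucket=bucket, Key=key)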

Historical data

Daily data volumes for Pagely logs are in the tens of gigabytes per day, easily handled by the smallest AWS Glue configuration. Transforming the 4 TB of compressed (~28 TB uncompressed) historical data was a bit more challenging.

ConvergDB batches the data into smaller chunks. In the case of a very long-running historical transformation job failing, only the last batch is lost, resulting in around one hour of compute being lost. ConvergDB uses its own state-tracking mechanism to communicate the failure to the next run of the job, which cleans up any mess before trying to process the batch again. Batching is an automatic feature of the ETL job created by ConvergDB, based upon the size of the AWS Glue cluster.

Post-deployment at Pagely

Running our legacy report for a medium-size application (several gigabytes in S3) took 91 seconds. Now that our data lake is in production, the same report for a medium-size application takes 5 seconds with Athena—an 18x performance gain. Our largest dataset (~1 TB in S3) fails with our legacy process. It’s also not sufficiently performant when querying the raw JSON directly with Athena. However, the new Parquet-based tables using Athena complete the analysis in 24 seconds.

                     Legacy process          Athena with JSON    Athena with Parquet
Medium customer      1 minute, 31 seconds    1 minute            6 seconds
Largest customer     > 8 hours               > 30 minutes        24 seconds

 

Although these numbers are obviously important, the biggest advantage is that now we don’t have to worry about performance and cost, and the engineers can focus on solving problems. Just 15 minutes of writing queries, and the entire team now has access to new data. I was able to upgrade the legacy process with queries dispatched to Athena through the AWS SDK. This process can now run on any lightweight machine (like my laptop) while Athena does the heavy lifting.

About Pagely

Pagely, in their own words: Pagely is an AWS Advanced Technology, SaaS, and Public Sector Partner providing managed WordPress hosting. We service enterprise-level customers like BMC, UNICEF, Northwestern University, and the City of Boston, offering flexibility in our solutions and the industry’s best expert-only, tier-less support. Pagely uses a proprietary tech stack that accelerates WordPress sites through the use of our own ARES™ Web Application gateway, PressCACHE™ and PressCDN™ technologies, and open source tools such as Redis and NGINX.

About Beyondsoft Consulting, Inc.

Beyondsoft Consulting, Inc., in their own words: Beyondsoft Consulting, Inc. is an Amazon Web Services Advanced Consulting Partner with delivery centers across the US and Asia. Beyondsoft delivers IT solutions and services to leading technology companies and enterprises across many verticals. Our team of highly skilled professionals, coupled with our focus on customer success, truly separate us as a preferred AWS Partner for many of our clients.

Beyondsoft Amazon Partner Page

Contact: Eric Valenzuela

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Work with partitioned data in AWS Glue.

 


About the Authors

Joshua Eichorn is CTO of Pagely. An engineering leader with experience leading small and large teams. As an individual contributor, manager, and director he has done it all, from writing line one on a new application to a 6 month rewrite of a massive site. Josh loves solving new challenges and building great products.

 

 

Jeremy Winters is the creator of ConvergDB, and has been working in Business Intelligence across a variety of industries for 18 years, with the past 8 years being focused on building data lakes, data warehouses, and other applications in AWS.

 

 

 

How to access and analyze on-premises data stores using AWS Glue

Post Syndicated from Rajeev Meharwal original https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/

AWS Glue is a fully managed ETL (extract, transform, and load) service to catalog your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue ETL jobs can interact with a variety of data sources inside and outside of the AWS environment. For optimal operation in a hybrid environment, AWS Glue might require additional network, firewall, or DNS configuration.

In this post, I describe a solution for transforming and moving data from an on-premises data store to Amazon S3 using AWS Glue that simulates a common data lake ingestion pipeline. AWS Glue can connect to Amazon S3 and data stores in a virtual private cloud (VPC) such as Amazon RDS, Amazon Redshift, or a database running on Amazon EC2. For more information, see Adding a Connection to Your Data Store. AWS Glue can also connect to a variety of on-premises JDBC data stores such as PostgreSQL, MySQL, Oracle, Microsoft SQL Server, and MariaDB.

AWS Glue ETL jobs can use Amazon S3, data stores in a VPC, or on-premises JDBC data stores as a source. AWS Glue jobs extract data, transform it, and load the resulting data back to S3, data stores in a VPC, or on-premises JDBC data stores as a target.

The following diagram shows the architecture of using AWS Glue in a hybrid environment, as described in this post. The solution uses JDBC connectivity using the elastic network interfaces (ENIs) in the Amazon VPC. The ENIs in the VPC help connect to the on-premises database server over a virtual private network (VPN) or AWS Direct Connect (DX).

The solution architecture illustrated in the diagram works as follows:

  • Network connectivity exists between the Amazon VPC and the on-premises network using a virtual private network (VPN) or AWS Direct Connect (DX).
  • The JDBC connection defines parameters for a data store—for example, the JDBC connection to the PostgreSQL server running on an on-premises network.
  • AWS Glue creates elastic network interfaces (ENIs) in a VPC/private subnet. These network interfaces then provide network connectivity for AWS Glue through your VPC.
  • Security groups attached to ENIs are configured by the selected JDBC connection.
  • The number of ENIs depends on the number of data processing units (DPUs) selected for an AWS Glue ETL job. AWS Glue DPU instances communicate with each other and with your JDBC-compliant database using ENIs.
  • AWS Glue can communicate with an on-premises data store over VPN or DX connectivity. Elastic network interfaces can access an EC2 database instance or an RDS instance in the same or different subnet using VPC-level routing. ENIs can also access a database instance in a different VPC within the same AWS Region or another Region using VPC peering.
  • AWS Glue uses Amazon S3 to store ETL scripts and temporary files. S3 can also be a source and a target for the transformed data. Amazon S3 VPC endpoints (VPCe) provide access to S3, as described in Amazon VPC Endpoints for Amazon S3.
  • Optionally, a NAT gateway or NAT instance setup in a public subnet provides access to the internet if an AWS Glue ETL job requires either access to AWS services using a public API or outgoing internet access.
  • An AWS Glue crawler uses an S3 or JDBC connection to catalog the data source, and the AWS Glue ETL job uses S3 or JDBC connections as a source or target data store.

The following walkthrough first demonstrates the steps to prepare a JDBC connection for an on-premises data store. Then it shows how to perform ETL operations on sample data by using a JDBC connection with AWS Glue.

Prepare the JDBC connection for an on-premises data store

Follow these steps to set up the JDBC connection.

Step 1:  Create a security group for AWS Glue ENIs in your VPC

To allow AWS Glue to communicate with its components, specify a security group with a self-referencing inbound rule for all TCP ports. In this example, we call this security group glue-security-group. The security group attaches to AWS Glue elastic network interfaces in a specified VPC/subnet. It enables unfettered communication between the ENIs within a VPC/subnet and prevents incoming network access from other, unspecified sources.

By default, the security group allows all outbound traffic and is sufficient for AWS Glue requirements.
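As a reference, the following Python (boto3) sketch creates an equivalent self-referencing security group; the VPC ID is a placeholder.

import boto3

ec2 = boto3.client("ec2")

sg = ec2.create_security_group(
    GroupName="glue-security-group",
    Description="Self-referencing rule for AWS Glue ENIs",
    VpcId="vpc-0123456789abcdef0",   # placeholder VPC ID
)

# Inbound: allow all TCP ports, but only from members of this same security group.
ec2.authorize_security_group_ingress(
    GroupId=sg["GroupId"],
    IpPermissions=[{
        "IpProtocol": "tcp",
        "FromPort": 0,
        "ToPort": 65535,
        "UserIdGroupPairs": [{"GroupId": sg["GroupId"]}],
    }],
)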

Optionally, if you prefer, you can tighten up outbound access to selected network traffic that is required for a specific AWS Glue ETL job. For example, the following security group setup enables the minimum amount of outgoing network traffic required for an AWS Glue ETL job using a JDBC connection to an on-premises PostgreSQL database. Your configuration might differ, so edit the outbound rules as per your specific setup.

In this example, the following outbound traffic is allowed. Edit these rules as per your setup.

  • To allow AWS Glue to communicate with its components, specify a security group with a self-referencing outbound rule for all TCP ports. It enables unfettered communication between AWS Glue ENIs within a VPC/subnet.
  • Port 5432/tcp: To access the on-premises PostgreSQL database server.
  • Ports 53/udp, 53/tcp: Required for DNS resolution when using custom DNS servers, such as on-premises DNS servers or a DNS server running on an EC2 instance. Change the destination field as per your setup.
  • Port 443/tcp: Allows traffic to all S3 IP address ranges for the AWS Region where the ETL job is running. In this example, outbound traffic is allowed to all S3 IP prefixes via VPCe for the us-east-1 Region. Edit these rules accordingly if your S3 bucket resides in a different Region.

AWS publishes IP ranges in JSON format for S3 and other services. The IP range data changes from time to time. Subscribe to change notifications as described in AWS IP Address Ranges, and update your security group accordingly.

The following example command uses curl and the jq tool to parse JSON data and list all current S3 IP prefixes for the us-east-1 Region. Use these in the security group for S3 outbound access whether you’re using an S3 VPC endpoint or accessing S3 public endpoints via a NAT gateway setup.

$ curl -s https://ip-ranges.amazonaws.com/ip-ranges.json | jq -r '.prefixes[] | select(.service=="S3") | select(.region=="us-east-1") | .ip_prefix'

52.92.16.0/20
52.216.0.0/15
54.231.0.0/17

Step 2: Create an IAM role for AWS Glue

Create an IAM role for the AWS Glue service. For the role type, choose AWS Service, and then choose Glue. For more information, see Create an IAM Role for AWS Glue.

Add IAM policies to allow access to the AWS Glue service and the S3 bucket. In this example, the IAM role is glue_access_s3_full.

Step 3: Add a JDBC connection

To add a JDBC connection, choose Add connection in the navigation pane of the AWS Glue console. Enter the connection name, choose JDBC as the connection type, and choose Next.

On the next screen, provide the following information:

  • Enter the JDBC URL for your data store. For most database engines, this field is in the following format:
jdbc:protocol://host:port/db_name

For more information, see Working with Connections on the AWS Glue Console. This example uses a JDBC URL jdbc:postgresql://172.31.0.18:5432/glue_demo for an on-premises PostgreSQL server with an IP address 172.31.0.18. The PostgreSQL server is listening at a default port 5432 and serving the glue_demo database.

  • Enter the database user name and password. Follow the principle of least privilege and grant only the required permission to the database user.
  • Choose the VPC, private subnet, and the security group glue-security-group that you created earlier. AWS Glue creates multiple ENIs in the private subnet and associates security groups to communicate over the network.
  • Complete the remaining setup by reviewing the information, as shown following. A scripted equivalent of this connection setup is sketched after this list.
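The following is a hedged Python (boto3) equivalent of the console steps above; the subnet, security group ID, Availability Zone, and credentials are placeholders, and in practice you would avoid hard-coding the password (for example, by retrieving it from AWS Secrets Manager).

import boto3

glue = boto3.client("glue")

glue.create_connection(
    ConnectionInput={
        "Name": "my-jdbc-connection",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:postgresql://172.31.0.18:5432/glue_demo",
            "USERNAME": "glue_user",        # placeholder credentials
            "PASSWORD": "replace-me",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",            # the private subnet chosen above
            "SecurityGroupIdList": ["sg-0123456789abcdef0"],   # glue-security-group
            "AvailabilityZone": "us-east-1a",
        },
    },
)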

Step 4: Open appropriate firewall ports in the on-premises data center

Edit your on-premises firewall settings and allow incoming connections from the private subnet that you selected for the JDBC connection in the previous step. AWS Glue can choose any available IP address of your private subnet when creating ENIs. The example shown here requires the on-premises firewall to allow incoming connections from the network block 10.10.10.0/24 to the PostgreSQL database server running at port 5432/tcp.

You might also need to edit your database-specific file (such as pg_hba.conf) for PostgreSQL and add a line to allow incoming connections from the remote network block. Follow your database engine-specific documentation to enable such incoming connections.

Step 5: Test the JDBC connection

Select the JDBC connection in the AWS Glue console, and choose Test connection. Choose the IAM role that you created in the previous step, and choose Test connection. It might take few moments to show the result. If you receive an error, check the following:

  • The correct JDBC URL is provided.
  • The correct user name and password are provided for the database with the required privileges.
  • The correct network routing paths are set up, and the database port is reachable from the subnet selected for the AWS Glue ENIs.
  • Security groups for ENIs allow the required incoming and outgoing traffic between them, outgoing access to the database, access to custom DNS servers if in use, and network access to Amazon S3.

You are now ready to use the JDBC connection with your AWS Glue jobs.

Perform ETL in AWS Glue using the JDBC connection

This section demonstrates ETL operations using a JDBC connection and sample CSV data from the Commodity Flow Survey (CFS) open dataset published on the United States Census Bureau site. The example uses sample data to demonstrate two ETL jobs as follows:

  • Part 1: An AWS Glue ETL job loads the sample CSV data file from an S3 bucket to an on-premises PostgreSQL database using a JDBC connection. The dataset then acts as a data source in your on-premises PostgreSQL database server for Part 2.
  • Part 2: An AWS Glue ETL job transforms the source data from the on-premises PostgreSQL database to a target S3 bucket in Apache Parquet format.

In each part, AWS Glue crawls the existing data stored in an S3 bucket or in a JDBC-compliant database, as described in Cataloging Tables with a Crawler. The crawler samples the source data and builds the metadata in the AWS Glue Data Catalog. You then develop an ETL job referencing the Data Catalog metadata information, as described in Adding Jobs in AWS Glue.

Optionally, you can use other methods to build the metadata in the Data Catalog directly using the AWS Glue API. You can populate the Data Catalog manually by using the AWS Glue console, AWS CloudFormation templates, or the AWS CLI. You can also build and update the Data Catalog metadata within your pySpark ETL job script by using the Boto 3 Python library. The Data Catalog is Hive Metastore-compatible, and you can migrate an existing Hive Metastore to AWS Glue as described in this README file on the GitHub website.
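For illustration, the following hedged Python (boto3) sketch registers a CSV table in the Data Catalog directly instead of running a crawler; the table name, columns, and S3 location are assumptions for this example.

import boto3

glue = boto3.client("glue")

glue.create_table(
    DatabaseName="cfs",
    TableInput={
        "Name": "cfs_full_manual",                       # hypothetical table name
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {"classification": "csv", "skip.header.line.count": "1"},
        "StorageDescriptor": {
            "Columns": [
                {"Name": "shipmt_id", "Type": "bigint"},
                {"Name": "orig_state", "Type": "string"},
                {"Name": "dest_state", "Type": "string"},
                # ...remaining columns from the CFS file
            ],
            "Location": "s3://your-bucket/cfs/",          # placeholder S3 location
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {"field.delim": ","},
            },
        },
    },
)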

Part 1: An AWS Glue ETL job loads CSV data from an S3 bucket to an on-premises PostgreSQL database

Start by downloading the sample CSV data file to your computer, and unzip the file. Upload the uncompressed CSV file cfs_2012_pumf_csv.txt into an S3 bucket. The CSV data file is available as a data source in an S3 bucket for AWS Glue ETL jobs. The sample CSV data file contains a header line and a few lines of data, as shown here.

First, set up the crawler and populate the table metadata in the AWS Glue Data Catalog for the S3 data source. Start by choosing Crawlers in the navigation pane on the AWS Glue console. Then choose Add crawler. Specify the crawler name.

When asked for the data source, choose S3 and specify the S3 bucket prefix with the CSV sample data files. You can have one or multiple CSV files under the S3 prefix. The AWS Glue crawler crawls the sample data and generates a table schema.

Next, choose the IAM role that you created earlier. The IAM role must allow access to the AWS Glue service and the S3 bucket.

Next, choose an existing database in the Data Catalog, or create a new database entry. In this example, cfs is the database name in the Data Catalog.

Finish the remaining setup, and run your crawler at least once to create a catalog entry for the source CSV data in the S3 bucket.

Review the table that was generated in the Data Catalog after completion. The crawler creates the table with the name cfs_full and correctly identifies the data type as CSV.

Choose the table name cfs_full and review the schema created for the data source. It picked up the header row from the source CSV data file and used it for column names.

Now you can use the S3 data as a source and the on-premises PostgreSQL database as a destination, and set up an AWS Glue ETL job. The demonstration shown here is fairly simple. It loads the data from S3 to a single table in the target PostgreSQL database via the JDBC connection.

To create an ETL job, choose Jobs in the navigation pane, and then choose Add job. Specify the name for the ETL job as cfs_full_s3_to_onprem_postgres.

Choose the IAM role and S3 locations for saving the ETL script and a temporary directory area. The IAM role must allow access to the specified S3 bucket prefixes that are used in your ETL job. Optionally, you can enable Job bookmark for an ETL job. This option lets you rerun the same ETL job and skip the previously processed data from the source S3 bucket.

For your data source, choose the table cfs_full from the AWS Glue Data Catalog tables.

Next, for the data target, choose Create tables in your data target. Then choose JDBC in the drop-down list. For Connection, choose the JDBC connection my-jdbc-connection that you created earlier for the on-premises PostgreSQL database server running with the database name glue_demo.

Follow the remaining setup with the default mappings, and finish creating the ETL job.

Finally, it shows an autogenerated ETL script screen. Review the script and make any additional ETL changes, if required. When you’re ready, choose Run job to execute your ETL job.

The ETL job takes several minutes to finish. A new table is created with the name cfs_full in the PostgreSQL database with data loaded from CSV files in the S3 bucket. Verify the table and data using your favorite SQL client by querying the database. For example, run the following SQL query to show the results:

SELECT * FROM cfs_full ORDER BY shipmt_id LIMIT 10;

The table data in the on-premises PostgreSQL database now acts as source data for Part 2 described next.

Part 2: An AWS Glue ETL job transforms the data from an on-premises PostgreSQL database to Parquet format

In this section, you configure the on-premises PostgreSQL database table as a source for the ETL job. It transforms the data into Apache Parquet format and saves it to the destination S3 bucket.

Set up another crawler that points to the PostgreSQL database table and creates a table metadata in the AWS Glue Data Catalog as a data source. Optionally, you can build the metadata in the Data Catalog directly using other methods, as described previously.

Next, select the JDBC connection my-jdbc-connection that you created earlier for the on-premises PostgreSQL database server. For Include path, provide the table name path as glue_demo/public/cfs_full. It refers to the PostgreSQL table name cfs_full in a public schema with a database name of glue_demo.

Follow the remaining setup steps, provide the IAM role, and create an AWS Glue Data Catalog table in the existing database cfs that you created before. Optionally, provide the prefix onprem_postgres_ for the table name created in the Data Catalog, to indicate that it represents on-premises PostgreSQL table data.

Run the crawler and view the table created with the name onprem_postgres_glue_demo_public_cfs_full in the AWS Glue Data Catalog. Verify the table schema and confirm that the crawler captured the schema details.

(Optional) Tuning for JDBC read parallelism

In some cases, running an AWS Glue ETL job over a large database table results in out-of-memory (OOM) errors because all the data is read into a single executor. To avoid this situation, you can optimize the number of Apache Spark partitions and parallel JDBC connections that are opened during the job execution.

After crawling a database table, follow these steps to tune the parameters.

  • In the Data Catalog, edit the table and add the partitioning parameters hashexpression or hashfield. The job partitions the data for a large table based on the column selected for these parameters, as described following.
  • hashexpression: If your database table contains a column with numeric values such as a unique ID or similar data, choose that column name for the parameter hashexpression. In this example, shipmt_id is a monotonically increasing column and a good candidate for hashexpression.
  • hashfield: If no suitable numeric column is available, find a column containing string values with a good even distribution (high cardinality), and choose that column name for the parameter hashfield.
  • hashpartitions: Provide a numeric value for hashpartitions. By default, this value is set to 7. This parameter determines the number of Spark partitions and the resulting number of JDBC connections opened to the target database.

In this example, hashexpression is selected as shipmt_id with the hashpartition value as 15.
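As an alternative to editing the table in the Data Catalog, these hints can also be passed from the ETL script itself. The following hedged pySpark sketch assumes the catalog database and table names used earlier in this post.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read the JDBC-backed table with 15 parallel partitions, hashed on shipmt_id.
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="cfs",
    table_name="onprem_postgres_glue_demo_public_cfs_full",
    additional_options={
        "hashexpression": "shipmt_id",   # numeric, evenly distributed column
        "hashpartitions": "15",          # number of Spark partitions / parallel JDBC connections
    },
)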

Next, create another ETL job with the name cfs_onprem_postgres_to_s3_parquet. Choose the IAM role and S3 bucket locations for the ETL script, and so on.

On the next screen, choose the data source onprem_postgres_glue_demo_public_cfs_full from the AWS Glue Data Catalog that points to the on-premises PostgreSQL data table. Next, choose Create tables in your data target. For Format, choose Parquet, and set the data target path to the S3 bucket prefix.

Follow the prompts until you get to the ETL script screen. The autogenerated pySpark script is set to fetch the data from the on-premises PostgreSQL database table and write multiple Parquet files in the target S3 bucket. By default, all Parquet files are written at the same S3 prefix level.

Optionally, if you prefer to partition data when writing to S3, you can edit the ETL script and add the partitionKeys parameter as described in the AWS Glue documentation. For this example, edit the pySpark script, search for the line that writes the data sink, and add the option "partitionKeys": ["quarter"], as shown here.

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://xxxxxxxxxxx/glue-demo01/outputs", "partitionKeys": ["quarter"]}, format = "parquet", transformation_ctx = "datasink4")

Choose Save and run job. The job executes and outputs data in multiple partitions when writing Parquet files to the S3 bucket. Each output partition corresponds to the distinct value in the column name quarter in the PostgreSQL database table.

The ETL job transforms the CFS data into Parquet format and separates it under four S3 bucket prefixes, one for each quarter of the year. The S3 bucket output listings shown following are using the S3 CLI.

Notice that AWS Glue opens several database connections in parallel during an ETL job execution, based on the value of the hashpartitions parameter set earlier. For PostgreSQL, you can verify the number of active database connections by using the following SQL command:

SELECT * FROM pg_stat_activity WHERE datname = 'glue_demo';

The transformed data is now available in S3, and it can act as a data lake. The data is ready to be consumed by other services, such as loading it into an Amazon Redshift based data warehouse or analyzing it by using Amazon Athena and Amazon QuickSight.

To demonstrate, create and run a new crawler over the partitioned Parquet data generated in the preceding step. Go to the new table created in the Data Catalog and choose Action, View data. You can then run an SQL query over the partitioned Parquet data in the Athena Query Editor, as shown here.

The following is an example SQL query with Athena. Note the use of the partition key quarter with the WHERE clause in the SQL query, to limit the amount of data scanned in the S3 bucket with the Athena query.

SELECT shipmt_id, orig_state, dest_state, quarter FROM "cfs"."athena_test" WHERE quarter='2' ORDER BY shipmt_id limit 10;

Other considerations for a hybrid setup

In some scenarios, your environment might require some additional configuration. This section describes the setup considerations when you are using custom DNS servers, as well as some considerations for VPC/subnet routing and security groups when using multiple JDBC connections.

Custom DNS servers

For a VPC, make sure that the network attributes enableDnsHostnames and enableDnsSupport are set to true. For more information, see Setting Up DNS in Your VPC.

When you use a custom DNS server for the name resolution, both forward DNS lookup and reverse DNS lookup must be implemented for the whole VPC/subnet used for AWS Glue elastic network interfaces. ENIs are ephemeral and can use any available IP address in the subnet. ETL jobs might receive a DNS error when both forward and reverse DNS lookup don’t succeed for an ENI IP address.

For example, assume that an AWS Glue ENI obtains an IP address 10.10.10.14 in a VPC/subnet. When you use a default VPC DNS resolver, it correctly resolves a reverse DNS for an IP address 10.10.10.14 as ip-10-10-10-14.ec2.internal. It resolves a forward DNS for a name ip-10-10-10-14.ec2.internal. as 10.10.10.14. The ETL job doesn’t throw a DNS error.

When you use a custom DNS server such as on-premises DNS servers connecting over VPN or DX, be sure to implement the similar DNS resolution setup. Refer to your DNS server documentation. For example, if you are using BIND, you can use the $GENERATE directive to create a series of records easily.

Another option is to implement a DNS forwarder in your VPC and set up hybrid DNS resolution to resolve using both on-premises DNS servers and the VPC DNS resolver. For implementation details, see the related posts on the AWS Security Blog.

AWS Glue ETL jobs with more than one JDBC connection

When you test a single JDBC connection or run a crawler using a single JDBC connection, AWS Glue obtains the VPC/subnet and security group parameters for ENIs from the selected JDBC connection configuration. AWS Glue then creates ENIs and accesses the JDBC data store over the network. Also, this works well for an AWS Glue ETL job that is set up with a single JDBC connection.

Additional setup considerations might apply when a job is configured to use more than one JDBC connection. For example, the first JDBC connection is used as a source to connect a PostgreSQL database, and the second JDBC connection is used as a target to connect an Amazon Aurora database. In this scenario, AWS Glue picks up the JDBC driver (JDBC URL) and credentials (user name and password) information from the respective JDBC connections.

However, for the ENIs, it picks up the network parameters (VPC/subnet and security groups) from only one of the two JDBC connections configured for the ETL job. AWS Glue then creates ENIs in that VPC/subnet and associates the security groups defined in that single JDBC connection. It then tries to access both JDBC data stores over the network using the same set of ENIs.

In some cases, this can lead to a job error if the ENIs that are created with the chosen VPC/subnet and security group parameters from one JDBC connection prohibit access to the second JDBC data store.

The following scenarios describe the additional setup considerations for AWS Glue ETL jobs to work with more than one JDBC connection, and how each configuration behaves.

Scenario 1: Both JDBC connections use the same VPC/subnet and security group parameters.

Additional setup considerations: No additional setup is required.

How it works: AWS Glue creates ENIs with the same parameters for the VPC/subnet and security group, chosen from either of the JDBC connections. In this case, the ETL job works well with two JDBC connections.

Scenario 2: Both JDBC connections use the same VPC/subnet, but use different security group parameters.

Additional setup considerations:

  • Option 1: Consolidate the security groups (SG) applied to both JDBC connections by merging all SG rules. Create a new common security group with all consolidated rules, and apply it to both JDBC connections (a consolidation sketch follows these scenarios).
  • Option 2: Build a combined list containing all security groups applied to both JDBC connections, and apply all security groups from the combined list to both JDBC connections.

How it works: AWS Glue creates ENIs with the same parameters for the VPC/subnet and security group, chosen from either of the JDBC connections. In this case, the ETL job works well with two JDBC connections after you apply the additional setup steps.

Scenario 3: Both JDBC connections use different VPC/subnet and different security group parameters.

Additional setup considerations: For the security groups, apply a setup similar to Option 1 or Option 2 in the previous scenario. For the VPC/subnet, make sure that the route tables and network paths are configured so that both JDBC data stores can be reached from either of the VPC/subnets.

How it works: AWS Glue creates ENIs with the same security group parameters, chosen from either of the JDBC connections. The VPC/subnet routing-level setup ensures that the AWS Glue ENIs can access both JDBC data stores from either of the selected VPC/subnets. In this case, the ETL job works well with two JDBC connections after you apply the additional setup steps.
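
For Option 1 in the second scenario, one way to consolidate the rules is with a short script. The following boto3 sketch is a minimal illustration only: the security group IDs, VPC ID, and group name are placeholders, and you may need to deduplicate overlapping rules before applying them (authorizing an identical rule twice raises an error).

import boto3

ec2 = boto3.client("ec2")

source_sg_ids = ["sg-0123456789abcdef0", "sg-0fedcba9876543210"]  # placeholder security groups
described = ec2.describe_security_groups(GroupIds=source_sg_ids)

new_sg = ec2.create_security_group(
    GroupName="glue-etl-consolidated-sg",  # hypothetical name
    Description="Merged inbound rules for an AWS Glue ETL job with two JDBC connections",
    VpcId="vpc-0123456789abcdef0",  # placeholder VPC
)

for sg in described["SecurityGroups"]:
    if sg["IpPermissions"]:
        # Copy each source group's inbound rules to the new consolidated group.
        ec2.authorize_security_group_ingress(
            GroupId=new_sg["GroupId"], IpPermissions=sg["IpPermissions"]
        )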

Summary

This post demonstrated how to set up AWS Glue in a hybrid environment. While using AWS Glue as a managed ETL service in the cloud, you can use existing connectivity between your VPC and data centers to reach an existing database service without significant migration effort. This provides you with an immediate benefit.

You can also use a similar setup when running workloads in two different VPCs. You can set up a JDBC connection over a VPC peering link between two VPCs within an AWS Region, or across Regions by using inter-Region VPC peering.

You can create a data lake setup using Amazon S3 and periodically move the data from a data source into the data lake. AWS Glue and other cloud services such as Amazon Athena, Amazon Redshift Spectrum, and Amazon QuickSight can interact with the data lake in a very cost-effective manner. To learn more, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

Additional Reading

If you found this post useful, be sure to check out Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda, as well as AWS Glue Developer Resources.

About the Author

Rajeev Meharwal is a Solutions Architect for the AWS Public Sector team. Rajeev loves interacting with customers and helping them implement state-of-the-art architectures in the cloud. His core focus areas are networking, serverless computing, and data analytics in the cloud. He enjoys hiking with his family, playing badminton, and chasing around his playful dog.


Orchestrate Multiple ETL Jobs Using AWS Step Functions and AWS Lambda

Post Syndicated from Moataz Anany original https://aws.amazon.com/blogs/big-data/orchestrate-multiple-etl-jobs-using-aws-step-functions-and-aws-lambda/

Extract, transform, and load (ETL) operations collectively form the backbone of any modern enterprise data lake. These operations transform raw data into useful datasets and, ultimately, into actionable insight. An ETL job typically reads data from one or more data sources, applies various transformations to the data, and then writes the results to a target where data is ready for consumption. The sources and targets of an ETL job could be relational databases in Amazon Relational Database Service (Amazon RDS) or on-premises, a data warehouse such as Amazon Redshift, or object storage such as Amazon Simple Storage Service (Amazon S3) buckets. Amazon S3 as a target is especially commonplace in the context of building a data lake in AWS.

AWS offers AWS Glue, a service that helps author and deploy ETL jobs. AWS Glue is a fully managed extract, transform, and load service that makes it easy for customers to prepare and load their data for analytics. Other AWS services can also be used to implement and manage ETL jobs, including AWS Database Migration Service (AWS DMS), Amazon EMR (using the Steps API), and even Amazon Athena.

The challenge of orchestrating an ETL workflow

How can we orchestrate an ETL workflow that involves a diverse set of ETL technologies? AWS Glue, AWS DMS, Amazon EMR, and other services support Amazon CloudWatch Events, which we could use to chain ETL jobs together. Amazon S3, the central data lake store, also supports CloudWatch Events. But relying on CloudWatch Events alone means that there’s no single visual representation of the ETL workflow. Also, tracing the overall ETL workflow’s execution status and handling error scenarios can become a challenge.

In this post, I show you how to use AWS Step Functions and AWS Lambda for orchestrating multiple ETL jobs involving a diverse set of technologies in an arbitrarily complex ETL workflow. AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components. Each component performs a discrete function, or task, allowing you to scale and change applications quickly.

Let’s look at an example ETL workflow.

Example datasets for the ETL workflow

For our example, we’ll use two publicly available Amazon QuickSight datasets.

The first dataset is a sales pipeline dataset, which contains a list of slightly more than 20,000 sales opportunity records for a fictitious business. Each record has fields that specify:

  • A date, potentially when an opportunity was identified.
  • The salesperson’s name.
  • A market segment to which the opportunity belongs.
  • Forecasted monthly revenue.

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this example we’ll assume that they are related.

The example ETL workflow requirements

Imagine there’s a business user who needs to answer questions based on both datasets. Perhaps the user wants to explore the correlations between online user engagement metrics on the one hand, and forecasted sales revenue and opportunities generated on the other hand. The user engagement metrics include website visits, mobile users, and desktop users.

The steps in the ETL workflow are:

  1. Process the Sales dataset (PSD). Read the Sales dataset. Group records by day, aggregating the Forecasted Monthly Revenue field. Rename fields to replace white space with underscores. Output the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.
  2. Process the Marketing dataset (PMD). Read the Marketing dataset. Rename fields to replace white space with underscores. Send the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.
  3. Join the Marketing and Sales datasets (JMSD). Read the output of the processed Sales and Marketing datasets. Perform an inner join of both datasets on the date field. Sort in ascending order by date. Send the final joined dataset to Amazon S3, and overwrite any previous output. (A rough PySpark sketch of this step follows the list.)
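
The following PySpark sketch is a rough illustration of the JMSD step only. The bucket paths and the shared "date" column name are assumptions, and the actual AWS Glue job in the code sample would typically use a GlueContext and DynamicFrames rather than plain Spark DataFrames.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-marketing-and-sales").getOrCreate()

# Paths are placeholders for the intermediate outputs of the PSD and PMD jobs.
sales = spark.read.parquet("s3://example-etl-bucket/intermediate/sales/")
marketing = spark.read.parquet("s3://example-etl-bucket/intermediate/marketing/")

joined = (
    sales.join(marketing, on="date", how="inner")  # assumes both outputs share a "date" column
         .orderBy("date")
)

# Overwrite any previous output, as the workflow requires.
joined.write.mode("overwrite").parquet("s3://example-etl-bucket/final/marketing_and_sales/")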

So far, this ETL workflow can be implemented with AWS Glue, with the ETL jobs being chained by using job triggers. But you might have other requirements outside of AWS Glue that are part of your end-to-end data processing workflow, such as the following:

  • Both Sales and Marketing datasets are uploaded to an S3 bucket at random times in an interval of up to a week. The PSD job should start as soon as the Sales dataset file is uploaded. The PMD job should start as soon as the Marketing dataset file is uploaded. Parallel ETL jobs can start and finish anytime, but the final JMSD job can start only after all parallel ETL jobs are complete.
  • In addition to the PSD and PMD jobs, the orchestration must support additional parallel ETL jobs in the future that contribute to the final dataset aggregated by the JMSD job. These additional ETL jobs could be managed by AWS services, such as AWS Database Migration Service, Amazon EMR, and Amazon Athena, or by non-AWS services.

The data engineer takes these requirements and builds the following ETL workflow chart.

To fulfill the requirements, we need a generic ETL orchestration solution. A serverless solution is even better.

The ETL orchestration architecture and events

Let’s see how we can orchestrate an ETL workflow to fulfill the requirements using AWS Step Functions and AWS Lambda. The following diagram shows the ETL orchestration architecture and the main flow of events.

The main flow of events starts with an AWS Step Functions state machine. This state machine defines the steps in the orchestrated ETL workflow. A state machine can be triggered through Amazon CloudWatch based on a schedule, through the AWS Command Line Interface (AWS CLI), or using the various AWS SDKs in an AWS Lambda function or some other execution environment.
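
For example, starting an execution from Python with boto3 might look like the following sketch; the state machine ARN is a placeholder, and the execution name just has to be unique within the state machine.

import json
import boto3

sfn = boto3.client("stepfunctions")

response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:MarketingAndSalesETLOrchestrator",  # placeholder
    name="weekly-run-2018-07-01",  # hypothetical, unique execution name
    input=json.dumps({"triggered_by": "manual-test"}),
)
print(response["executionArn"])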

As the state machine execution progresses, it invokes the ETL jobs. As shown in the diagram, the invocation happens indirectly through intermediary AWS Lambda functions that you author and set up in your account. We’ll call this type of function an ETL Runner.

While the architecture in the diagram shows Amazon Athena, Amazon EMR, and AWS Glue, the accompanying code sample (aws-etl-orchestrator) includes a single ETL Runner, labeled AWS Glue Runner Function in the diagram. You can use this ETL Runner to orchestrate AWS Glue jobs. You can also follow the pattern and implement more ETL Runners to orchestrate other AWS services or non-AWS tools.

ETL Runners are invoked by activity tasks in Step Functions. Because of the way AWS Step Functions’ activity tasks work, ETL Runners need to periodically poll the AWS Step Functions state machine for tasks. The state machine responds by providing a Task object. The Task object contains inputs which enable an ETL Runner to run an ETL job.

As soon as an ETL Runner receives a task, it starts the respective ETL job. An ETL Runner maintains the state of active jobs in an Amazon DynamoDB table and periodically checks the status of those jobs. When an active ETL job completes, the ETL Runner notifies the AWS Step Functions state machine. This allows the ETL workflow in AWS Step Functions to proceed to the next step.
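
A heavily simplified version of that poll-and-dispatch flow is sketched below. The activity ARN and the Glue job name key are placeholders, and the actual ETL Runner in the code sample adds DynamoDB state tracking, status polling, and error handling around this core.

import json
import boto3

sfn = boto3.client("stepfunctions")
glue = boto3.client("glue")

# Long-polls the activity; the response has an empty taskToken when there is no work.
task = sfn.get_activity_task(
    activityArn="arn:aws:states:us-east-1:123456789012:activity:RunGlueJob",  # placeholder
    workerName="glue-etl-runner",
)

if task.get("taskToken"):
    job_input = json.loads(task["input"])
    run = glue.start_job_run(JobName=job_input["GlueJobName"])  # assumes the task input carries the job name
    # The real runner stores the taskToken and JobRunId, checks glue.get_job_run(...) later,
    # and only then calls sfn.send_task_success(...) or sfn.send_task_failure(...).
    print("Started Glue job run:", run["JobRunId"])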

An important question may come up. Why does an ETL Runner run independently from your Step Functions state machine and poll for tasks? Can’t we instead directly invoke an AWS Lambda function from the Step Functions state machine? Then can’t we have that function start and monitor an ETL job until it completes?

The answer is that AWS Lambda functions have a maximum execution duration per request of 300 seconds, or 5 minutes. For more information, see AWS Lambda Limits. ETL jobs typically take more than 5 minutes to complete. If an ETL Runner function is invoked directly, it will likely time out before the ETL job completes. Thus, we follow the long-running worker approach with activity tasks. The worker in this code sample – the ETL Runner – is an AWS Lambda function that gets triggered on a schedule using CloudWatch Events. If you want to avoid managing the polling schedule through CloudWatch Events, you can implement a polling loop in your ETL workflow’s state machine. Check the AWS Big Data blog post Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy for an example.

Finally, let’s discuss how we fulfill the requirement of waiting for Sales and Marketing datasets to arrive in an S3 bucket at random times. We implement these waits as two separate activity tasks: Wait for Sales Data and Wait for Marketing Data. A state machine halts execution when it encounters either of these activity tasks. A CloudWatch Events event handler is then configured on an Amazon S3 bucket, so that when Sales or Marketing dataset files are uploaded to the bucket, Amazon S3 invokes an AWS Lambda function. The Lambda function then signals the waiting state machine to exit the activity task corresponding to the uploaded dataset. The subsequent ETL job is then invoked by the state machine.
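
A simplified sketch of such a signaling Lambda function follows. It assumes the event is a standard S3 notification and that the waiting activity task token can be fetched with get_activity_task; the activity ARNs and key prefixes are placeholders, and the actual code sample may wire this up differently.

import json
import boto3

sfn = boto3.client("stepfunctions")

# Hypothetical mapping from uploaded object key prefixes to the "wait" activities.
ACTIVITY_ARNS = {
    "sales/": "arn:aws:states:us-east-1:123456789012:activity:WaitForSalesData",
    "marketing/": "arn:aws:states:us-east-1:123456789012:activity:WaitForMarketingData",
}

def handler(event, context):
    for record in event["Records"]:
        key = record["s3"]["object"]["key"]
        for prefix, activity_arn in ACTIVITY_ARNS.items():
            if key.startswith(prefix):
                task = sfn.get_activity_task(activityArn=activity_arn, workerName="s3-upload-signal")
                if task.get("taskToken"):
                    # Exits the corresponding Wait activity task in the state machine.
                    sfn.send_task_success(
                        taskToken=task["taskToken"],
                        output=json.dumps({"uploaded_key": key}),
                    )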

Set up your own ETL orchestration

The aws-etl-orchestrator GitHub repository provides source code you can customize to set up the ETL orchestration architecture in your AWS account. The following steps show what you need to do to start orchestrating your ETL jobs using the architecture shown in this post:

  1. Model the ETL orchestration workflow in AWS Step Functions
  2. Build your ETL Runners (or use an existing AWS Glue ETL Runner)
  3. Customize AWS CloudFormation templates and create stacks
  4. Invoke the ETL orchestration state machine
  5. Upload sample Sales and Marketing datasets to Amazon S3

Model the ETL orchestration workflow in AWS Step Functions.  Use AWS Step Functions to model the ETL workflow described in this post as a state machine. A state machine in Step Functions consists of a set of states and the transitions between these states. A state machine is defined in Amazon States Language, which is a JSON-based notation. For a few examples of state machine definitions, see Sample Projects.

The following snapshot from the AWS Step Functions console shows our example ETL workflow modeled as a state machine. This workflow is what we provide you in the code sample.

When you start an execution of this state machine, it will branch to run two ETL jobs in parallel: Process Sales Data (PSD) and Process Marketing Data (PMD). But, according to the requirements, both ETL jobs should not start until their respective datasets are uploaded to Amazon S3. Hence, we implement Wait activity tasks before both PSD and PMD. When a dataset file is uploaded to Amazon S3, this triggers an AWS Lambda function that notifies the state machine to exit the Wait states. When both PMD and PSD jobs are successful, the JMSD job runs to produce the final dataset.

Finally, to have this ETL workflow execute once per week, you will need to configure a state machine execution to start once per week using a CloudWatch Event.
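
One way to create that schedule with boto3 is sketched below; the rule name and ARNs are placeholders, and the IAM role must allow CloudWatch Events to start Step Functions executions.

import boto3

events = boto3.client("events")

events.put_rule(
    Name="weekly-etl-orchestration",  # hypothetical rule name
    ScheduleExpression="rate(7 days)",
    State="ENABLED",
)

events.put_targets(
    Rule="weekly-etl-orchestration",
    Targets=[
        {
            "Id": "etl-state-machine",
            "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:MarketingAndSalesETLOrchestrator",  # placeholder
            "RoleArn": "arn:aws:iam::123456789012:role/CWEStartStepFunctionsRole",  # placeholder
        }
    ],
)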

Build your ETL Runners (or use an existing AWS Glue ETL Runner). The code sample includes an AWS Glue ETL Runner. For simplicity, we implemented the ETL workflow using only AWS Glue jobs. However, nothing prevents you from using a different ETL technology to implement the PMD or PSD jobs. In that case, you need to build an ETL Runner for that technology, following the AWS Glue ETL Runner example.

Customize AWS CloudFormation templates and create stacks. The sample published in the aws-etl-orchestrator repository includes three separate AWS CloudFormation templates. We organized resources into three templates following AWS CloudFormation best practices. The three resource groups are logically distinct and likely to have separate lifecycles and ownerships. Each template has an associated AWS CloudFormation parameters file (“*-params.json” files). Parameters in those files must be customized. The details about the three AWS CloudFormation templates are as follows:

  1. A template responsible for setting up AWS Glue resources. For our example ETL workflow, the sample template creates three AWS Glue jobs: PSD, PMD, and JMSD. The scripts for these jobs are pulled by AWS CloudFormation from an Amazon S3 bucket that you own.
  2. A template where the AWS Step Functions state machine is defined. The state machine definition in Amazon States Language is embedded in a StateMachine resource within the Step Functions template.
  3. A template that sets up the resources required by the ETL Runner for AWS Glue. The AWS Glue ETL Runner is a Python script that is written to be run as an AWS Lambda function.

Invoke the ETL orchestration state machine. Finally, it is time to start a new state machine execution in AWS Step Functions. For our ETL example, the AWS CloudFormation template creates a state machine named MarketingAndSalesETLOrchestrator. You can start an execution from the AWS Step Functions console, or through an AWS CLI command. When you start an execution, the state machine will immediately enter Wait for Data states, waiting for datasets to be uploaded to Amazon S3.

Upload sample Sales and Marketing datasets to Amazon S3

Upload the provided datasets to the S3 bucket that you specified in the code sample configuration. Uploading these datasets signals the state machine to continue execution.

The state machine may take a while to complete execution. You can monitor progress in the AWS Step Functions console. If the execution is successful, the output shown in the following diagram appears.

Congratulations! You’ve orchestrated the example ETL workflow to a successful completion.

Handling failed ETL jobs

What if a job in the ETL workflow fails? In such a case, there are error-handling strategies available to the ETL workflow developer, from simply notifying an administrator, to fully undoing the effects of the previous jobs through compensating ETL jobs. Detecting and responding to a failed ETL job can be implemented using the AWS Step Functions’ Catch mechanism. For more information, see Handling Error Conditions Using a State Machine. In the sample state machine, errors are handled by a do-nothing Pass state.
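
In Amazon States Language, attaching a fallback like that comes down to a Catch clause on the task state. The following fragment is illustrative only (the resource ARN and state names are placeholders), written here as a Python dictionary for readability and serialized to the JSON that a state machine definition expects.

import json

jmsd_state = {
    "Join Marketing and Sales Data": {
        "Type": "Task",
        "Resource": "arn:aws:states:us-east-1:123456789012:activity:RunGlueJob",  # placeholder
        "Catch": [
            {
                "ErrorEquals": ["States.ALL"],      # match any error
                "Next": "ETL Job Failed Fallback",  # a do-nothing Pass state, as in the sample
            }
        ],
        "End": True,
    }
}

print(json.dumps(jmsd_state, indent=2))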

Try it out: while the example ETL workflow is executing, stop one of its jobs through the AWS Glue console or the AWS CLI. You’ll notice the state machine transitioning to the ETL Job Failed Fallback state.

Conclusion

In this post, I showed you how to implement your ETL logic as an orchestrated workflow. I presented a serverless solution for ETL orchestration that allows you to control ETL job execution using AWS Step Functions and AWS Lambda.  You can use the concepts and the code described in this post to build arbitrarily complex ETL state machines.

For more information and to download the source code, see the aws-etl-orchestrator GitHub repository. If you have questions about this post, send them our way in the Comments section below.


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

About the Author

Moataz Anany is a senior solutions architect with AWS. He enjoys partnering with customers to help them leverage AWS and the cloud in creative ways. He dedicates most of his spare time to his wife and little ones. The rest is spent building and breaking things in side projects.


Amazon Neptune Generally Available

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/amazon-neptune-generally-available/

Amazon Neptune is now Generally Available in US East (N. Virginia), US East (Ohio), US West (Oregon), and EU (Ireland). Amazon Neptune is a fast, reliable, fully-managed graph database service that makes it easy to build and run applications that work with highly connected datasets. At the core of Neptune is a purpose-built, high-performance graph database engine optimized for storing billions of relationships and querying the graph with millisecond latencies. Neptune supports two popular graph models, Property Graph and RDF, through Apache TinkerPop Gremlin and SPARQL, allowing you to easily build queries that efficiently navigate highly connected datasets. Neptune can be used to power everything from recommendation engines and knowledge graphs to drug discovery and network security. Neptune is fully-managed with automatic minor version upgrades, backups, encryption, and fail-over. I wrote about Neptune in detail for AWS re:Invent last year and customers have been using the preview and providing great feedback that the team has used to prepare the service for GA.

Now that Amazon Neptune is generally available, a few things have changed from the preview.

Launching an Amazon Neptune Cluster

Launching a Neptune cluster is as easy as navigating to the AWS Management Console and clicking create cluster. Of course you can also launch with CloudFormation, the CLI, or the SDKs.
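
As a minimal example, launching a cluster with boto3 could look like the following sketch; the identifiers and instance class are placeholders.

import boto3

neptune = boto3.client("neptune")

neptune.create_db_cluster(
    DBClusterIdentifier="my-neptune-cluster",  # hypothetical identifier
    Engine="neptune",
)

neptune.create_db_instance(
    DBInstanceIdentifier="my-neptune-instance",  # hypothetical identifier
    DBInstanceClass="db.r4.large",               # example instance class
    Engine="neptune",
    DBClusterIdentifier="my-neptune-cluster",
)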

You can monitor your cluster health and the health of individual instances through Amazon CloudWatch and the console.

Additional Resources

We’ve created two repos with additional tools and examples, and you can expect continuous development on them as we add more.

  • Amazon Neptune Tools Repo
    This repo has a useful tool for converting GraphML files into Neptune-compatible CSVs for bulk loading from S3.
  • Amazon Neptune Samples Repo
    This repo has a really cool example of building a collaborative filtering recommendation engine for video game preferences.

Purpose Built Databases

There’s an industry trend toward purpose-built databases: developers and businesses want to access their data in the format that makes the most sense for their applications. As cloud resources make transforming large datasets easier with tools like AWS Glue, we have far more options than we used to for accessing our data. With tools like Amazon Redshift, Amazon Athena, Amazon Aurora, Amazon DynamoDB, and more, we get to choose the best database for the job, or even enable entirely new use cases. Amazon Neptune is perfect for workloads where the data is highly connected across data-rich edges.

I’m really excited about graph databases and I see a huge number of applications. Looking for ideas of cool things to build? I’d love to build a web crawler in AWS Lambda that uses Neptune as the backing store. You could further enrich it by running Amazon Comprehend or Amazon Rekognition on the text and images found and creating a search engine on top of Neptune.

As always, feel free to reach out in the comments or on Twitter to provide any feedback!

Randall

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that incoming events often contain only some of the expected fields, depending on which values the producers are sending. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.
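
If you prefer to script this step, the following boto3 sketch creates both streams; the stream names are placeholders (for example, one for agent events and one you can reserve for contact trace records).

import boto3

kinesis = boto3.client("kinesis")

# One shard each is enough for this low-volume example.
for stream_name in ["connect-agent-events", "connect-ctr-events"]:  # hypothetical names
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)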

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location, as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash ("/") so that Kinesis Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.
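
A sketch of that update call with boto3 follows; the delivery stream name is a placeholder, the version ID and destination ID come from describe_delivery_stream, and depending on the API version you may need to pass the full DataFormatConversionConfiguration rather than only the changed field.

import boto3

firehose = boto3.client("firehose")

stream_name = "connect-agent-events-to-s3"  # placeholder delivery stream name
desc = firehose.describe_delivery_stream(DeliveryStreamName=stream_name)
stream_desc = desc["DeliveryStreamDescription"]

firehose.update_destination(
    DeliveryStreamName=stream_name,
    CurrentDeliveryStreamVersionId=stream_desc["VersionId"],
    DestinationId=stream_desc["Destinations"][0]["DestinationId"],
    ExtendedS3DestinationUpdate={
        "DataFormatConversionConfiguration": {
            "OutputFormatConfiguration": {
                # Switch the Parquet compression from the default SNAPPY to GZIP.
                "Serializer": {"ParquetSerDe": {"Compression": "GZIP"}}
            }
        }
    },
)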

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures.  The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged in. It tells us whether the calls were incoming or outgoing, whether they were completed, and whether there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.
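
The external schema is a one-time setup. A hedged sketch of running that DDL from Python follows; the cluster endpoint, credentials, and IAM role ARN are placeholders, and the schema and database names match the query that follows.

import psycopg2

# Connection details are placeholders for your own Amazon Redshift cluster.
conn = psycopg2.connect(
    host="my-redshift-cluster.abc123xyz789.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="dev",
    user="awsuser",
    password="your_password",
)
conn.autocommit = True

create_schema_sql = """
    CREATE EXTERNAL SCHEMA IF NOT EXISTS default_schema
    FROM DATA CATALOG
    DATABASE 'default'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MySpectrumRole'
"""

with conn.cursor() as cur:
    cur.execute(create_schema_sql)
conn.close()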

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 
Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.