Tag Archives: AWS Glue

Monitor data quality in your data lake using PyDeequ and AWS Glue

Post Syndicated from Joan Aoanan original https://aws.amazon.com/blogs/big-data/monitor-data-quality-in-your-data-lake-using-pydeequ-and-aws-glue/

In our previous post, we introduced PyDeequ, an open-source Python wrapper over Deequ, which enables you to write unit tests on your data to ensure data quality. The use case we ran through was on static, historical data, but most datasets are dynamic, so how can you quantify how your data is changing and detect anomalous changes over time?

At Amazon, we’ve leveraged PyDeequ on AWS Glue to address this problem. AWS Glue is a serverless data integration service that allows you to easily prepare and combine your data for analytics, machine learning (ML), and application development. AWS Glue enables data engineers to build extract, transform, and load (ETL) workflows with ease. By using PyDeequ with AWS Glue, you can create a metrics repository on your data and check for anomalous changes over time inside your ETL workflows. In this post, we share this design pattern with you.

Use cases of PyDeequ on AWS Glue include:

  • Identifying and counting mismatched schema items and then immediately correcting them
  • Reviewing your incoming data with standard or custom, predefined analytics before storing it for big data validation
  • Tracking changes in data distribution by using a data quality metric file
  • Immediately identifying and creating useful constraints based on data distribution

This post describes the implementation process and provides a step-by-step tutorial for tracking changes in data quality. It walks you through an example of transforming a large dataset to identify the seasonality of the trends over time. Next, you create, sort, and load a metrics repository using PyDeequ, which allows you to persist your analysis over time. Finally, you create an alert that notifies you when a data point is outside the forecasted range.

Where are the Anomalies?

It can be difficult to immediately find anomalies within your incoming data stream over time. PyDeequ makes it easier to identify changes in data distribution by creating a metrics repository. The repository allows you to store and load a variety of anomaly checks to compare current and past metric values. For this post, you learn about the Holt Winters anomaly detection strategy, one of the various anomaly detection strategies that PyDeequ provides. The Holt Winters model forecasts future datasets based on a repeated periodical pattern (seasonality), a trend (slope), and the average between two corresponding time points.
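
For reference, the standard additive Holt Winters formulation (PyDeequ's internal implementation may differ in details such as how parameters are fit and how wide the forecast interval is) maintains a level \ell_t, a trend b_t, and a seasonal component s_t, with smoothing parameters \alpha, \beta, \gamma and season length m (m = 12 for monthly data with yearly seasonality):

\ell_t = \alpha (y_t - s_{t-m}) + (1 - \alpha)(\ell_{t-1} + b_{t-1})
b_t = \beta (\ell_t - \ell_{t-1}) + (1 - \beta) b_{t-1}
s_t = \gamma (y_t - \ell_{t-1} - b_{t-1}) + (1 - \gamma) s_{t-m}
\hat{y}_{t+h} = \ell_t + h b_t + s_{t+h-m}, \quad 1 \le h \le m

A new observation is flagged as anomalous when it falls outside the bound the model forecasts around \hat{y}_{t+h}.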

You can apply the Holt Winters method in many different use cases, such as the following:

  • Business problem – Identifying a shift in the demand of a product
  • Data pattern – Input data deviates from trend and seasonality
  • Business analysis – Detecting changes in profits over time

To demonstrate this anomaly detection strategy, you use the AWS Customer Reviews Dataset, a collection of over 130 million reviews written on the Amazon.com marketplace between 1995 and 2015. Specifically, you narrow down the dataset to focus on the total votes in the jewelry subset from 2013–2015. A graph of this data shows a tight correlation and seasonality, with more engagement throughout the winter holidays. However, by 2015, the correlation deviates.

The following graph illustrates February 2015 as divergent from the previous years, with nearly 30% more engagement in votes.

How can we detect similar events like these in new data?

With PyDeequ, you can easily identify anomalies without any visuals. February 2015 is outside the calculated forecast range; therefore, PyDeequ flags the data point as anomalous. This post demonstrates using PyDeequ’s anomaly detection to get email notifications for anomalous events, which look like the following screenshot.

Solution architecture

With Amazon Athena and an AWS Glue crawler, you can create an AWS Glue Data Catalog to access the Amazon Simple Storage Service (Amazon S3) data source. This allows the data to be easily queried for usage downstream. You can use an Amazon SageMaker notebook with a configured AWS Glue development endpoint to interact with your AWS Glue ETL jobs. We configure our AWS Glue ETL jobs to use PyDeequ to store results in Amazon S3, and use Amazon Simple Notification Service (Amazon SNS) to notify administrators of any anomalies.

The following diagram illustrates this architecture.

Solution overview

To implement this solution, you complete the following high-level steps:

  1. Create an SNS topic.
  2. Upload PyDeequ and Deequ to Amazon S3.
  3. Create an AWS Identity and Access Management (IAM) role for AWS Glue.
  4. Crawl, query, and create your dataset.
  5. Transform the dataset into a table.
  6. Create an AWS Glue development endpoint.
  7. Create a SageMaker notebook to interface with the endpoint.
  8. Create a new AWS Glue session.
  9. Extract the table.
  10. Transform the table.
  11. Use PyDeequ to detect anomalous data points.

Create an SNS topic

Complete the following steps to create your SNS topic:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard.
  4. For Name, enter jewelry_hw.
  5. For Display name, enter Holt Winters Anomaly Example.
  6. Choose Create Topic.
  7. On the details page for the topic you just created, under Subscription, choose Create subscription.
  8. For Protocol, choose Email.
  9. For Endpoint, enter the email you want to receive the notification.
  10. Choose Create subscription. An email is sent to the entered endpoint.
  11. Open the email message and choose Confirm subscription.
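
If you prefer to script this setup, the following boto3 sketch creates the same topic and email subscription. The email address is a placeholder that you must replace, and the recipient still has to confirm the subscription:

import boto3

sns = boto3.client('sns')

# Create the topic and set its display name
topic_arn = sns.create_topic(Name='jewelry_hw')['TopicArn']
sns.set_topic_attributes(TopicArn=topic_arn,
                         AttributeName='DisplayName',
                         AttributeValue='Holt Winters Anomaly Example')

# Subscribe an email endpoint (replace with the address that should receive alerts)
sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint='you@example.com')
print(topic_arn)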

Upload PyDeequ and Deequ to Amazon S3

In this step, you create an S3 bucket and upload PyDeequ and Deequ.

  1. On the Amazon S3 console, create a new bucket. We reference it as <__YOUR_BUCKET__> throughout this post.
  2. Inside your bucket, create a folder called dependencies.
  3. Download the deequ-1.0.3.jar file.
  4. Create a .zip file for PyDeequ by compressing the folder that contains the __init__.py file.
  5. Upload the Deequ and PyDeequ files to your dependencies folder.

If you’re on a *nix operating system or have the AWS Command Line Interface (AWS CLI) configured, you can use the following code:

$ wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.3/deequ-1.0.3.jar 
$ git clone https://github.com/awslabs/python-deequ.git
$ cd python-deequ && zip -r ../pydeequ.zip pydeequ && cd ../
$ aws s3 cp deequ-1.0.3.jar s3://<__YOUR_BUCKET__>/dependencies/
$ aws s3 cp pydeequ.zip s3://<__YOUR_BUCKET__>/dependencies/

Create an IAM role for AWS Glue

You now create an IAM role for AWS Glue and attach the required policies.

  1. On the IAM console, choose Roles.
  2. Choose Create a role.
  3. For Trusted entity, choose AWS Service.
  4. For Use case, choose Glue.
  5. Choose Next.
  6. Add the following policies to the role:
    1. AWSGlueServiceRole
    2. AWSGlueConsoleSageMakerNotebookFullAccess
  7. Add an inline policy to the role with the following JSON code.

Be sure to replace the resource values in the code. If you’re unsure what your Athena query outputs location is in Amazon S3, you can find it on the Settings tab on the Athena console.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Put*",
                "s3:Get*",
                "s3:Create*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::<__YOUR_BUCKET__>/*",
                "arn:aws:s3:::<__ATHENA_QUERY_OUTPUTS_BUCKET__>/*",
                "arn:aws:s3:::amazon-reviews-pds/parquet/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "arn:aws:sns:*:*:jewelry_hw"
        }
    ]
}

Crawl, query, and create the dataset

First, you use an AWS Glue crawler to add the AWS Customer Reviews Dataset to the Data Catalog.

  1. On the Athena console, choose Connect Data Source.
  2. For Choose where your data is located, select Query data in Amazon S3.
  3. For Choose a metadata catalog, select AWS Glue data catalog.
  4. Choose Set up a crawler in AWS Glue to retrieve schema information automatically.
  5. Choose Connect to AWS Glue.
  6. For Crawler Name, enter jewelry_dataset_crawler.
  7. Choose Next.
  8. Choose Next again.
  9. For Crawler Source Type, choose Data stores.
  10. For Repeat crawls of S3 data stores, choose Crawl all folders.
  11. Choose Next.
  12. For Choose a data store, choose S3.
  13. For Crawl data in, select Specified path in another account.
  14. For Include path, enter: s3://amazon-reviews-pds/parquet/.
  15. Choose Next.
  16. In the Choose an IAM role section, select Choose an existing IAM role.
  17. Choose the IAM role we created earlier.
  18. Choose Next.
  19. Under Frequency, choose Run on Demand.

Alternatively, to test incoming data in the Data Catalog, you can change the frequency of the crawler.

  1. Choose Next.
  2. For Database, choose Add Database and enter jewelry_db.
  3. Choose Next.
  4. Review the crawler properties and choose Finish.
  5. Run the data crawler.
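
If you prefer to script the crawler instead of using the console wizard, a boto3 sketch such as the following creates and runs an equivalent crawler. The role value is a placeholder for the IAM role you created earlier:

import boto3

glue = boto3.client('glue')

glue.create_crawler(
    Name='jewelry_dataset_crawler',
    Role='<__YOUR_GLUE_ROLE__>',  # IAM role created earlier
    DatabaseName='jewelry_db',
    Targets={'S3Targets': [{'Path': 's3://amazon-reviews-pds/parquet/'}]}
)

glue.start_crawler(Name='jewelry_dataset_crawler')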

Transform the dataset into a table

Next, we transform the AWS Customer Reviews Dataset into a table with Athena.

  1. On the Athena console, under Database, choose the jewelry_db table.

The table parquet (Partitioned) should be listed under Tables. If the database doesn’t show up, choose the refresh icon above Connect data source.

Now let’s create a second table from this dataset. This table has three columns and contains only the rows where the product category is Jewelry and the marketplace is US. We use US as a filter to closely match holiday seasonal trends.

  1. Enter the following query:
    /*Athena jewelry dataset*/
    CREATE TABLE jewelry_db.jewelry_dataset
    WITH (
    format='PARQUET'
    ) AS
    SELECT total_votes, year,
    DATE_FORMAT(review_date,
    '%Y-%c-01') AS review_date
    FROM parquet
    WHERE product_category = 'Jewelry' AND marketplace = 'US'
    ORDER BY review_date DESC

  2. Choose Run Query.

Under Tables, a new data table has been added called jewelry_dataset.

Create an AWS Glue development endpoint

To create your AWS Glue development endpoint, complete the following steps:

  1. On the AWS Glue console, choose Dev Endpoints.
  2. Choose Add endpoint.
  3. For Development endpoint name, enter jewelry_hw_example.
  4. In the IAM role section, select Choose an existing IAM role and choose the IAM role we created earlier.
  5. Under Python Library Path, choose the folder icon to navigate to the pydeequ.zip file in your S3 bucket.
  6. Under Dependent Jars Path, choose the folder icon to select the deequ-1.0.3.jar file in your S3 bucket.
  7. For AWS Glue Version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  8. Choose Next.
  9. Review your settings and choose Finish.

Create a SageMaker notebook to interface with our endpoint

You’re redirected to the dev endpoint page. Under Provisioning Status, it currently says Provisioning. Wait until that changes to Ready. This may take more than 5 minutes.

  1. On the AWS Glue console, choose Notebooks.
  2. Choose Create notebook.
  3. For Notebook name, enter jewelry-hw.
  4. For Attach to development endpoint, choose jewelry_hw_example.
  5. Select Create an IAM Role.
  6. For IAM role, enter a name for your role.
  7. Choose Create notebook.

Now we can do our data analysis! You can walk through the following sections in your newly created SageMaker notebook.

Create an AWS Glue session

To create your AWS Glue session, complete the following steps:

  1. In your SageMaker notebook instance, choose New.
  2. Choose Sparkmagic (PySpark).

This creates a new notebook for you with a Sparkmagic (PySpark) kernel.

  1. Create an AWS Glue session using the following code:
    import sys
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    
    session = glueContext.spark_session
    
    # UPDATE ME:
    topic_arn = "<__SNS_TOPIC_ARN__>"
    s3_bucket = "<__S3_BUCKET_NAME__>"
    region = "<__REGION_YOUR_DEV_ENDPOINT_AND_SNS_TOPIC_ARE_IN__>"

Extract the table

You extract the data table jewelry_dataset and turn it into a DataFrame so that it can be used with PyDeequ. Next, you use the dropDuplicates method to remove any potential duplicates within the dataset. See the following code:

jewelry_dyf = glueContext.create_dynamic_frame.from_catalog(database="jewelry_db", table_name="jewelry_dataset")
jewelry_df = jewelry_dyf.toDF()
jewelry_df = jewelry_df.dropDuplicates()

The following screenshot shows your output.

Transform the table

We can further simplify the jewelry_df table by using the date_format method so that the review_date column shows only the year and month. Afterwards, we filter jewelry_df2 by year to produce DataFrames that contain only the two columns we need. See the following code:

import pyspark.sql.functions as f

jewelry_df2 = jewelry_df.withColumn('review_date', f.date_format('review_date', 'yyyy/M'))\
.orderBy('review_date', ascending = False)

df_2013 = jewelry_df2.filter("year ='2013'").select("review_date","total_votes")
df_2014 = jewelry_df2.filter("year ='2014'").select("review_date","total_votes")
df_2015 = jewelry_df2.filter("year ='2015'").select("review_date","total_votes")

We can use df_2013.show(10) to preview what our data table looks like before running it through PyDeequ. The following screenshot shows our output.

Use PyDeequ to detect anomalous data points

For this post, we demonstrate detecting anomalous data points with the FileSystemMetricsRepository class. A metrics repository is stored in JSON format to be used as a data quality report over time in Amazon S3, HDFS, or in memory. The variable s3_write_path is where you want your JSON file to be stored within Amazon S3. See the following code:

s3_write_path = f"s3://{s3_bucket}/tmp/holt_winters_tutorial.json"
import pydeequ
from pydeequ.repository import *
metricsRepository = FileSystemMetricsRepository(session,s3_write_path)
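
Later on, you can also read stored metrics back from the repository, which is useful for inspecting the time series that the anomaly checks are built on. The following sketch uses the loader methods exposed by recent PyDeequ versions; verify the method names against the version you installed:

# Load previously stored metrics from the repository as a Spark DataFrame
stored_metrics = metricsRepository.load() \
    .before(ResultKey.current_milli_time()) \
    .getSuccessMetricsAsDataFrame()

stored_metrics.show()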

We now load the 2013–2014 dataset into metrics.

If your dataset is collected monthly and follows an annual seasonal trend, use MetricInterval.Monthly and SeriesSeasonality.Yearly. This selection requires you to collect at least 25 data points. The initial 24 data points are the monthly values from 2013–2014, which we use to create the Holt Winters model. The values in 2015 are the forecasted points, any of which could turn out to be anomalous.

As shown in the following code, we create a for loop that iterates through the monthly values in df_2013 and df_2014. We use month to build a date string that queries the values for that month. The filter method lets us create a df data frame that contains the total_votes values by month (for this post, the first iteration is a table of values from January 2013).

Next, each set of metrics that we compute needs to be indexed by a ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.

Finally, we create a VerificationSuite. We make PyDeequ write and store our metrics in Amazon S3 by adding the useRepository and saveOrAppendResult methods. Then we add a Holt Winters anomaly check with a Sum analyzer to calculate monthly total_votes. See the following code:

# Depending on your PyDeequ version, Sum, HoltWinters, MetricInterval, and
# SeriesSeasonality may need to be imported from the analyzers and
# anomaly_detection modules in addition to the verification module.
from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.verification import *

for year in ['2013','2014']:
    # Use the DataFrame that matches the year being loaded
    df_year = df_2013 if year == '2013' else df_2014
    for month in range(1,13):
        date = f"\'{year}/{month}\'"
        df = df_year.filter(f"review_date = {date}")

        key_tags = {'tag': date}
        result_key = ResultKey(session, ResultKey.current_milli_time(), key_tags)

        jewelry_result = VerificationSuite(session).onData(df)\
            .useRepository(metricsRepository) \
            .saveOrAppendResult(result_key) \
            .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
            .run()

Great! We have created the trend for the Holt Winters algorithm. Now it’s time to detect any anomalies within 2015.

Create another Holt Winters anomaly check similar to the one for the 2013–2014 dataset, except iterate only to August (because the dataset only goes to August 2015). For each month, we check for an anomaly using jewelry_result.status. If it’s not a success, an anomaly has been detected. We collect the constraint_message to see the error value, and we use publish to create an SNS notification that includes the topicArn we created in Amazon SNS, a Message, a Subject, and MessageAttributes. If an anomaly has been detected, we break out of the loop. See the following code:

# Use AWS SNS 
import boto3 
import json

# Topic for AWS SNS 
snsClient = boto3.client('sns', region_name = region)

for month in range(1,9):
    date = "\'2015" +'/'+str(month)+"\'"
    df = df_2015.filter("review_date =" + date)
    key_tags = {'tag':  date}
    result_key_2015 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(session).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2015) \
        .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()
    
    df = VerificationResult.checkResultsAsDataFrame(session, jewelry_result)
    
    if (jewelry_result.status != "Success"):
        print("Anomaly for total_votes has been detected")
        print(date)
        # Collect the constraint messages as plain strings so they can be serialized with json.dumps
        message = [row["constraint_message"] for row in df.select("constraint_message").collect()]
        response = snsClient.publish(TopicArn = topic_arn,
                             Message = "anomaly detected in data frame: \n" + json.dumps(message),
                             Subject = "Anomaly Detected in the jewelry dataset:"+ date,
                             MessageAttributes = {"TransactionType":
                                            {"DataType": "String.Array", "StringValue": "Anomaly Detected in Glue"}})
        break

After completing this tutorial, you should receive an email notification stating an anomaly has been detected for February 2015. This coincides with our hypothesis that PyDeequ will flag the same anomaly from the graph!

More on using AWS Glue and PyDeequ

This post shows how you can start exploring anomaly detection with PyDeequ. This simple tutorial is just the beginning of what you can do with AWS Glue. To add to this tutorial, you can create a time-based schedule for jobs and crawlers to run every time a dataset is appended.
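
For example, if you package the PyDeequ checks as an AWS Glue job, a scheduled trigger can run them on a recurring basis. The following boto3 sketch assumes a job named jewelry_hw_checks already exists; the job name and cron expression are placeholders:

import boto3

glue = boto3.client('glue')

# Run the (hypothetical) jewelry_hw_checks job every day at 08:00 UTC
glue.create_trigger(
    Name='jewelry_hw_daily',
    Type='SCHEDULED',
    Schedule='cron(0 8 * * ? *)',
    Actions=[{'JobName': 'jewelry_hw_checks'}],
    StartOnCreation=True
)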

Alternatively, you can use the different modules provided by PyDeequ and its tutorials, or the use case examples provided at the beginning of this post to further understand the dataset.

Resource cleanup

When you’re finished, clean up the resources created in this post, including the SNS topic and subscription, the contents of your S3 bucket, the AWS Glue crawler and development endpoint, the SageMaker notebook, and the IAM roles.

Conclusion

This post demonstrates the basics of detecting anomalies using PyDeequ and AWS Glue. Anomaly detection relies on the metrics repository file. This repository can easily be stored within Amazon S3, HDFS, or in memory as a JSON object for future test usage and AWS Glue ETL jobs. In addition to AWS Glue, PyDeequ can function within Amazon EMR and SageMaker in order to best handle the needs of your data pipeline.

This approach allows you to improve the quality and your own knowledge of your dataset. You can also apply this tool to a variety of business scenarios. The contents of this tutorial are for demonstration purposes and not production workloads. Be sure to follow security best practices for handling data at rest and in transit when you adapt PyDeequ into your workflows.


About the Authors

Joan Aoanan is a ProServe Consultant at AWS. With her B.S. Mathematics-Computer Science degree from Gonzaga University, she is interested in integrating her interests in math and science with technology.

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog using Amazon AppFlow, storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.

The Datadog Agent is a lightweight software that can be installed in many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Log in to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

In Windows this file is in C:\ProgramData\Datadog.

  1. Create custom log collection by customizing the conf.yaml file.

For example in Windows this file would be in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

The application keys in conjunction with your organization’s API key give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Log in to your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.

Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog connection and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.

Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger and transfer mode. Leave all settings as default:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criteria. For the Datadog data source, it’s mandatory to specify filters for Date_Range and Query. The formats for specifying the filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose Query.
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.

The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.
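
For an on-demand flow, you can start each run from the console or programmatically. The following boto3 sketch starts the flow created earlier; adjust the flow name if you chose a different one:

import boto3

appflow = boto3.client('appflow')

# Start an on-demand run of the flow created earlier
response = appflow.start_flow(flowName='mydatadogflow')
print(response)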

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We’re traversing a nested JSON object in the Athena query, simply with a dot notation.
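
You can also run the same query programmatically. The following boto3 sketch submits the query and polls until it finishes; the database and table match the names used in this walkthrough, and the output location is a placeholder for your Athena results bucket:

import time
import boto3

athena = boto3.client('athena')

query = """
SELECT *
FROM mydatadoglogdb.samplelogfile
WHERE content.attributes.level = 'Information'
"""

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'mydatadoglogdb'},
    ResultConfiguration={'OutputLocation': 's3://<your-athena-results-bucket>/'}
)

query_id = execution['QueryExecutionId']
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(2)

print(state)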

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advanced analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minutes, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Amazon MSK backup for Archival, Replay, or Analytics

Post Syndicated from Rohit Yadav original https://aws.amazon.com/blogs/architecture/amazon-msk-backup-for-archival-replay-or-analytics/

Amazon MSK is a fully managed service that helps you build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes. You can also stream changes to and from databases, and power machine learning and analytics applications.

Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. It manages the provisioning, configuration, and maintenance of resources for highly available Apache Kafka clusters. It is fully compatible with Apache Kafka and supports familiar community-built tools such as MirrorMaker 2.0, Kafka Connect, and Kafka Streams.

Introduction

In the past few years, the volume of data that companies must ingest has increased significantly. Information comes from various sources, like transactional databases, system logs, SaaS platforms, mobile, and IoT devices. Businesses want to act as soon as the data arrives. This has resulted in increased adoption of scalable real-time streaming solutions. These solutions scale horizontally to provide the needed throughput to process data in real time, with milliseconds of latency. Customers have adopted Amazon MSK as a top choice of streaming platforms. Amazon MSK gives you the flexibility to retain topic data for a longer term (the default is 7 days). This supports replay, analytics, and machine learning based use cases. When IT and business systems are producing and processing terabytes of data per hour, it can become expensive to store, manage, and retrieve data. This has led legacy data archival processes to move toward cheaper, reliable, long-term storage solutions like Amazon Simple Storage Service (Amazon S3).

Following are some of the benefits of archiving Amazon MSK topic data to Amazon S3:

  1. Reduced Cost – You only need to retain the data in the cluster based on your Recovery Point Objective (RPO). Any historical data can be archived in Amazon S3 and replayed if necessary.
  2. Integration with Enterprise Data Lake – Since your data is available in S3, you can now integrate with other data analytics services like Amazon EMR, AWS Glue, Amazon Athena, to run data aggregation and analytics. For example, you can build reports to visualize month over month changes.
  3. Optimize Machine Learning Workloads – Machine learning applications will be able to train new models and improve predictions using historical streams of data available in Amazon S3. This also enables better integration with Amazon Machine Learning services.
  4. Compliance – Long-term data archival for regulatory and security compliance.
  5. Backloading data to other systems – Ability to rebuild data into other application environments such as pre-prod, testing, and more.

There are many benefits to using Amazon S3 as long-term storage for Amazon MSK topics. Let’s dive deeper into the recommended architecture for this pattern. We will present an architecture to back up Amazon MSK topics to Amazon S3 in real time. In addition, we’ll demonstrate some of the use cases previously mentioned.

Architecture

The diagram following illustrates the architecture for building a real-time archival pipeline to archive Amazon MSK topics to S3. This architecture uses an AWS Lambda function to process records from your Amazon MSK cluster when the cluster is configured as an event source. As a consumer, you don’t need to worry about infrastructure management or scaling with Lambda. You only pay for what you consume, so you don’t pay for over-provisioned infrastructure.

To create an event source mapping, you can add your Amazon MSK cluster in a Lambda function trigger. The Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches from one or more partitions and provides these to your function as an event payload. The function then processes records, and sends the payload to an Amazon Kinesis Data Firehose delivery stream. We use Kinesis Data Firehose delivery stream because it can natively batch, compress, transform, and encrypt your events before loading to S3.
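
A minimal Lambda handler for this pattern might look like the following Python sketch. It decodes the base64-encoded record values from the MSK event payload and forwards them to a Kinesis Data Firehose delivery stream. The delivery stream name is a placeholder, and production code would also handle the 500-records-per-call batch limit and retries:

import base64
import os
import boto3

firehose = boto3.client('firehose')
DELIVERY_STREAM = os.environ.get('DELIVERY_STREAM_NAME', 'msk-archive-stream')  # placeholder

def handler(event, context):
    records = []
    # The MSK event source groups records under "topic-partition" keys
    for topic_partition, messages in event.get('records', {}).items():
        for message in messages:
            payload = base64.b64decode(message['value'])
            records.append({'Data': payload + b'\n'})

    if records:
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)

    return {'forwarded': len(records)}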

In this architecture, Kinesis Data Firehose delivers the records received from Lambda to Amazon S3 as GZIP files. These files are partitioned in Hive-style format by Kinesis Data Firehose:

data/year=yyyy/month=MM/day=dd/hour=HH

Figure 1. Archival Architecture

Let’s review some of the possible solutions that can be built on this archived data.

Integration with Enterprise Data Lake

The architecture diagram following shows how you can integrate the archived data in Amazon S3 with your Enterprise Data Lake. Since the data files are prefixed in Hive-style format, you can catalog them as partitioned tables in the AWS Glue Data Catalog. With partitioning in place, you can perform optimizations like partition pruning, which enables predicate pushdown for improved performance of your analytics queries. You can also use AWS data analytics services like Amazon EMR and AWS Glue for batch analytics. Amazon Athena can be used to run serverless, interactive SQL queries on the data for visualization and analysis.

Data currently gets stored in JSON files. Following are some of the services/tools that can be integrated with your archive for reporting, analytics, visualization, and machine learning requirements.

Figure 2. Analytics Architecture

Cloning data into other application environments

There are use cases where you may want to use this archive to clone data into other application environments.

These clusters could be used for testing or debugging purposes. You could decide to use only a subset of your data from the archive. Let’s say you want to debug an issue beyond the configured retention period, but not replicate all the data to your testing environment. With archived data in S3, you can build downstream jobs to filter data that can be loaded into a new Amazon MSK cluster. The following diagram highlights this pattern:

Figure 3. Replay Architecture
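
As an illustration of this pattern (not part of the AWS solution referenced below), a replay job could list the archived GZIP files under a chosen partition prefix, filter the records of interest, and produce them to the new cluster with any Kafka client. The following sketch uses the open-source kafka-python library; the bucket, prefix, broker addresses, topic name, and filter condition are all placeholders:

import gzip
import json
import boto3
from kafka import KafkaProducer  # any Kafka client works; kafka-python is used here as an example

s3 = boto3.client('s3')
producer = KafkaProducer(
    bootstrap_servers=['<broker-1>:9094', '<broker-2>:9094'],  # placeholder MSK brokers
    security_protocol='SSL'
)

bucket = '<your-archive-bucket>'
prefix = 'data/year=2021/month=06/'  # replay only a subset of the archive

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
        for line in gzip.decompress(body).splitlines():
            record = json.loads(line)
            if record.get('eventType') == 'order':  # hypothetical filter
                producer.send('replay-topic', json.dumps(record).encode('utf-8'))

producer.flush()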

Ready for a Test Drive

To help you get started, we would like to introduce an AWS Solution: AWS Streaming Data Solution for Amazon MSK (scroll down and see Option 3 tab). There is a single-click AWS CloudFormation template, which can assist you in quickly provisioning resources. This will get your real-time archival pipeline for Amazon MSK up and running quickly. This solution shortens your development time by removing or reducing the need for you to:

  • Model and provision resources using AWS CloudFormation
  • Set up Amazon CloudWatch alarms, dashboards, and logging
  • Manually implement streaming data best practices in AWS

This solution is data and logic agnostic, enabling you to start with boilerplate code and start customizing quickly. After deployment, use this solution’s monitoring capabilities to transition easily to production.

Conclusion

In this post, we explained the architecture to build a scalable, highly available real-time archival of Amazon MSK topics to long term storage in Amazon S3. The architecture was built using Amazon MSK, AWS Lambda, Amazon Kinesis Data Firehose, and Amazon S3. The architecture also illustrates how you can integrate your Amazon MSK streaming data in S3 with your Enterprise Data Lake.

Using AWS DevOps Tools to model and provision AWS Glue workflows

Post Syndicated from Nuatu Tseggai original https://aws.amazon.com/blogs/devops/provision-codepipeline-glue-workflows/

This post provides a step-by-step guide on how to model and provision AWS Glue workflows utilizing a DevOps principle known as infrastructure as code (IaC) that emphasizes the use of templates, source control, and automation. The cloud resources in this solution are defined within AWS CloudFormation templates and provisioned with automation features provided by AWS CodePipeline and AWS CodeBuild. These AWS DevOps tools are flexible, interchangeable, and well suited for automating the deployment of AWS Glue workflows into different environments such as dev, test, and production, which typically reside in separate AWS accounts and Regions.

AWS Glue workflows allow you to manage dependencies between multiple components that interoperate within an end-to-end ETL data pipeline by grouping together a set of related jobs, crawlers, and triggers into one logical run unit. Many customers using AWS Glue workflows start by defining the pipeline using the AWS Management Console and then move on to monitoring and troubleshooting using either the console, AWS APIs, or the AWS Command Line Interface (AWS CLI).

Solution overview

The solution uses COVID-19 datasets. For more information on these datasets, see the public data lake for analysis of COVID-19 data, which contains a centralized repository of freely available and up-to-date curated datasets made available by the AWS Data Lake team.

Because the primary focus of this solution showcases how to model and provision AWS Glue workflows using AWS CloudFormation and CodePipeline, we don’t spend much time describing intricate transform capabilities that can be performed in AWS Glue jobs. As shown in the Python scripts, the business logic is optimized for readability and extensibility so you can easily home in on the functions that aggregate data based on monthly and quarterly time periods.

The ETL pipeline reads the source COVID-19 datasets directly and writes only the aggregated data to your S3 bucket.

The solution exposes the datasets in the following tables:

Table Name | Description | Dataset location | Provider
countrycode | Lookup table for country codes | s3://covid19-lake/static-datasets/csv/countrycode/ | Rearc
countypopulation | Lookup table for the population of each county | s3://covid19-lake/static-datasets/csv/CountyPopulation/ | Rearc
state_abv | Lookup table for US state abbreviations | s3://covid19-lake/static-datasets/json/state-abv/ | Rearc
rearc_covid_19_nyt_data_in_usa_us_counties | Data on COVID-19 cases at US county level | s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/ | Rearc
rearc_covid_19_nyt_data_in_usa_us_states | Data on COVID-19 cases at US state level | s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-states/ | Rearc
rearc_covid_19_testing_data_states_daily | Data on COVID-19 cases at US state level | s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/ | Rearc
rearc_covid_19_testing_data_us_daily | US total test daily trend | s3://covid19-lake/rearc-covid-19-testing-data/csv/us_daily/ | Rearc
rearc_covid_19_testing_data_us_total_latest | US total tests | s3://covid19-lake/rearc-covid-19-testing-data/csv/us-total-latest/ | Rearc
rearc_covid_19_world_cases_deaths_testing | World total tests | s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/ | Rearc
rearc_usa_hospital_beds | Hospital beds and their utilization in the US | s3://covid19-lake/rearc-usa-hospital-beds/ | Rearc
world_cases_deaths_aggregates | Monthly and quarterly aggregate of the world | s3://<your-S3-bucket-name>/covid19/world-cases-deaths-aggregates/ | Aggregate

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • The AWS CLI (optional)
  • Permissions to create a CloudFormation stack
  • Permissions to create AWS resources, such as AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) buckets, and various other resources
  • General familiarity with AWS Glue resources (triggers, crawlers, and jobs)

Architecture

The CloudFormation template glue-workflow-stack.yml defines all the AWS Glue resources shown in the following diagram.

architecture diagram showing ETL process

Figure: AWS Glue workflow architecture diagram

Modeling the AWS Glue workflow using AWS CloudFormation

Let’s start by exploring the template used to model the AWS Glue workflow: glue-workflow-stack.yml

We focus on two resources in the following snippet:

  • AWS::Glue::Workflow
  • AWS::Glue::Trigger

From a logical perspective, a workflow contains one or more triggers that are responsible for invoking crawlers and jobs. Building a workflow starts with defining the crawlers and jobs as resources within the template and then associating it with triggers.

Defining the workflow

This is where the definition of the workflow starts. In the following snippet, we specify the type as AWS::Glue::Workflow and the property Name as a reference to the parameter GlueWorkflowName.

Parameters:
  GlueWorkflowName:
    Type: String
    Description: Glue workflow that tracks all triggers, jobs, crawlers as a single entity
    Default: Covid_19

Resources:
  Covid19Workflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Description: Glue workflow that tracks specified triggers, jobs, and crawlers as a single entity
      Name: !Ref GlueWorkflowName

Defining the triggers

This is where we define each trigger and associate it with the workflow. In the following snippet, we specify the property WorkflowName on each trigger as a reference to the logical ID Covid19Workflow.

These triggers allow us to create a chain of dependent jobs and crawlers as specified by the properties Actions and Predicate.

The trigger t_Start utilizes a type of SCHEDULED, which means that it starts at a defined time (in our case, one time a day at 8:00 AM UTC). Every time it runs, it starts the job with the logical ID Covid19WorkflowStarted.

The trigger t_GroupA utilizes a type of CONDITIONAL, which means that it starts when the resources specified within the property Predicate have reached a specific state (when the list of Conditions specified equals SUCCEEDED). Every time t_GroupA runs, it starts the crawlers with the logical IDs CountyPopulation and Countrycode, per the Actions property containing a list of actions.

  TriggerJobCovid19WorkflowStart:
    Type: AWS::Glue::Trigger
    Properties:
      Name: t_Start
      Type: SCHEDULED
      Schedule: cron(0 8 * * ? *) # Runs once a day at 8 AM UTC
      StartOnCreation: true
      WorkflowName: !Ref GlueWorkflowName
      Actions:
        - JobName: !Ref Covid19WorkflowStarted

  TriggerCrawlersGroupA:
    Type: AWS::Glue::Trigger
    Properties:
      Name: t_GroupA
      Type: CONDITIONAL
      StartOnCreation: true
      WorkflowName: !Ref GlueWorkflowName
      Actions:
        - CrawlerName: !Ref CountyPopulation
        - CrawlerName: !Ref Countrycode
      Predicate:
        Conditions:
          - JobName: !Ref Covid19WorkflowStarted
            LogicalOperator: EQUALS
            State: SUCCEEDED

Provisioning the AWS Glue workflow using CodePipeline

Now let’s explore the template used to provision the CodePipeline resources: codepipeline-stack.yml

This template defines an S3 bucket that is used as the source action for the pipeline. Any time source code is uploaded to a specified bucket, AWS CloudTrail logs the event, which is detected by an Amazon CloudWatch Events rule configured to start running the pipeline in CodePipeline. The pipeline orchestrates CodeBuild to get the source code and provision the workflow.

For more information on any of the available source actions that you can use with CodePipeline, such as Amazon S3, AWS CodeCommit, Amazon Elastic Container Registry (Amazon ECR), GitHub, GitHub Enterprise Server, GitHub Enterprise Cloud, or Bitbucket, see Start a pipeline execution in CodePipeline.

We start by deploying the stack that sets up the CodePipeline resources. This stack can be deployed in any Region where CodePipeline and AWS Glue are available. For more information, see AWS Regional Services.

Cloning the GitHub repo

Clone the GitHub repo with the following command:

$ git clone https://github.com/aws-samples/provision-codepipeline-glue-workflows.git

Deploying the CodePipeline stack

Deploy the CodePipeline stack with the following command:

$ aws cloudformation deploy \
--stack-name codepipeline-covid19 \
--template-file cloudformation/codepipeline-stack.yml \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset \
--region <AWS_REGION>

When the deployment is complete, you can view the pipeline that was provisioned on the CodePipeline console.

CodePipeline console showing the deploy pipeline in failed state

Figure: CodePipeline console

The preceding screenshot shows that the pipeline failed. This is because we haven’t uploaded the source code yet.

In the following steps, we zip and upload the source code, which triggers another (successful) run of the pipeline.

Zipping the source code

Zip the source code containing Glue scripts, CloudFormation templates, and Buildspecs file with the following command:

$ zip -r source.zip . -x images/\* *.history* *.git* *.DS_Store*

You can omit *.DS_Store* from the preceding command if you are not a Mac user.

Uploading the source code

Upload the source code with the following command:

$ aws s3 cp source.zip s3://covid19-codepipeline-source-<AWS_ACCOUNT_ID>-<AWS_REGION>

Make sure to provide your account ID and Region in the preceding command. For example, if your AWS account ID is 111111111111 and you’re using Region us-west-2, use the following command:

$ aws s3 cp source.zip s3://covid19-codepipeline-source-111111111111-us-west-2

Now that the source code has been uploaded, view the pipeline again to see it in action.

CodePipeline console showing the deploy pipeline in success state

Figure: CodePipeline console displaying stage “Deploy” in-progress

Choose Details within the Deploy stage to see the build logs.

CodeBuild console displaying build logs

Figure: CodeBuild console displaying build logs

To modify any of the commands that run within the Deploy stage, feel free to modify: deploy-glue-workflow-stack.yml

Try uploading the source code a few more times. Each time it’s uploaded, CodePipeline starts and runs another deploy of the workflow stack. If nothing has changed in the source code, AWS CloudFormation automatically determines that the stack is already up to date. If something has changed in the source code, AWS CloudFormation automatically determines that the stack needs to be updated and proceeds to run the change set.

Viewing the provisioned workflow, triggers, jobs, and crawlers

To view your workflows on the AWS Glue console, in the navigation pane, under ETL, choose Workflows.

Glue console showing workflows

Figure: Navigate to Workflows

To view your triggers, in the navigation pane, under ETL, choose Triggers.

Glue console showing triggers

Figure: Navigate to Triggers

To view your crawlers, under Data Catalog, choose Crawlers.

Glue console showing crawlers

Figure: Navigate to Crawlers

To view your jobs, under ETL, choose Jobs.

Glue console showing jobs

Figure: Navigate to Jobs

Running the workflow

The workflow runs automatically at 8:00 AM UTC. To start the workflow manually, you can use either the AWS CLI or the AWS Glue console.

To start the workflow with the AWS CLI, enter the following command:

$ aws glue start-workflow-run --name Covid_19 --region <AWS_REGION>
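
The equivalent calls with boto3 (a sketch, assuming default credentials and the workflow name defined in the template) start a run and then check its status:

import boto3

glue = boto3.client('glue', region_name='<AWS_REGION>')

run_id = glue.start_workflow_run(Name='Covid_19')['RunId']
status = glue.get_workflow_run(Name='Covid_19', RunId=run_id)['Run']['Status']
print(run_id, status)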

To start the workflow on the AWS Glue console, on the Workflows page, select your workflow and choose Run on the Actions menu.

Glue console run workflow

Figure: AWS Glue console start workflow run

To view the run details of the workflow, choose the workflow on the AWS Glue console and choose View run details on the History tab.

Glue console view run details of a workflow

Figure: View run details

The following screenshot shows a visual representation of the workflow as a graph with your run details.

Glue console showing visual representation of the workflow as a graph.

Figure: AWS Glue console displaying details of successful workflow run

Cleaning up

To avoid additional charges, delete the stack created by the CloudFormation template and the contents of the buckets you created.

1. Delete the contents of the covid19-dataset bucket with the following command:

$ aws s3 rm s3://covid19-dataset-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

2. Delete your workflow stack with the following command:

$ aws cloudformation delete-stack --stack-name glue-covid19 --region <AWS_REGION>

To delete the contents of the covid19-codepipeline-source bucket, it’s simplest to use the Amazon S3 console because it makes it easy to delete multiple versions of the object at once.

3. Navigate to the S3 bucket named covid19-codepipeline-source-<AWS_ACCOUNT_ID>-<AWS_REGION>.

4. Choose List versions.

5. Select all the files to delete.

6. Choose Delete and follow the prompts to permanently delete all the objects.

S3 console delete all object versions

Figure: AWS S3 console delete all object versions

7. Delete the contents of the covid19-codepipeline-artifacts bucket:

$ aws s3 rm s3://covid19-codepipeline-artifacts-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

8. Delete the contents of the covid19-cloudtrail-logs bucket:

$ aws s3 rm s3://covid19-cloudtrail-logs-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

9. Delete the pipeline stack:

$ aws cloudformation delete-stack --stack-name codepipeline-covid19 --region <AWS_REGION>

Conclusion

In this post, we stepped through how to use AWS DevOps tooling to model and provision an AWS Glue workflow that orchestrates an end-to-end ETL pipeline on a real-world dataset.

You can download the source code and template from this Github repository and adapt it as you see fit for your data pipeline use cases. Feel free to leave comments letting us know about the architectures you build for your environment. To learn more about building ETL pipelines with AWS Glue, see the AWS Glue Developer Guide and the AWS Data Analytics learning path.

About the Authors

Nuatu Tseggai

Nuatu Tseggai is a Cloud Infrastructure Architect at Amazon Web Services. He enjoys working with customers to design and build event-driven distributed systems that span multiple services.

Suvojit Dasgupta

Suvojit Dasgupta is a Sr. Customer Data Architect at Amazon Web Services. He works with customers to design and build complex data solutions on AWS.

Data monetization and customer experience optimization using telco data assets: Part 1

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/part-1-data-monetization-and-customer-experience-optimization-using-telco-data-assets/

The landscape of the telecommunications industry is changing rapidly. For telecom service providers (TSPs), revenue from core voice and data services continues to shrink due to regulatory pressure and emerging OTT players that offer an attractive alternative. Despite increasing demand from customers for bandwidth, speed, and efficiency, TSPs are finding that the ROI from implementing new access technologies like 5G is insubstantial.

To overcome the risk of being relegated to a utility or dumb pipe, TSPs today are looking to diversify, adopting alternative business models to generate new revenue streams.

In recent times, adopting customer experience (CX) and data monetization initiatives has been a key theme across all industries. Although many Tier-1 TSPs are leading this transformation by using new technologies to improve CX and improve profitability, many TSPs have yet to embark on this challenging but rewarding journey.

Building and implementing a CX management and data monetization strategy

Data monetization is often misunderstood as making money by selling data, but what it really means is driving revenue by increasing the top line or the bottom line through the use of data assets. The benefits can be tangible or intangible, and realized internally or externally.

According to Gartner, most data and analytics leaders are looking to increase investments in business intelligence (BI) and analytics (see the following study results).

The preceding visualization is from “The 2019 CIO Agenda: Securing a New Foundation for Digital Business”, published October 15, 2018.

Although the external monetization opportunities are limited due to strict regulations, a plethora of opportunities exist for TSPs to monetize data both internally (regulated, but much less so than external monetization) and externally via a marketplace (highly regulated). If TSPs can shift their mindset from selling data to using data insights for monetization and improving CX, they can adopt a significant number of use cases to realize an immediate positive impact.

Tapping into insights about customer behavior is like having a Swiss Army knife for your business. You can use these insights to drive CX, hyper-personalization and localization, micro-segmentation, subscriber retention, loyalty and rewards programs, network planning and optimization, internal and external data monetization, and more. The following are some use cases that can be driven using CX and data monetization strategies:

  • Segmentation/micro-segmentation (cross-sell, up-sell, targeted advertising, enhanced market locator); for example:
    • Identify targets for consuming baby products or up-selling a kids-related TV channel
    • Identify females in the age range of 18-35 to target for high-end beauty products or apparel

You can build hundreds of such segments.

  • Personalized loyalty and reward programs (incentivize customers with what they like). For example, movie tickets or discounts for a movie lover, or food coupons and deals for a food lover.
  • CX-driven network optimization (allocate more resources to streaming hotspots with high-value customers).
  • Identifying potential partners for joint promotions. For example, bundling device offers with a music app subscription.
  • Hyper-personalization. For example, personalized recommendations for on-portal apps and websites.
  • Next best action and next best offer. For example, intelligent bundling and packaging of offerings.

Challenges with driving CX and data monetization

In this digital era, TSPs consider data analytics a strategic pillar in their quest to evolve into a true data-driven organization. Although many TSPs are harnessing the power of data to drive and improve CX, technological gaps and challenges remain in baselining and formulating internal and external data monetization strategies. Some of these challenges include:

  • Non-overlapping technology investments for CX and data monetization due to misaligned business and IT initiatives
  • Huge CAPEX requirements to process massive volumes of data
  • Inability to unearth hidden insights due to siloed data initiatives
  • Inability to marry various datasets together due to missing pieces around data standardization techniques
  • Lack of user-friendly tools and techniques to discover, ingest, process, correlate, analyze, and consume the data
  • Inability to experiment and innovate with agility and low cost

In this two-part series, I demonstrate a working solution, backed by an AWS CloudFormation template, that shows how a TSP can use existing data assets to generate new revenue streams and improve and personalize CX using AWS services. I also include key pieces of information around data standardization, baselining an analytics data model to marry different datasets in the data warehouse, self-service analytics, metadata search, and a media dictionary framework.

In this post, you deploy the stack using a CloudFormation template and follow simple steps to transform, enrich, and bring multiple datasets together so that they can be correlated and queried.

In part 2, you learn how advanced business users can query enriched data and derive meaningful insights using Amazon Redshift and Amazon Redshift Spectrum or Amazon Athena, enable self-service analytics for business users, and publish ready-made dashboards via Amazon QuickSight.

Solution overview

The main ingredient of this solution is Packet Switch (PS) probe data embedded with a deep packet inspection (DPI) engine, which can reveal a lot of information about user interests and usage behavior. This data is transformed and enriched with DPI media and device dictionaries, along with other standard telco transformations, to deduce insights and to profile and micro-segment subscribers. Enriched data is made available along with other transformed dimensional attributes (CRM, subscriptions, media, carrier, device, and network configuration management) for rich slicing and dicing.

For example, the following QuickSight visualizations depict a use case to identify music lovers ages 18-55 with Apple devices. You can also generate micro-segments by capturing the top X subscribers by consumption or adding KPIs like recency and frequency.

The following diagram illustrates the workflow of the solution.

For this post, AWS CloudFormation sets up the required folder structure in Amazon Simple Storage Service (Amazon S3) and provides sample data and dictionary files. Most of the data included as part of the CloudFormation template is dummy data and covers the following:

  • CRM
  • Subscription and subscription mapping
  • Network 3G & 4G configuration management
  • Operator PLMN
  • DPI and device dictionary
  • PS probe data

Descriptions of all the input datasets and attributes are available in the AWS Glue Data Catalog tables and as part of the Amazon Redshift metadata for all tables in Amazon Redshift.

The workflow for this post includes the following steps:

  1. Catalog all the files in the AWS Glue Data Catalog using the following AWS Glue data crawlers:
    1. DPI data crawler (to crawl incoming PS probe DPI data)
    2. Dimension data crawler (to crawl all dimension data)
  2. Update attribute descriptions in the Data Catalog (this step is optional).
  3. Create Amazon Redshift schema, tables, procedures, and metadata using an AWS Lambda function.
  4. Process each data source file using separate AWS Glue Spark jobs. These jobs enrich, transform, and apply business filtering rules before ingesting data into an Amazon Redshift cluster.
  5. Trigger Amazon Redshift hourly and daily aggregation procedures using Lambda functions to aggregate data from the raw table into hourly and daily tables.

Part 2 includes the following steps:

  1. Catalog the processed raw, aggregate, and dimension data in the Data Catalog using the DPI processed data crawler.
  2. Interactively query data directly from Amazon S3 using Amazon Athena.
  3. Enable self-service analytics using QuickSight to prepare and publish insights based on data residing in the Amazon Redshift cluster.

The workflow can change depending on the complexity of the environment and your use case, but the fundamental idea remains the same. For example, your use case could be processing PS probe DPI data in real time rather than in batch mode, keeping hot data in Amazon Redshift, storing cold and historical data on Amazon S3, or archiving data in Amazon S3 Glacier for regulatory compliance. Amazon S3 offers several storage classes designed for different use cases. You can move the data among these different classes based on Amazon S3 lifecycle properties. For more information, see Amazon S3 Storage Classes.
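
For example, if you decide to archive historical DPI data, a minimal boto3 sketch of such a lifecycle rule could look like the following. The bucket name and prefix here are assumptions for illustration; adjust them to match your environment.

import boto3

s3 = boto3.client("s3")

# Hypothetical bucket and prefix; replace with your own values.
s3.put_bucket_lifecycle_configuration(
    Bucket="my-telco-data-lake",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "archive-historical-dpi",
                "Filter": {"Prefix": "dpi/processed/"},
                "Status": "Enabled",
                # Transition objects to S3 Glacier after 90 days for low-cost retention.
                "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
            }
        ]
    },
)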

Prerequisites

For this walkthrough, you should have the following prerequisites:

For more information about AWS Regions and where AWS services are available, see Region Table.

Creating your resources with AWS CloudFormation

To get started, create your resources with the following CloudFormation stack.

  1. Click the Launch Stack button below:
  2. Leave the parameters at their default, with the following exceptions:
    1. Enter RedshiftPassword and S3BucketNameParameter parameters, which aren’t populated by default.
    2. An Amazon S3 bucket name is globally unique, so enter a unique bucket name for S3BucketNameParameter.

The following screenshot shows the parameters for our use case.

  1. Choose Next.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create stack.

It takes approximately 10 minutes to deploy the stack. For more information about the key resources deployed through the stack, see Data Monetization and Customer Experience (CX) Optimization using telco data assets: AWS CloudFormation stack details. You can view all the resources on the AWS CloudFormation console. For instructions, see Viewing AWS CloudFormation stack data and resources on the AWS Management Console.

The CloudFormation stack we provide in this post serves as a baseline and is not a production-grade solution.

Building a Data Catalog using AWS Glue

You start by discovering sample data stored on Amazon S3 through an AWS Glue crawler. For more information, see Populating the AWS Glue Data Catalog. To catalog data, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose Crawlers.
  2. Select DPIRawDataCrawler and choose Run crawler.
  3. Select DimensionDataCrawler and choose Run crawler.
  4. Wait for the crawlers to show the status Stopping.

The number of tables added by the DimensionDataCrawler and DPIRawDataCrawler crawlers should show 9 and 1, respectively.
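
If you prefer to script this step rather than use the console, the following boto3 sketch starts both crawlers and waits for them to finish. The crawler names are the ones shown in the console; if your stack prefixes or suffixes them, adjust the names accordingly.

import time
import boto3

glue = boto3.client("glue")
crawlers = ["DPIRawDataCrawler", "DimensionDataCrawler"]

# Start both crawlers.
for crawler in crawlers:
    glue.start_crawler(Name=crawler)

# Wait until both crawlers return to the READY state.
while True:
    states = [glue.get_crawler(Name=c)["Crawler"]["State"] for c in crawlers]
    if all(state == "READY" for state in states):
        break
    time.sleep(30)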

  1. In the navigation pane, choose Tables.
  2. Verify the following 10 tables are created under the cemdm database:
    • d_crm_demographics
    • d_device
    • d_dpi_dictionary
    • d_network_cm_3g
    • d_network_cm_4g
    • d_operator_plmn
    • d_tac
    • d_tariff_plan
    • d_tariff_plan_desc
    • raw_dpi_incoming

Updating attribute descriptions in the Data Catalog

The AWS Glue Data Catalog has a comment field to store the metadata under each table in the AWS Glue database. Anybody who has access to this database can easily understand attributes coming from different data sources through metadata provided in the comment field. The CloudFormation stack includes a CSV file that contains a description of all the attributes from the source files. This file is used to update the comment field for all the Data Catalog tables this stack deployed. This step is not mandatory to proceed with the workflow. However, if you want to update the comment field against each table, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the GlueCatalogUpdate function.
  3. Configure a test event by choosing Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that the Data Catalog attribute description update is complete.

Attributes of the table under the Data Catalog database should now have descriptions in the Comment column. For example, the following screenshot shows the d_operator_plmn table.
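
Conceptually, the function follows the standard Data Catalog update pattern: fetch the table definition, set the Comment on each column, and write the definition back. The following boto3 sketch shows that pattern in simplified form; it is not the stack's actual code, and the description values here are placeholders (the deployed function reads them from the bundled CSV file).

import boto3

glue = boto3.client("glue")

# Placeholder descriptions; the deployed function loads these from a CSV file.
descriptions = {"mcc": "Mobile country code", "mnc": "Mobile network code"}

table = glue.get_table(DatabaseName="cemdm", Name="d_operator_plmn")["Table"]

# Keep only the fields that UpdateTable accepts, then set the column comments.
table_input = {
    key: table[key]
    for key in ("Name", "StorageDescriptor", "PartitionKeys", "TableType", "Parameters")
    if key in table
}
for column in table_input.get("StorageDescriptor", {}).get("Columns", []):
    if column["Name"] in descriptions:
        column["Comment"] = descriptions[column["Name"]]

glue.update_table(DatabaseName="cemdm", TableInput=table_input)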

Creating Amazon Redshift schema, tables, procedures, and metadata

To create schema, tables, procedures, and metadata in Amazon Redshift, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the RedshiftDDLCreation function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that the schema, table, procedures, and metadata generation is complete.
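
If you want to script this step instead of creating a console test event, a minimal boto3 sketch such as the following invokes the function synchronously with an empty event. The function name may carry a stack-generated prefix or suffix in your account, so adjust it as needed.

import json
import boto3

lambda_client = boto3.client("lambda")

# Invoke the DDL-creation function synchronously with an empty event.
response = lambda_client.invoke(
    FunctionName="RedshiftDDLCreation",
    InvocationType="RequestResponse",
    Payload=json.dumps({}).encode("utf-8"),
)
print(response["Payload"].read().decode("utf-8"))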

Running AWS Glue ETL jobs

AWS Glue provides the serverless, scalable, and distributed processing capability to transform and enrich your datasets. To run AWS Glue extract, transform, and load (ETL) jobs, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose Jobs.
  2. Select the following jobs (one at a time) and choose Run job on the Action menu:
    • d_customer_demographics
    • d_device
    • d_dpi_dictionary
    • d_location
    • d_operator_plmn
    • d_tac
    • d_tariff_plan
    • d_tariff_plan_desc
    • f_dpi_enrichment

You can run all these jobs in parallel.

All dimension data jobs should finish successfully within 3 minutes, and the fact data enrichment job should finish within 5 minutes.

  1. Verify the jobs are complete by selecting each job and checking Run status on the History tab.
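
You can also start and monitor these jobs programmatically instead of using the console. The following boto3 sketch starts all nine jobs in parallel and polls until every run reaches a terminal state; the job names are the ones listed above, so adjust them if your stack names them differently.

import time
import boto3

glue = boto3.client("glue")

jobs = [
    "d_customer_demographics", "d_device", "d_dpi_dictionary", "d_location",
    "d_operator_plmn", "d_tac", "d_tariff_plan", "d_tariff_plan_desc",
    "f_dpi_enrichment",
]

# Start every job; AWS Glue runs them in parallel.
run_ids = {job: glue.start_job_run(JobName=job)["JobRunId"] for job in jobs}

# Poll until all runs reach a terminal state.
while run_ids:
    for job, run_id in list(run_ids.items()):
        state = glue.get_job_run(JobName=job, RunId=run_id)["JobRun"]["JobRunState"]
        if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
            print(f"{job}: {state}")
            del run_ids[job]
    time.sleep(30)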

Aggregating hourly and daily DPI data in Amazon Redshift

To aggregate hourly and daily sample data in Amazon Redshift using Lambda functions, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the RedshiftDPIHourlyAgg function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that hourly aggregation is complete.

  1. In the navigation pane, choose Functions.
  2. Choose the RedshiftDPIDailyAgg function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that daily aggregation is complete.

Both the hourly and daily Lambda functions are hardcoded with the date and hour used to aggregate the sample data. To make them generic, you need to uncomment a few marked lines of code and comment out their hardcoded counterparts. Both functions are also equipped with offset parameters to decide how far back in time you want to run the aggregations. However, this isn't required for this walkthrough.

You can also schedule these functions with Amazon CloudWatch Events, although this is not required for this walkthrough.
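
If you do want to schedule the aggregations, a minimal boto3 sketch such as the following creates an hourly CloudWatch Events rule, grants it permission to invoke the function, and attaches the function as a target. The rule name, statement ID, and function ARN are placeholders.

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

function_name = "RedshiftDPIHourlyAgg"
function_arn = "arn:aws:lambda:<AWS-REGION>:<AWS_ACCOUNT_ID>:function:" + function_name

# Create (or update) a rule that fires at the top of every hour.
rule_arn = events.put_rule(
    Name="dpi-hourly-aggregation",
    ScheduleExpression="cron(0 * * * ? *)",
    State="ENABLED",
)["RuleArn"]

# Allow CloudWatch Events to invoke the aggregation function.
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId="allow-dpi-hourly-rule",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)

# Point the rule at the function.
events.put_targets(
    Rule="dpi-hourly-aggregation",
    Targets=[{"Id": "hourly-agg", "Arn": function_arn}],
)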

So far, we have completed the following:

  1. Deployed the CloudFormation stack.
  2. Cataloged sample raw data by running DimensionDataCrawler and DPIRawDataCrawler AWS Glue crawlers.
  3. Updated attribute descriptions in the AWS Glue Data Catalog by running the GlueCatalogUpdate Lambda function.
  4. Created Amazon Redshift schema, tables, stored procedures, and metadata through the RedshiftDDLCreation Lambda function.
  5. Ran all AWS Glue ETL jobs to transform raw data and load it into their respective Amazon Redshift tables.
  6. Aggregated hourly and daily data from enriched raw data into hourly and daily Amazon Redshift tables by running the RedshiftDPIHourlyAgg and RedshiftDPIDailyAgg Lambda functions.

Cleaning up

If you don’t plan to proceed to the part 2 of this series, and want to avoid incurring future charges, delete the resources you created by deleting the CloudFormation stack.

Conclusion

In this post, I demonstrated how you can easily transform, enrich, and bring multiple telco datasets together in an Amazon Redshift data warehouse cluster. You can correlate these datasets to produce multi-dimensional insights from several angles, like subscriber, network, device, subscription, roaming, and more.

In part 2 of this series, I demonstrate how you can enable data analysts, scientists, and advanced business users to query data from Amazon Redshift or Amazon S3 directly.

As always, AWS welcomes feedback. This is a wide space to explore, so reach out to us if you want a deep dive into building this solution and more on AWS. Please submit comments or questions in the comments section.


About the Author

Vikas Omer is an analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM), and data monetization, with over 11 years of experience in the telecommunications industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Developing, testing, and deploying custom connectors for your data stores with AWS Glue

Post Syndicated from Bo Li original https://aws.amazon.com/blogs/big-data/developing-testing-and-deploying-custom-connectors-for-your-data-stores-with-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue already integrates with various popular data stores such as Amazon Redshift, Amazon RDS, MongoDB, and Amazon S3. Organizations continue to evolve and use a variety of data stores that best fit their applications and data requirements. We recently announced general availability of AWS Glue custom connectors, which make it easy to discover and integrate with a variety of additional data sources, such as SaaS applications and your custom data sources. With just a few clicks, you can search for and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. We are also releasing a new framework to develop, validate, and deploy your own custom connectors (bring your own connectors, or BYOC).

In this blog post, we go over three key aspects of AWS Glue custom connectors. First, we introduce the two mechanisms you can use to plug in a custom connector: subscribing to one from AWS Marketplace or bringing your own connector into AWS Glue Spark jobs. Second, we describe the three interfaces, based on Apache Spark DataSource, Amazon Athena Federated Query, and JDBC, that you can use to develop a custom connector with the released Glue Spark runtime. Finally, we go deeper into the development process and describe how the Glue Spark runtime interfaces simplify data integration by offering powerful features that are built in for Glue custom connectors. These features include job bookmarks for incremental loads of your data, at-source data filtering with SQL queries, partitioned execution for data parallelism, data type mapping, advanced Spark and built-in AWS Glue data transformations, integration with AWS Secrets Manager to securely store authentication credentials, and the AWS Glue Data Catalog for storing connections and table metadata. Glue custom connectors are also supported in AWS Glue Studio, which enables visual authoring of your data integration jobs.

These data sources cover the following categories:

This post introduces two mechanisms to use custom connectors with AWS Glue Spark runtime and AWS Glue Studio console. First, we go over the user experience for seamless discovery and subscription to custom connectors developed by AWS Glue partners that are hosted on AWS Marketplace. Next, we go deeper into the five simple steps to develop and test your own connectors with AWS Glue Spark runtime, and deploy them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

AWS Glue custom connectors: AWS Marketplace and BYOC

You can use an AWS Glue connector available on AWS Marketplace or bring your own connector (BYOC) and plug it into AWS Glue Spark runtime. This is in addition to the native connectors available with AWS Glue.

Connectors available on AWS Marketplace

As we make AWS Glue custom connectors generally available today, we have an ecosystem of Glue connectors listed on AWS Marketplace available from different AWS Glue partners, big data architects, and third-party developers. The following posts go into more detail on using some of these connectors for different use cases with AWS Glue:

BYOC connector example

Customers and application developers also need a method to develop connectors for custom data stores. The next section describes the end-to-end process to develop and test a custom connector using the AWS Glue Spark runtime library and interfaces locally.

After testing and validating, you can package and deploy the custom connector using the BYOC workflow in AWS Glue Studio. For instructions on deploying and using the Snowflake connector with AWS Glue jobs as a BYOC custom connector, see Performing data transformations using Snowflake and AWS Glue.

AWS Glue Spark runtime connector interfaces

AWS Glue Spark runtime offers three interfaces to plug in custom connectors built for existing frameworks: the Spark DataSource API, Amazon Athena Data Source API, or Java JDBC API. The following code snippets show how you can plug in these connectors into AWS Glue Spark runtime without any changes.

For connectors subscribed from AWS Marketplace, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-marketplace-connection"}, transformation_ctx = "DataSource0")

For custom connectors developed and deployed with AWS Glue, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-connection"}, transformation_ctx = "DataSource0")

The following list summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Spark DataSource API:

  • DataSourceV2 – The base interface for the Spark DataSource v2 API.
  • ReadSupport – A mix-in interface for DataSourceV2 for the connector to provide data reading ability and scan the data from the data source.
  • DataSourceReader – A data source reader that is created by ReadSupport to scan the data from the data source. It also supports reading the actual schema and generating a list of InputPartitions for parallel reads from Spark executors.
  • InputPartition – Each InputPartition is responsible for creating a data reader to read data into one RDD partition. InputPartitions are serialized and sent to executors, then the reader is created on the executors to do the actual reading.
  • InputPartitionReader – Responsible for reading data into an RDD partition.

The following list summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Athena Data Source API:

  • MetadataHandler
    • doGetSplits – Splits up the reads required to scan the requested batch of partitions.
    • doListSchemaNames – Gets the list of schemas (databases) that this source contains.
    • doGetTable – Gets a definition (such as field names, types, and descriptions) of a table.
    • doListTables – Gets the list of tables that this source contains.
    • getPartitions – Gets the partitions that must be read from the requested table.
  • RecordHandler
    • doReadRecords – Reads the row data associated with the provided split.

The following diagram shows the class structure for the three interfaces and their execution on Spark drivers to read metadata and Spark executors to read data from the underlying datasource. The classes shown in pink are the ones that need to be implemented as part of the connector. Classes shown in green are already implemented as part of the Glue Spark runtime.

Steps to develop a custom connector

In the following sections, we describe how to develop, test, and validate an AWS Glue custom connector. We also show how to deploy the connectors to AWS Glue jobs using the AWS Glue Studio console.

Implementing the solution includes the following five high-level steps:

  1. Download and install AWS Glue Spark runtime, and review sample connectors.
  2. Develop using the required connector interface.
  3. Build, test, and validate your connector locally.
  4. Package and deploy the connector on AWS Glue.
  5. Use AWS Glue Studio to author a Spark application with the connector.

Downloading and installing AWS Glue Spark runtime and reviewing sample connectors

The first step to developing a connector is to install the AWS Glue Spark runtime from Maven and review the AWS Glue sample connectors in the AWS Glue GitHub repository.

Developing and testing using the required connector interface

As discussed earlier, you can develop AWS Glue custom connectors with one of the following interfaces:

  • Spark DataSource
  • Athena Federated Query
  • JDBC

In this section, we walk you through each interface.

Spark DataSource interface

We use a simple example to illustrate the development of an AWS Glue custom connector with the Spark DataSource interface. You can also find intermediate and complex examples for developing connectors with more functionality for different data sources.

This solution implements a DataSourceReader that returns predefined data as InputPartitions stored in-memory with a given schema. The following interfaces need to be implemented for DataSourceReader. The DataSourceReader implementation runs on the Spark driver and plans the execution of Spark executors reading the data in InputPartitions:

class Reader implements DataSourceReader {
        public StructType readSchema() { ... }
        
        public List<InputPartition<InternalRow>> planInputPartitions() { ... }
}

The InputPartitions are read in parallel by different Spark executors using the InputPartitionReader implementation, which returns the records in Spark’s InternalRow format. The InputPartitionReader is essentially implemented to return an iterator of the records scanned from the underlying data store. See the following code:

class SimpleInputPartitionReader implements InputPartitionReader<InternalRow> {
    public boolean next() { ... }

    public InternalRow get() { ... }

    public void close() throws IOException { ... }
}

The second connector example shows how to use an Amazon S3 client to read the data in CSV format from an S3 bucket and path supplied as reader options. The third connector example shows how to use a JDBC driver to read data from a MySQL source. It also shows how to push down a SQL query to filter records at source and authenticate with the user name and password supplied as reader options.

You can plug connectors based on the Spark DataSource API into AWS Glue Spark runtime as follows. You need to supply the connection_type as custom.spark and an AWS Glue Data Catalog connection containing the reader options, such as user name and password. AWS Glue Spark runtime automatically converts the data source into a Glue DynamicFrame. The following code is an example to plug in the Elasticsearch Spark connector:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark", connection_options = {"path": "test", "es.nodes": "https://search-glue-etl-job-xxx.us-east-1.es.amazonaws.com","es.net.http.auth.user": "user","es.net.http.auth.pass": "pwd","es.port": "443","es.nodes.wan.only": "true" ,"connectionName":"my-custom-es-connection"}, transformation_ctx = "DataSource0")

AWS Glue Studio provides a visual ETL console that can also auto-generate the preceding code to construct a DynamicFrame for a deployed Spark connector (as described later in this post).

Athena Federated Query interface

AWS Glue Spark runtime also supports connectors developed with the Athena connector interface for federated queries. Similar to the Spark DataSource API, it requires implementation of two key handler interfaces: MetadataHandler and RecordHandler.

The MetadataHandler implementation runs on the Spark driver and contains the functions required to compute the schema, tables, and table partitions, and plan the actual scan by splitting the reads of individual partitions into different splits. See the following code:

public class MyMetadataHandler extends MetadataHandler{
       ListSchemasResponse doListSchemaNames(BlockAllocator allocator, ListSchemasRequest request) { … }

       ListTablesResponse doListTables(BlockAllocator allocator, ListTablesRequest request) { … }

       GetTableResponse doGetTable(BlockAllocator allocator, GetTableRequest request) { … }

       void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker) { … }

       GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request) { … }
}

The RecordHandler implements the reader to scan the data from the underlying data store associated with the split contained in the ReadRecordsRequest structure.

AWS Glue custom connectors use the Athena RecordHandler interface, but they don’t need the BlockSpiller implementation or AWS Lambda to read the data from the underlying data store. Instead, the implementation runs directly inline within each Spark executor to return the records as Apache Arrow column vector batches. See the following code:

public class MyRecordHandler extends RecordHandler {

void readWithConstraint(ConstraintEvaluator constraints, BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker){…}
}

AWS Glue Spark runtime can convert records returned by plugging in an Athena connector to an AWS Glue DynamicFrame as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.athena", connection_options = {"tableName":"table","schemaName":"schema","connectionName":"my-custom-athena-connection"}, transformation_ctx = "DataSource0")

JDBC interface

AWS Glue Spark runtime also allows you to plug in any connector compliant with the JDBC interface. It allows you to pass in any connection option available with the JDBC connector as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-jdbc-connection"}, transformation_ctx = "DataSource0")

Advanced ETL and analytics with AWS Glue

AWS Glue Spark runtime also provides different features supported out of the box with custom connectors to enable advanced extract, transform, and load operations.

AWS Glue Studio for visual authoring of ETL jobs

Data type mapping

You can type cast the columns while reading them from the underlying data store itself. For example, a dataTypeMapping of {"INTEGER":"STRING"} casts all integer columns to string while parsing the records and constructing the DynamicFrame. This also helps you cast columns to types of your choice. See the following code:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

Partitioning for parallel reads

AWS Glue allows you to read data in parallel from the data store by partitioning it on a column and specifying the partitionColumn, lowerBound, upperBound, and numPartitions. This lets you use data parallelism and the multiple Spark executors allocated to the Spark application. See the following code, which reads data from Snowflake using four Spark executors in parallel; data is partitioned uniformly across executors along the id column in the range [0, 200]:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"upperBound":"200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Glue Data Catalog connections

You can encapsulate all your connection properties with Glue Data Catalog connections and supply the connection name as follows. Integration with Glue Data Catalog connections allows you to use the same connection properties across multiple calls in a single Spark application or across different applications. See the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Secrets Manager for credentials

The Data Catalog connection can also contain a secretId corresponding to a secret stored in AWS Secrets Manager that can be used to securely gather authentication and credentials information at runtime. For more details on using a secretId on the AWS Glue Studio console, see Adding connectors to AWS Glue Studio. secretId can also be specified within the ETL script as follows.

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake", "secretId":"my-secret-id"}, transformation_ctx = "DataSource0")

The secret can store credentials for the different authentication mechanisms that your connector supports, such as user name and password, access keys, and OAuth.

SQL queries at source: Filtering with row predicates and column projections

AWS Glue Spark runtime allows you to push down SQL queries to filter data at source with row predicates and column projections. This allows you to load filtered data faster from data stores that support pushdowns. An example SQL query pushed down to a JDBC data source is SELECT id, name, department FROM department WHERE id < 200. See the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"query":"SELECT id, name, department FROM department WHERE id < 200","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Job bookmarks

AWS Glue job bookmarks allow for incremental loading of data from JDBC sources. They keep track of the last processed record from the data store and process new data records in subsequent AWS Glue job runs. Job bookmarks use the primary key as the default bookmark key, provided that it increases or decreases sequentially. See the following code, which uses a transformation_ctx with job bookmarks enabled for the job:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

AWS Glue transformations

AWS Glue offers more than 35 commonly used data transformation operations with DynamicFrames and Spark DataFrames. These transforms allow you to get insights from your data and prepare it for further analytics using hundreds of available Spark SQL functions. They include popular functions for manipulating schemas, projecting columns, and performing joins across different data sources; transforming data with map, split, and explode; flattening nested datasets with relationalize and unnest; grouping and aggregating records; and running arbitrary SQL on datasets.
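
For example, a job that reads through a custom connector can chain these transforms directly on the returned DynamicFrame. The following fragment is an illustrative sketch only; the connection name and column names are assumptions, not part of any specific connector.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read through a custom connector, as shown earlier (connection name is an assumption).
DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-jdbc-connection"}, transformation_ctx = "DataSource0")

# Project a subset of columns with a built-in AWS Glue transform.
projected = SelectFields.apply(frame = DataSource0, paths = ["id", "name", "department"])

# Switch to Spark SQL for arbitrary queries, then convert back to a DynamicFrame.
projected.toDF().createOrReplaceTempView("account")
result_df = spark.sql("SELECT department, COUNT(*) AS accounts FROM account GROUP BY department")
Result0 = DynamicFrame.fromDF(result_df, glueContext, "Result0")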

VPC support for networking

AWS Glue jobs allow you to securely connect to your data stores within a private VPC subnet. You can also enable a NAT (Network Address Translation) gateway within a VPC to access both VPC resources and the public internet.

Building, testing, and validating your connector locally

After developing the connector for your favorite data store with the interface of your choice, you can follow the instructions to build the connector with a Maven install. This builds the connector and installs the resulting JAR into your local Maven repository. You can then include this JAR in the class path of your IDE or of the AWS Glue Spark runtime downloaded from Maven.

After you build and import the JAR, you can test it locally by plugging it into AWS Glue Spark runtime and writing a validation test. We provide sample validation tests in the AWS Glue GitHub repository. These cover several scenarios for both local testing and validation on the AWS Glue job system. The following table lists the validation tests, the functionality they test, and the associated interfaces.

Test Name Description JDBC Spark Athena
DataSourceTest Tests connector connectivity and reading functionality. x x x
ReadWriteTest Tests reading and writing end-to-end workflow. x x x
CatalogConnectionTest Tests catalog connection integration. x x x
DataSchemaTest Tests data schema from reading with the connector. x x x
SecretsManagerTest Tests Secrets Manager integration. x x
DataSinkTest Tests connector connectivity and writing functionality x x
ColumnPartitioningTest Tests connector column partitioning functionality. x
FilterPredicateTest Tests connector filter predicate functionality. x
JDBCUrlTest Tests connector extra parameters for JDBC Url functionality. x
DbtableQueryTest Tests connector dbTable and query option functionality. x
DataTypeMappingTest Tests connector custom data type mapping functionality. x

Functionality such as AWS Glue job bookmarks that allow incremental loads can be tested on the AWS Glue job system only. We also provide a Python script to run all tests together as a suite on the AWS Glue job system.

Packaging and deploying the connector on AWS Glue

We now discuss how you can package your connector and deploy it on AWS Glue using the BYOC workflow:

  1. Package the custom connector as a JAR and upload the JAR file to an Amazon S3 bucket in your account.
  2. Follow the flow to create a custom connector referencing the JAR in Amazon S3 from AWS Glue Studio.
  3. Instantiate a connection for that connector and create an AWS Glue job using it.

For step-by-step instructions on the BYOC workflow, see Creating custom connectors.

Alternatively, we also provide the scripts and instructions for you to share the connector publicly on AWS Marketplace, either for a price or for free. For instructions on subscribing to the connector, see Subscribing to AWS Marketplace connectors.

Using AWS Glue Studio to author a Spark application

After you create a connection for a BYOC or AWS Marketplace AWS Glue connector, you can follow the instructions to visually author a Spark ETL application with AWS Glue Studio. These instructions are available in Job Authoring with custom connectors. Following are screenshots from AWS Glue Studio:

Connectors on AWS Marketplace

Visually author Glue jobs using connectors with AWS Glue Studio

Step 1 – Select a connector

Step 2 – Visually author the job using the associated connection

Conclusion

You can use two different mechanisms to use custom connectors with AWS Glue Spark runtime and AWS Glue Studio console. In this post, we discussed the user experience for seamless discovery and subscription to custom connectors, and walked you through developing and testing your own connectors with AWS Glue Spark runtime, and deploying them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

Build a custom connector yourself or try one on AWS Marketplace with AWS Glue Studio.

If you would like to partner or add a new Glue connector to AWS Marketplace, please reach out to us at [email protected]

Resources

For additional resources, see the following:


About the Authors

Bo Li is a software engineer in AWS Glue, devoted to designing and building end-to-end solutions to address customers’ data analytics and processing needs with cloud-based, data-intensive technologies.

Yubo Xu is a Software Development Engineer on the AWS Glue team. His main focus is to improve the stability and efficiency of the Spark runtime for AWS Glue and the ease of connecting to various data sources. Outside of work, he enjoys reading books and hiking the trails in the Bay Area with his dog, Luffy, a one-year-old Shiba Inu.

Xiaoxi Liu is a software engineer on the AWS Glue team. Her passion is building scalable distributed systems for efficiently managing big data in the cloud, and her areas of focus are distributed systems, big data, and cloud computing.

Mohit Saxena is a Software Development Manager at AWS Glue. His team works on Glue’s Spark runtime to enable new customer use cases for efficiently managing data lakes on AWS and to optimize Apache Spark for performance and reliability.

Performing data transformations using Snowflake and AWS Glue

Post Syndicated from Srinivas Kesanapally original https://aws.amazon.com/blogs/big-data/performing-data-transformations-using-snowflake-and-aws-glue/

In the connected world, data is getting generated from many different sources in a wide variety of data formats. Enterprises are looking for tools to ingest from these evolving data sources as well as programmatically customize the ingested data to meet their data warehousing needs. You also need solutions that help you quickly meet your business needs without provisioning any hardware or software resources, keeping costs low with the pay-as-you-use model.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data in minutes instead of weeks or months.

To further support a wide variety of use cases, AWS Glue launched a new capability at AWS re:Invent 2020 to support custom third-party connectors, which helps users easily orchestrate data integration workflows visually using AWS Glue Studio in minutes with just a few clicks. AWS Glue custom connectors help users search for and select connectors from AWS Marketplace or bring their own connectors. Using this new feature, users can easily connect to Snowflake with a few clicks using their own Snowflake connector and start orchestrating the data pipeline in minutes.

In this post, we go over how to unify your datasets in your Amazon Simple Storage Service (Amazon S3) data lake with data in Snowflake and read and transform it using AWS Glue. Though not addressed in this post, you can also read data from Amazon S3, perform transformations on it using AWS Glue, and persist it into Snowflake by customizing the generated AWS Glue script.

Solution overview

The following architecture diagram shows how AWS Glue connects to Snowflake for data preparation.

You upload the Snowflake JDBC connector JAR file into your S3 bucket and define an AWS Identity and Access Management (IAM) role that has permissions to read from this bucket, write to a destination S3 bucket, and run AWS Glue jobs. Then, you define your credentials to connect to Snowflake either in AWS Secrets Manager or on the AWS Glue Studio console, and create a job that loads the JAR file from your S3 bucket, connects to Snowflake to get the data, and saves it to the defined S3 bucket location. With the same JDBC connection, you can also read data from your S3 bucket and write to Snowflake.
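
If you choose the Secrets Manager route, the secret is simply a JSON document that holds the connection credentials. The following boto3 sketch shows one way to create it; the secret name, key names, and values are placeholders, so use whatever keys your connection’s JDBC URL placeholders expect.

import json
import boto3

secrets = boto3.client("secretsmanager")

# Store the Snowflake credentials as a JSON secret (placeholder values).
secrets.create_secret(
    Name="snowflake-glue-credentials",
    SecretString=json.dumps({
        "Username": "glue_user",
        "Password": "replace-with-a-strong-password",
        "warehouse": "COMPUTE_WH",
    }),
)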

Creating a custom connector

To implement this solution, you first create a custom connector.

  1. On the AWS Glue Studio console, under Connectors, choose Create custom connector.

  1. For Connector S3 URL, enter the S3 location where you uploaded the Snowflake JDBC connector JAR file.
  2. For Name, enter a name (for this post, we enter snowflake-jdbc-connector).
  3. For Connector type, choose JDBC.
  4. For Class name, enter the Snowflake JDBC driver class name, net.snowflake.client.jdbc.SnowflakeDriver.
  5. For JDBC URL base, enter the following URL (provide your own account): jdbc:snowflake://<snowflake account info>/?user=${Username}&password=${Password}&warehouse=${warehouse}.
  6. For URL parameter delimiter, enter &.
  7. Choose Create connector.

Creating a connection

To create a JDBC connection to Snowflake, complete the following steps:

  1. On the Connectors page, select the connector.
  2. Choose Create connection.

  1. For Name, enter a name, such as snowflake-glue-jdbc-connection.
  2. For Description, enter a meaningful description to identify the connection.
  3. For JDBC URL format, choose default.

You have an option to enter a user name and password or use Secrets Manager to store your encrypted credentials.

  1. For this post, for Data source credentials, select Use a secret.
  2. For Secret, choose your secret.
  3. For Additional URL parameters, provide the following parameters needed to run a SQL statement in Snowflake:
    1. warehouse – Virtual Snowflake warehouse to use to run the query. Replace {warehouse} with a valid value.
    2. db – The Snowflake database name.
    3. schema – The Snowflake database schema.
  4. Verify that the JDBC URL is well formed.

Creating a job

You’re now ready to define the job using this connection.

  1. On the Connectors page, select your connection.
  2. Choose Create job.

  1. For Name, enter a name (for this post, we enter untitled job).
  2. For Description, enter a meaningful description for the job.
  3. For IAM Role, choose a role that has access to the target S3 location where the job writes its results, the source location where it loads the Snowflake JDBC JAR file, and permissions to run the AWS Glue job (use the AWS Glue service role).
  4. Use the default options for Type, Glue version, Language, Worker type, Number of workers, Number of retries, and Job timeout.
  5. For Job bookmark, choose Disable.

  1. Save the job.
  2. On the Visual tab, go to the Data Source properties-connector tab to specify the table or query to read from Snowflake.
  3. Choose Save.

  1. In the Visual tab, choose the + icon to create a new S3 node for the destination.
  2. On the Node properties tab, make sure to set the node as the Target node.

  1. On the Data target properties tab, define the S3 bucket location where AWS Glue writes the results.

  1. Add an ApplyMapping transformation to map the Snowflake column names to the destination column names.

  1. Save your settings.
  2. On the Script tab, look at the script generated by AWS Glue for verification.

  1. Run the job and validate that the table data is successfully stored in the specified S3 bucket location.

In the following screenshot, I upload three records from my employee table in Snowflake into my S3 bucket.

The following screenshot shows that my S3 bucket has the data from Snowflake.

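For reference, the script that AWS Glue Studio generates for a job like this broadly follows the pattern in the sketch below. This is a hand-written illustration rather than the exact generated code; the connection name, table, column mappings, and S3 path are assumptions from this walkthrough.

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read from Snowflake through the custom JDBC connection (names are assumptions).
employees = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dbTable":"employee","connectionName":"snowflake-glue-jdbc-connection"}, transformation_ctx = "DataSource0")

# Map the Snowflake column names to the destination column names.
mapped = ApplyMapping.apply(frame = employees, mappings = [("ID", "int", "id", "int"), ("NAME", "string", "name", "string")], transformation_ctx = "ApplyMapping0")

# Write the results to the destination S3 bucket as Parquet.
glueContext.write_dynamic_frame.from_options(frame = mapped, connection_type = "s3", connection_options = {"path": "s3://<your-destination-bucket>/snowflake-export/"}, format = "parquet", transformation_ctx = "DataSink0")

job.commit()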

Conclusion

In this post, you saw how the AWS Glue console integration with Snowflake simplifies the process of connecting to Snowflake and applying transformations without writing a single line of code. You also learned how to define Snowflake connection parameters in AWS Glue, connect to Snowflake from AWS Glue, read from Snowflake using AWS Glue, and apply transformations to meet your business needs.


About the Author

Srinivas Kesanapally is a principal partner solutions architect at AWS with over 25 years of experience working with database and analytics products from traditional and modern database vendors. He has helped many large technology companies design data analytics solutions and has led engineering teams involved in modernizing data analytics platforms.

Building AWS Glue Spark ETL jobs by bringing your own JDBC drivers for Amazon RDS

Post Syndicated from Srikanth Sopirala original https://aws.amazon.com/blogs/big-data/building-aws-glue-spark-etl-jobs-by-bringing-your-own-jdbc-drivers-for-amazon-rds/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue has native connectors to connect to supported data sources either on AWS or elsewhere using JDBC drivers. Additionally, AWS Glue now enables you to bring your own JDBC drivers (BYOD) to your Glue Spark ETL jobs. This feature enables you to connect to data sources with custom drivers that aren’t natively supported in AWS Glue, such as MySQL 8 and Oracle 18. You can also use multiple JDBC driver versions in the same AWS Glue job, enabling you to migrate data between source and target databases with different versions. For more information, see Connection Types and Options for ETL in AWS Glue.

This post shows how to build AWS Glue Spark ETL jobs and set up connections with custom drivers for Oracle 18 and MySQL 8 databases.

Solution overview

We discuss three different use cases in this post, using AWS Glue, Amazon RDS for MySQL, and Amazon RDS for Oracle.

In the following architecture, we connect to Oracle 18 using an external ojdbc7.jar driver from AWS Glue ETL, extract the data, transform it, and load the transformed data to Oracle 18.

In the second scenario, we connect to MySQL 8 using an external mysql-connector-java-8.0.19.jar driver from AWS Glue ETL, extract the data, transform it, and load the transformed data to MySQL 8.

In the third scenario, we set up a connection where we connect to Oracle 18 and MySQL 8 using external drivers from AWS Glue ETL, extract the data, transform it, and load the transformed data to Oracle 18.

Prerequisites

Before getting started, you must complete the following prerequisites:

  1. Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also include access to create IAM roles and policies created by the AWS CloudFormation template provided in this post.
  2. Create an IAM policy for AWS Glue.
  3. Before setting up the AWS Glue job, you need to download drivers for Oracle and MySQL, which we discuss in the next section.

Downloading drivers for Oracle and MySQL

To download the required drivers for Oracle and MySQL, complete the following steps:

  1. Download the MySQL JDBC connector.
  2. Select the operating system as platform independent and download the .tar.gz or .zip file (for example, mysql-connector-java-8.0.19.tar.gz or mysql-connector-java-8.0.19.zip) and extract it.
  3. Pick MySQL connector .jar file (such as mysql-connector-java-8.0.19.jar) and upload it into your Amazon Simple Storage Service (Amazon S3) bucket.
  4. Make a note of that path because you use it later in the AWS Glue job to point to the JDBC driver.
  5. Similarly, download the Oracle JDBC connector (ojdbc7.jar).

This post is tested with the mysql-connector-java-8.0.19.jar and ojdbc7.jar drivers, but based on your database types, you can download and use the appropriate version of JDBC drivers supported by the database.

  1. Upload the Oracle JDBC 7 driver (ojdbc7.jar) to your S3 bucket.
  2. Make a note of that path, because you use it in the AWS Glue job to establish the JDBC connection with the database.
  3. Make sure to upload the three scripts (OracleBYOD.py, MySQLBYOD.py, and CrossDB_BYOD.py) to an S3 bucket.
  4. Save the following code as OracleBYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_oracle18_options_source_emp = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "employee",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    connection_oracle18_options_source_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
        
    connection_oracle18_options_target_emp_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "emp_dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
        
    # Read DynamicFrame from Oracle 
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    # Write data to Oracle 
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="oracle", connection_options=connection_oracle18_options_target_emp_dept)

  1. Save the following code as MySQLBYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_mysql8_options_source_emp = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "employee",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
        
    connection_mysql8_options_source_dept = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "dept",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
    
    connection_mysql8_options_target_emp_dept = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "emp_dept",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
        
    
    # Read from JDBC databases with custom driver
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    
    #print "Applied mapping to the Glue DynamicFrame"
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="mysql", connection_options=connection_mysql8_options_target_emp_dept)
    

  1. Save the following code as CrossDB_BYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_mysql8_options_source_emp = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "employee",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
    
    connection_oracle18_options_source_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    connection_oracle18_options_target_emp_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "emp_dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    # Read DynamicFrame from Oracle
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    # Write data to Oracle
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="oracle", connection_options=connection_oracle18_options_target_emp_dept)
    
    

Provisioning resources with AWS CloudFormation

The generic workflow of setting up a connection with your own custom JDBC drivers involves various steps. It’s a manual configuration that is error prone and adds overhead when repeating the steps between environments and accounts. With AWS CloudFormation, you can provision your application resources in a safe, repeatable manner, allowing you to build and rebuild your infrastructure and applications without having to perform manual actions or write custom scripts. The declarative code in the file captures the intended state of the resources to create, and allows you to automate the creation of AWS resources.

We provide this CloudFormation template for you to use. Review and customize it to suit your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, like Amazon RDS for Oracle and Amazon RDS for MySQL.

This CloudFormation template creates the following resources:

  • A VPC
  • Two subnets
  • A route table
  • An internet gateway
  • A MySQL 8 database
  • An Oracle 18 database

To provision your resources, complete the following steps:

  1. Sign in to the console.
  2. Choose the us-east-1 Region in which to create the stack.
  3. Choose Next.
  4. Choose Launch Stack:

This step automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template from within the console as required.

  1. For Stack name, enter a name.
  2. Change the other parameters as needed or keep the following default values:
    1. Oracle user name: oraadmin
    2. Oracle password: oraadmin123
    3. MySQL username: MySQLadmin
    4. MySQL password: MYSQLadmin123


  1. Choose Next.
  2. Choose Next.
  3. Review the details and choose Create.

This stack creation can take up to 20 minutes.

After the stack creation is complete, go to the Outputs tab on the AWS CloudFormation console and note the following values (you use these in later steps):

  • MySQLJDBCConnectionString
  • OracleJDBCConnectionString

Configuring an AWS Glue ETL job using your own drivers

Before creating an AWS Glue ETL, run the SQL script (database_scripts.sql) on both the databases (Oracle and MySQL) to create tables and insert data. For more information about connecting to the RDS DB instance, see How can I troubleshoot connectivity to an Amazon RDS DB instance that uses a public or private subnet of a VPC?

To set up AWS Glue connections, complete the following steps:

  1. On the AWS Glue console, under Databases, choose Connections.
  2. Choose Add Connection.
  3. For Connection Name, enter a name for your connection.
  4. For Connection Type, choose JDBC.
  5. For JDBC URL, enter a URL, such as jdbc:oracle:thin://@<hostname>:1521/ORCL for Oracle or jdbc:mysql://<hostname>:3306/mysql for MySQL.
  6. Enter the user name and password for the database.
  7. Select the VPC in which you created the RDS instance (Oracle and MySQL).
  8. Choose the subnet within your VPC. Refer to the CloudFormation stack Outputs tab for the subnet name.
  9. Choose the security group of the database. Refer to the CloudFormation stack Outputs tab for security group name.
  10. Choose Next.
  11. Check the connection details and choose Finish.

Make sure to add a connection for both databases (Oracle and MySQL).

Creating endpoints and a security group

Before testing the connection, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which databases are created. Complete the following steps for both Oracle and MySQL instances:

  1. To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Names, choose AWS Glue.
  4. Choose com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue).
  5. Choose the VPC of the RDS for Oracle or RDS for MySQL instance.
  6. Choose the security group of the RDS instances.
  7. Choose Create endpoint.

To create your S3 endpoint, you use Amazon Virtual Private Cloud (Amazon VPC).

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Names, choose Amazon S3.
  4. Choose com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3).
  5. Choose the VPC of the RDS for Oracle or RDS for MySQL instance.
  6. Choose the route table ID.
  7. Choose Create endpoint.

The RDS for Oracle or RDS for MySQL security group must include itself as a source in its inbound rules.

  1. On the Security Groups page, select the security group of the database (for example, glue-byod-stack1…) and choose Edit Inbound Rules.
  2. Choose Add rule.
  3. For Type, choose All Traffic.
  4. For Source, choose the same security group.
  5. Choose Save Rules.

If both databases are in the same VPC and subnet, you don’t need to create a connection for the MySQL and Oracle databases separately. The reason for setting up an AWS Glue connection to the databases is to establish a private connection between the RDS instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and Amazon RDS security group. You don’t need to test the JDBC connection because that connection is established by the AWS Glue job when you run it. If you test the connection with MySQL 8, it fails because the AWS Glue connection doesn’t support the MySQL 8.0 driver at the time of writing this post; therefore, you need to bring your own driver.

Setting up AWS Glue ETL jobs

You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both connections:

  1. Edit the following parameters in the scripts (OracleBYOD.py, MySQLBYOD.py, and CrossDB_BYOD.py) and upload them in Amazon S3:
    1. url
    2. user
    3. password
    4. customJdbcDriverS3Path for sources and target tables

You can find the database endpoints (url) on the CloudFormation stack Outputs tab; the other parameters are mentioned earlier in this post. If you use another driver, make sure to change customJdbcDriverClassName to the corresponding class in the driver.

Alternatively, you can pass these values as AWS Glue job parameters and retrieve them in the script using getResolvedOptions.
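
For example, a minimal sketch of that approach, assuming you pass hypothetical job parameters named db_url, db_user, db_password, and driver_path (names chosen only for illustration, not defined elsewhere in this post), could look like the following:

import sys
from awsglue.utils import getResolvedOptions

# Values supplied as --db_url, --db_user, --db_password, and --driver_path
# job parameters (hypothetical names used only for this sketch).
args = getResolvedOptions(sys.argv, ['db_url', 'db_user', 'db_password', 'driver_path'])

connection_mysql8_options_source_emp = {
    "url": args['db_url'],
    "dbtable": "employee",
    "user": args['db_user'],
    "password": args['db_password'],
    "customJdbcDriverS3Path": args['driver_path'],
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}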

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add Job.
  3. For Job Name, enter a name.
  4. For IAM role, choose the IAM role you created as a prerequisite.
  5. For Type, choose Spark.
  6. For Glue Version, choose Python (latest version).
  7. For This job runs, choose An existing script that you provide.
  8. Choose the Amazon S3 path where the script (OracleBYOD.py, MySQLBYOD.py, or CrossDB_BYOD.py) is stored.
  9. Under Advanced properties, enable Job bookmark.

Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.
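
Job bookmarks only advance when the script initializes and commits an AWS Glue Job object; the BYOD scripts shown in this post don’t include that boilerplate. If you rely on bookmarks, a minimal sketch of the pattern (assuming JOB_NAME is passed as a job argument, which AWS Glue does for jobs it runs) is:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize the job so bookmark state can be tracked for this run.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ... read, transform, and write logic goes here ...

# Commit the job so the bookmark state is saved for the next run.
job.commit()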

  1. Keep the remaining settings at their defaults and choose Next.
  2. For Connections, choose the Amazon RDS for Oracle connection for OracleBYOD.py, Amazon RDS for MySQL connection for MySQLBYOD.py, or Amazon RDS for Oracle and Amazon RDS for MySQL connection for CrossDB_BYOD.py.
  3. Choose Save job and edit scripts.
  4. Choose Run Job.
  5. When the job is complete, validate the data loaded in the target table.

Cleaning up

After you finish, don’t forget to delete the CloudFormation stack, because some of the AWS resources deployed by the stack in this post incur a cost as long as you continue to use them.

You can delete the CloudFormation stack to delete all AWS resources created by the stack.

  1. On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
  2. In the stack details pane, choose Delete.
  3. Choose Delete stack when prompted.

Summary

In this post, we showed you how to build AWS Glue ETL Spark jobs and set up connections with custom drivers for Oracle 18 and MySQL 8 databases using AWS CloudFormation. You can use this solution to use your custom drivers for databases not supported natively by AWS Glue.

If you have any questions or suggestions, please leave a comment.


About the Authors

Srikanth Sopirala is a Sr. Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family and road biking.

 

 

Naresh Gautam is a Sr. Analytics Specialist Solutions Architect at AWS. His role is helping customers architect highly available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

 

 

Building fast ETL using SingleStore and AWS Glue

Post Syndicated from Saurabh Shanbhag original https://aws.amazon.com/blogs/big-data/building-fast-etl-using-singlestore-and-aws-glue/

Disparate data systems have become a norm in many companies. The reasons for this vary: different teams in the organization select the data system best suited to their primary function, the responsibility for choosing these data systems may have been decentralized across different departments, a merged company may still use separate data systems from the formerly individual companies, and so on. Data integration combines data from these disparate data sources and helps users throughout the organization fully leverage the inherent value in the data to gain meaningful and valuable insights. AWS Glue is a fully managed serverless data integration service that makes it easy to extract, transform, and load (ETL) data from various data sources for analytics and data processing with Apache Spark ETL jobs. The AWS Glue Spark runtime supports connectivity to popular data sources such as Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, Amazon Redshift, and Apache Kafka.

We recognize the existence of disparate data systems that best fit your application needs. AWS Glue custom connectors in AWS Glue Studio extend AWS Glue support for data sources beyond the native connection types. Now you can discover and subscribe to AWS Glue ETL connectors from AWS Marketplace for the data sources that best fit your needs. AWS Partner SingleStore provides a relational SQL database that can handle both transactional and analytical workloads in a single system. New applications that need to combine transactional and analytical (HTAP, or hybrid transaction analytical processing) requirements can take advantage of SingleStore DB. SingleStore provides a SingleStore connector for AWS Glue, based on the Apache Spark Datasource API, through AWS Marketplace. The fully managed, scale-out Apache Spark environment for ETL jobs provided by AWS Glue matches well with SingleStore’s distributed SQL design.

This post shows how you can use AWS Glue custom connector from AWS Marketplace based on Apache Spark Datasource in AWS Glue Studio to create ETL jobs in minutes using an easy-to-use graphical interface.

The following architecture diagram shows SingleStore connecting with AWS Glue for an ETL job.


Now you can easily subscribe to the SingleStore connector on AWS Marketplace and create a connection to your SingleStore cluster. VPC networking and integration with AWS Secrets Manager for authentication credentials are supported for the connection.

Walkthrough overview

In this post, we demonstrate how to connect a SingleStore cluster in an AWS Glue ETL job as the source, transform the data, and store it back on a SingleStore cluster and in Apache Parquet format on Amazon S3. We use the TPC-H Benchmark dataset that is available as a sample dataset in SingleStore.

To successfully create the ETL job using a custom ETL connector from AWS Marketplace, you complete the following steps:

  1. Store authentication credentials in Secrets Manager.
  2. Create an AWS Identity and Access Management (IAM) role for the AWS Glue ETL job.
  3. Configure the SingleStore connector and connection.
  4. Create an ETL job using the SingleStore connection in AWS Glue Studio.

Storing authentication credentials in Secrets Manager

AWS Glue provides integration with Secrets Manager to securely store connection authentication credentials. Follow these steps to create these credentials:

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Select a secret type, select Other type of secrets.
  3. For Secret key/value, set one row for each of the following parameters:
    1. ddlEndpoint
    2. database
    3. user
    4. password
  4. Choose Next.


  1. For Secret name, enter aws-glue-singlestore-connection-info.
  2. Choose Next.
  3. Keep the Disable automatic rotation check box selected.
  4. Choose Next.
  5. Choose Store.

Creating an IAM role for the AWS Glue ETL job

In this section, you create a role with an attached policy to allow read-only access to credentials that are stored in Secrets Manager for the AWS Glue ETL job.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following JSON snippet, providing your Region and account ID:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ],
                "Resource": "arn:aws:secretsmanager:<REGION>:<ACCOUNT_ID>:secret:aws-glue-*"
            }
        ]
    }

  4. Choose Review Policy.
  5. Give your policy a name, for example, GlueAccessSecretValue.
  6. In the navigation pane, choose Roles.
  7. Choose Create role.
  8. For Select type of trusted entity, choose AWS service.
  9. Choose Glue.


  1. Choose Next: Permissions.
  2. Search for the AWS managed policies AWSGlueServiceRole and AmazonEC2ContainerRegistryReadOnly, and select them.
  3. Search for the GlueAccessSecretValue policy you created earlier, and select it.
  4. For Role name, enter a name, for example, GlueCustomETLConnectionRole.
  5. Confirm the three policies are selected.


Configuring your SingleStore connector and connection

To connect to SingleStore, complete the following steps:

  1. On the AWS Glue console, choose AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.


  1. Subscribe to the SingleStore connector for AWS Glue from AWS Marketplace.
  2. Activate the connector from AWS Glue Studio.
  3. In the navigation pane, under Connectors, choose Create connection.
  4. For Name, enter a name, such as SingleStore_connection.
  5. For AWS Secret, choose the secret aws-glue-singlestore-connection-info you created earlier.


  1. Choose Create connection.

Creating an ETL job using the SingleStore connection in AWS Glue Studio

After you define the SingleStore connection, you can start authoring the job using this connection.

  1. On the AWS Glue Studio console, choose Connectors.
  2. Select your connector and choose Create job.


An untitled job is created with the connection as the source node.

  1. On the Job details page, for Name, enter SingleStore_tpch_transform_job.
  2. For Description, enter Glue job to transform tpch data from SingleStore DB.
  3. For IAM Role, choose GlueCustomETLConnectionRole.
  4. Keep the other properties at their default.


  1. On the Visual page, on the Data source properties – Connector tab, expand Connection options.
  2. For Key, enter dbtable.
  3. For Value, enter lineitem.


Because AWS Glue Studio is using information stored in the connection to access the data source instead of retrieving metadata information from a Data Catalog table, you must provide the schema metadata for the data source. Use the schema editor to update the source schema. For instructions on how to use the schema editor, see Editing the schema in a custom transform node.


  1. Choose the + icon.
  2. For Node type, choose DropFields.
  3. On the Transform tab, select the fields to drop.


  1. Choose the + icon.
  2. For Node type, choose Custom Transform.
  3. On the Transform tab, add to the custom script.

For this post, we calculate two additional columns, disc_price and price. Then we use glueContext.write_dynamic_frame to write the updated data back on SingleStore using the connection SingleStore_connection we created. See the following code:

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import col
    
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df1 = df.withColumn("disc_price",(col("l_extendedprice")*(1-col("l_discount"))).cast("decimal(10,2)"))
    df2 = df1.withColumn("price", (col("disc_price")*(1+col("l_tax"))).cast("decimal(10,2)"))
    dyf = DynamicFrame.fromDF(df2, glueContext, "updated_lineitem")
    
    glueContext.write_dynamic_frame.from_options(frame = dyf, 
     connection_type = "marketplace.spark", 
     connection_options = {"dbtable":"updated_lineitem","connectionName":"SingleStore_connection"})
    
    return(DynamicFrameCollection({"CustomTransform0": dyf}, glueContext))


  1. On the Output schema tab, add the additional columns price and disc_price created in the custom script.


  1. Keep the default for node SelectFromCollection.
  2. Choose the + icon.
  3. For Node type, choose Data Target – S3.
  4. On the Data target properties – S3 tab, for Format, choose Parquet.
  5. For S3 Target Location, enter s3://aws-glue-assets-{Your Account ID as a 12-digit number}-{AWS region}/output/.


  1. Choose Save.
  2. Choose Run.

In the following screenshot, a new table updated_lineitem is created with the two additional columns disc_price and price.


Conclusion

In this post, you learned how to subscribe to the SingleStore connector for AWS Glue from AWS Marketplace, activate the connector from AWS Glue Studio, and create an ETL job in AWS Glue Studio that uses a SingleStore connector as the source and target using custom transform. You can use AWS Glue Studio to speed up the ETL job creation process, use connectors from AWS Marketplace, or bring in your own custom connectors, and allow different personas to transform data without any previous coding experience.


About the Author

Saurabh Shanbhag is a Partner Solutions Architect at AWS and has over 12 years of experience in working with data integration and analytics products. He focuses on enabling partners to build and enhance joint solutions on AWS.

Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/migrating-data-from-google-bigquery-to-amazon-s3-using-aws-glue-custom-connectors/

In today’s connected world, it’s common to have data sitting in various data sources in a variety of formats. Even though data is a critical component of decision making, for many organizations this data is spread across multiple public clouds. Organizations are looking for tools that make it easy to ingest data from these myriad data sources and be able to customize the data ingestion to meet their needs.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue provides all the capabilities needed for data integration, so analysis can be done in minutes instead of weeks or months. AWS Glue custom connectors are a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. You can also build custom connectors and share them across teams, and integrate open-source Spark connectors and Athena federated query connectors into your data preparation workflows. The AWS Glue Connector for Google BigQuery allows you to migrate data cross-cloud from Google BigQuery to Amazon Simple Storage Service (Amazon S3). AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor ETL jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine.

In this post, we focus on using AWS Glue Studio to query BigQuery tables and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format, and then query it using Amazon Athena. To query BigQuery tables in AWS Glue, we use the new AWS Glue Connector for Google BigQuery from AWS Marketplace.

Solution overview

The following architecture diagram shows how AWS Glue connects to Google BigQuery for data ingestion.


Prerequisites

Before getting started, make sure you have the following:

  • An account in Google Cloud, specifically a service account that has permissions to Google BigQuery
  • An AWS Identity and Access Management (IAM) user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI)
    • The IAM user also needs permissions to create an IAM role and policies

Configuring your Google account

We create a secret in AWS Secrets Manager to store the Google service account file contents as a base64-encoded string.

  1. Download the service account credentials JSON file from Google Cloud.

For base64 encoding, you can use one of the online utilities or system commands to do that. For Linux and Mac, you can use base64 <<service_account_json_file>> to print the file contents as a base64-encoded string.
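
If you prefer Python to a shell utility, a minimal sketch that prints the same base64-encoded string (the file name is a placeholder for your downloaded credentials file) is:

import base64

# Print the Google service account JSON file as a base64-encoded string.
with open("service_account.json", "rb") as f:
    print(base64.b64encode(f.read()).decode("utf-8"))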

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, select Other type of secret.
  3. Enter your key as credentials and the value as the base64-encoded string.
  4. Leave the rest of the options at their default.
  5. Choose Next.


  1. For Secret name, enter bigquery_credentials.
  2. Follow through the rest of the steps to store the secret.

For more information, see Tutorial: Creating and retrieving a secret.

Creating an IAM role for AWS Glue

The next step is to create an IAM role with the necessary permissions for the AWS Glue job. Attach the following AWS managed policies to the role:

Create and attach a policy to allow reading the secret from Secrets Manager and write access to the S3 bucket.

The following sample policy is scoped to the AWS Glue job in this post. Always make sure to scope down the policies before using them in a production environment. Provide your secret ARN for the bigquery_credentials secret you created earlier and the S3 bucket for saving data from BigQuery:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GetDescribeSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager::<<account_id>>:secret:<<your_secret_id>>"
        },
        {
            "Sid": "S3Policy",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:GetBucketAcl",
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<<your_s3_bucket>>",
                "arn:aws:s3:::<<your_s3_bucket>>/*"
            ]
        }
    ]
}

Subscribing to the Glue Connector for BigQuery

To subscribe to the connector, complete the following steps:

  1. Navigate to the AWS Glue Connector for Google BigQuery on AWS Marketplace.
  2. Choose Continue to Subscribe.


  1. Review the terms and conditions, pricing, and other details.
  2. Choose Continue to Configuration.
  3. For Delivery Method, choose your delivery method.
  4. For Software Version, choose your software version.
  5. Choose Continue to Launch.


  1. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio.


You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, bigquery).


  1. Optionally, choose a VPC, subnet, and security group.
  2. For AWS Secret, choose bigquery_credentials.
  3. Choose Create connection.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Creating the ETL job in AWS Glue Studio

  1. On the AWS Glue Studio console, choose Jobs.
  2. For Source, choose BigQuery.
  3. For Target, choose S3.
  4. Choose Create.


  1. Choose ApplyMapping and delete it.
  2. Choose BigQuery.
  3. For Connection, choose bigquery.
  4. Expand Connection options.
  5. Choose Add new option.


  1. Add the following key/value pairs:
    1. Key: parentProject, Value: <<google_project_name>>
    2. Key: table, Value: bigquery-public-data.covid19_open_data.covid19_open_data


  1. Choose S3 bucket.
  2. Choose format and Compression Type.
  3. Specify S3 Target Location.


  1. Choose Job details.
  2. For Name, enter BigQuery_S3.
  3. For IAM Role, choose the role you created.
  4. For Type, choose Spark.
  5. For Glue version, choose Glue 2.0 – Supports Spark 2.4, Scala 2, Python3.
  6. Leave the rest of the options at their defaults.
  7. Choose Save.


  1. To run the job, choose the Run Job button.


  1. Once the job run succeeds, check the S3 bucket for data.


In this job, we use the connector to read data from the BigQuery public dataset for COVID-19. For more information, see Apache Spark SQL connector for Google BigQuery (Beta) on GitHub.

The job reads the covid19 table into an AWS Glue DynamicFrame and writes the data to Amazon S3.
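
AWS Glue Studio generates the PySpark script for this visual job; the following is only a hedged sketch of what such a script might look like (the connection name, Google project, and bucket are placeholders, and the generated code may differ):

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the public COVID-19 table through the BigQuery connector.
bq_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "connectionName": "bigquery",
        "parentProject": "<google_project_name>",
        "table": "bigquery-public-data.covid19_open_data.covid19_open_data"})

# Write the data to Amazon S3 in Parquet format.
glueContext.write_dynamic_frame.from_options(
    frame=bq_dyf,
    connection_type="s3",
    connection_options={"path": "s3://<your_s3_bucket>/output/"},
    format="parquet")

job.commit()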

Querying the data

You can now use an AWS Glue crawler to crawl the data in the S3 bucket. The crawler creates a table named covid, which you can then query in Athena. The following screenshot shows our query results.


Pricing considerations

There might be egress charges for migrating data out of Google BigQuery into Amazon S3. Review and calculate the cost for moving data into Amazon S3.

AWS Glue 2.0 charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. An Apache Spark job run in AWS Glue requires a minimum of 2 DPUs. By default, AWS Glue allocates 10 DPUs to each Apache Spark job. Modify the number of workers based on your job requirements. For more information, see AWS Glue pricing.
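
As a rough, back-of-the-envelope illustration (the DPU count and run time below are assumptions, not measurements from this post), a Spark job that keeps the default 10 DPUs busy for 15 minutes costs about $1.10:

# Back-of-the-envelope AWS Glue 2.0 cost estimate.
dpus = 10                   # default allocation for an Apache Spark job
run_hours = 15 / 60         # assumed 15-minute run time
price_per_dpu_hour = 0.44   # AWS Glue 2.0 Spark ETL price per DPU-hour
print(f"Estimated cost: ${dpus * run_hours * price_per_dpu_hour:.2f}")  # -> $1.10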

Conclusion

In this post, we learned how to easily use AWS Glue ETL to connect to BigQuery tables and migrate the data into Amazon S3, and then query the data immediately with Athena. With AWS Glue, you can significantly reduce the cost, complexity, and time spent creating ETL jobs. AWS Glue is serverless, so there is no infrastructure to set up or manage. You pay only for the resources consumed while your jobs are running.

For more information about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

Building AWS Glue Spark ETL jobs using Amazon DocumentDB (with MongoDB compatibility) and MongoDB

Post Syndicated from Naresh Gautam original https://aws.amazon.com/blogs/big-data/building-aws-glue-spark-etl-jobs-using-amazon-documentdb-with-mongodb-compatibility-and-mongodb/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue has native connectors to connect to supported data sources on AWS or elsewhere using JDBC drivers. Additionally, AWS Glue now supports reading and writing to Amazon DocumentDB (with MongoDB compatibility) and MongoDB collections using AWS Glue Spark ETL jobs. This feature enables you to connect and read, transform, and load (write) data from and to Amazon DocumentDB and MongoDB collections into services such as Amazon Simple Storage Service (Amazon S3) and Amazon Redshift for downstream analytics. For more information, see Connection Types and Options for ETL in AWS Glue.

This post shows how to build AWS Glue ETL Spark jobs and set up connections with Amazon DocumentDB or MongoDB to read and load data using ConnectionType. The following diagram illustrates the three components of the solution architecture:


Prerequisites

Before getting started, you must complete the following prerequisites:

  1. Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also include access to create IAM roles and policies created by the AWS CloudFormation template provided in this post.
  2. Create an IAM policy for AWS Glue.
  3. Save the following code as DocumentDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    documentdb_uri = "mongodb://<host name>:27017"
    documentdb_write_uri = "mongodb://<host name>:27017"
    
    read_docdb_options = {
        "uri": documentdb_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    write_documentdb_options = {
        "uri": documentdb_write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    # Get DynamicFrame from DocumentDB
    dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                                   connection_options=read_docdb_options)
    
    # Write DynamicFrame to DocumentDB
    glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                                 connection_options=write_documentdb_options)
    
    job.commit()

  1. Save the following code as MongoDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    mongo_uri = "mongodb://<host name or IP>:27017"
    write_uri = "mongodb://<host name or IP>:27017"
    
    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"}
    
    write_mongo_options = {
        "uri": write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>"
    }
    
    
    # Get DynamicFrame from MongoDB
    dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                                  connection_options=read_mongo_options)
    # Write DynamicFrame to MongoDB 
    glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options)
    
    job.commit()

Provisioning resources with AWS CloudFormation

For this post, we provide CloudFormation templates for you to review and customize to your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, such as Amazon DocumentDB and Amazon EC2.

For instructions on launching your stacks, see Launching an Amazon DocumentDB AWS CloudFormation Stack and MongoDB on the AWS Cloud: Quick Start Reference Deployment.

The Amazon DocumentDB stack creation can take up to 15 minutes, and the MongoDB stack creation can take up to 60 minutes.

When stack creation is complete, go to the Outputs tab for the stack on the AWS CloudFormation console and note down the following values (you use these in later steps):

  • DocumentDB CloudFormation – ClusterEndpoint and ClusterPort
  • MongoDB CloudFormation – PrimaryReplicaNodeIp

Preparing your collection

When the CloudFormation stack is complete, use an EC2 instance to connect to your Amazon DocumentDB cluster. For instructions, see Install the mongo shell, Connect to your Amazon DocumentDB cluster, and Insert and query data.

For instructions on accessing Amazon DocumentDB from Amazon EC2 in the same VPC, see Connect Using Amazon EC2.

For more information about MongoDB, see Connect to MongoDB nodes and Testing MongoDB.

Before creating your AWS Glue ETL job, use the mongo shell to insert a few entries into a collection titled profiles. See the following code:

s0:PRIMARY> use test
s0:PRIMARY> db.profiles.insertMany([
            { "_id" : 1, "name" : "Matt", "status": "active", "level": 12, "score":202},
            { "_id" : 2, "name" : "Frank", "status": "inactive", "level": 2, "score":9},
            { "_id" : 3, "name" : "Karen", "status": "active", "level": 7, "score":87},
            { "_id" : 4, "name" : "Katie", "status": "active", "level": 3, "score":27}
            ])

You’re now ready to configure AWS Glue ETL jobs using Amazon DocumentDB and MongoDB ConnectionType.

Setting up AWS Glue connections

You set up two separate connections for Amazon DocumentDB and MongoDB when the databases are in two different VPCs (or if you deployed the databases using the provided CloudFormation template). Complete the following steps for both connections. We first walk you through the Amazon DocumentDB connection.

  1. On the AWS Glue console, under Databases, choose Connections.
  2. Choose Add connection.
  3. For Connection name, enter a name for your connection.
  4. If you have SSL enabled on your Amazon DocumentDB cluster (which is what the CloudFormation template in this post used), select Require SSL connection.
  5. For Connection Type, choose Amazon DocumentDB or MongoDB.
  6. Choose Next.


  1. For Amazon DocumentDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017).
  2. For Username and Password, enter the credentials you entered as parameters when creating the CloudFormation stack.
  3. For VPC, choose the VPC in which you created databases (Amazon DocumentDB and MongoDB).
  4. For Subnet, choose the subnet within your VPC.
  5. For Security groups, select your security group.
  6. Choose Next.


  1. Review the connection details and choose Finish.


Similarly, add the connection for MongoDB with the following changes to the steps:

  • If you used the CloudFormation template in this post, don’t select Require SSL connection for MongoDB
  • For Connection Type, choose MongoDB
  • For MongoDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017)

Creating an AWS Glue endpoint, S3 endpoint, and security group

Before testing the connections, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which the databases are created. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Name, choose AWS Glue.
  4. Search for and select com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue). Enter the appropriate Region where the database instance was created.
  5. For VPC, choose the VPC of the Amazon DocumentDB cluster.


  1. For Security group, select the security groups of the Amazon DocumentDB cluster.
  2. Choose Create endpoint.


  1. To create your S3 endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Name, choose Amazon S3.
  4. Search for and select com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3). Enter the appropriate Region.
  5. For VPC, choose the VPC of the Amazon DocumentDB cluster.
  6. For Configure route tables, select the route table ID of the associated subnet of the database.


  1. Choose Create endpoint.


Similarly, add an AWS Glue endpoint and S3 endpoint for MongoDB with the following changes:

  • Choose the VPC of the MongoDB instance

The Amazon DocumentDB and MongoDB security groups must include themselves as a source in their inbound rules. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. On the Security Groups page, choose Edit Inbound Rules.
  2. Choose Add rule.
  3. For Type, choose All traffic.
  4. For Source, choose the same security group.
  5. Choose Save rules.


The objective of setting up a connection is to establish private connections between the Amazon DocumentDB and MongoDB instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and security group. It’s not required to test the connection because that connection is established by the AWS Glue job when you run it. At the time of writing, testing an AWS Glue connection is not supported for Amazon DocumentDB connections.

Code for building the AWS Glue ETL job

The following sample code sets up a read connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "profiles",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code sets up a write connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "collection1",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code creates an AWS Glue DynamicFrame by using the read and write connections for your AWS Glue ETL job (PySpark):

# Get DynamicFrame from DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)
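
Between the read and the write, you can apply any of the standard AWS Glue transforms to the DynamicFrame. The following is a minimal sketch (not part of the provided scripts) that keeps only the active profiles before writing, assuming dynamic_frame2 and write_documentdb_options from the preceding snippets are in scope:

from awsglue.transforms import Filter

# Keep only the documents whose status field is "active".
active_profiles = Filter.apply(
    frame=dynamic_frame2,
    f=lambda row: row["status"] == "active")

# Write the filtered DynamicFrame back to Amazon DocumentDB.
glueContext.write_dynamic_frame.from_options(
    active_profiles,
    connection_type="documentdb",
    connection_options=write_documentdb_options)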

Setting up AWS Glue ETL jobs

You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Job Name, enter a name.
  4. For IAM role, choose the IAM role you created as a prerequisite.
  5. For Type, choose Spark.
  6. For Glue Version, choose Python (latest version).
  7. For This job runs, choose An existing script that you provide.
  8. Choose the Amazon S3 path where the script (DocumentDB-Glue-ETL.py) is stored.
  9. Under Advanced properties, enable Job bookmark.

Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.

  1. Keep the remaining settings at their defaults and choose Next.
  2. For Connections, choose the Amazon DocumentDB connection you created.
  3. Choose Save job and edit scripts.
  4. Edit the following parameters:
    1. documentdb_uri or mongo_uri
    2. documentdb_write_uri or write_uri
    3. user
    4. password
    5. output_path
  5. Choose Run job.

When the job is finished, validate the data loaded in the collection.

Similarly, add the job for MongoDB with the following changes:

  • Choose the Amazon S3 path where the script (MongoDB-Glue-ETL.py) is stored
  • For Connections, choose the MongoDB connection you created
  • Change the parameters applicable to MongoDB (mongo_uri and write_uri)

Cleaning up

After you finish, don’t forget to delete the CloudFormation stack, because some of the AWS resources deployed by the stack in this post incur a cost as long as you continue to use them.

You can delete the CloudFormation stack to delete all AWS resources created by the stack.

  1. On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
  2. On the stack details page, choose Delete.
  3. Choose Delete stack when prompted.

Additionally, delete the AWS Glue endpoint, S3 endpoint, AWS Glue connections, and AWS Glue ETL jobs.

Summary

In this post, we showed you how to build AWS Glue ETL Spark jobs and set up connections using ConnectionType for Amazon DocumentDB and MongoDB databases using AWS CloudFormation. You can use this solution to read data from Amazon DocumentDB or MongoDB, and transform it and write to Amazon DocumentDB or MongoDB or other targets like Amazon S3 (using Amazon Athena to query), Amazon Redshift, Amazon DynamoDB, Amazon Elasticsearch Service (Amazon ES), and more.

If you have any questions or suggestions, please leave a comment.


About the Authors

Naresh Gautam is a Sr. Analytics Specialist Solutions Architect at AWS. His role is helping customers architect highly available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

 

 

Srikanth Sopirala is a Sr. Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family and road biking.

Writing to Apache Hudi tables using AWS Glue Custom Connector

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/

In today’s world, most organizations have to tackle the 3 V’s of big data: variety, volume, and velocity. In this blog post, we talk about dealing with the variety and volume aspects of big data. The challenge of dealing with variety involves processing data from various SQL and NoSQL systems. This variety can include data from RDBMS sources such as Amazon Aurora, NoSQL sources such as Amazon DynamoDB, or third-party APIs.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. To enable customers to process data from a variety of sources, the AWS Glue team has introduced AWS Glue Custom Connectors, a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. This new feature is over and above the AWS Glue Connections feature in the AWS Glue service.

In this post, we simplify the process of creating Hudi tables with the AWS Glue Custom Connector. The JAR wrapped by the first version of the AWS Glue Custom Connector is based on Apache Hudi 0.5.3. Instructions on creating the JAR file are in the previous post of this series.

Whereas the first post focused on creating an end-to-end architecture for replicating data from an RDBMS source to a Lakehouse, this post focuses on the volume aspect of big data. In this post, we create a Hudi table with an initial load of over 200 million records and then update 70 million of those records. The connector not only writes the data to Amazon Simple Storage Service (Amazon S3), but also creates the tables in the AWS Glue Data Catalog. If you’re creating a partitioned Hudi table, the connector also creates the partitions in the Data Catalog. We discuss the code for creating a partitioned Hudi table in the previous post in this series.

We use the Copy On Write storage type, which gives better read performance compared to Merge On Read. For more information about Hudi storage types, see Hudi Dataset Storage Types and Storage Types & Views.

Note that this post focuses on using the AWS Glue Custom Connector to write to Apache Hudi tables. Please implement other best practices such as encryption and network security while implementing the architecture for your workloads.

Creating the Apache Hudi connection using AWS Glue Custom Connector

To create your AWS Glue job with an AWS Glue Custom Connector, complete the following steps:

  1. Go to the AWS Glue Studio console, search for AWS Glue Connector for Apache Hudi, and choose the AWS Glue Connector for Apache Hudi link.
  2. Choose Continue to Subscribe.
  3. Review the Terms and Conditions and choose the Accept Terms button to continue.
  4. Make sure that the subscription is complete and you see the Effective date populated next to the product and then choose Continue to Configuration button.
  5. As of writing this blog, 0.5.3 is the latest version of the AWS Glue Connector for Apache Hudi. Make sure that 0.5.3 (Nov 19, 2020) is selected in the Software Version dropdown and Activate in AWS Glue Studio is selected in the Delivery Method dropdown. Choose Continue to Launch button.
  6. Under Launch this software, choose Usage Instructions and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, hudi-connection).
  2. For Description, enter a description.
  3. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.


Configuring resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • Two AWS Glue jobs: hudi-init-load-job and hudi-upsert-job
  • An S3 bucket to store the Python scripts for these jobs
  • An S3 bucket to store the output files of these jobs
  • An AWS Lambda function to copy the scripts from the public S3 bucket to your account
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions

Launch the following stack, providing your connection name, created in Step 9 of the previous section, for the HudiConnectionName parameter:


Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box before choosing the Create Stack button.

If you have AWS Lake Formation enabled in the Region in which you’re implementing this solution, make sure that you give HudiConnectorExecuteGlueHudiJobRole Create table permission in the default database. HudiConnectorExecuteGlueHudiJobRole is created by the CloudFormation stack that you created above.


HudiConnectorExecuteGlueHudiJobRole should also have Create Database permission. You can grant this permission in the Database creators section on the Admins and database creators tab.


Running the load job

You’re now ready to run the first of your two jobs. 

  1. On the AWS Glue console, select the job hudi-init-load-job.
  2. On the Action menu, choose Run job.

My job finished in less than 10 minutes. The job inserted over 204 million records into the Hudi table.


Although the rest of the code is standard Hudi PySpark code, I want to call out the last line of the code to show how easy it is to write to Hudi tables using AWS Glue:

glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = "marketplace.spark", connection_options = combinedConf)

In the preceding code, combinedConf is a Python dictionary that includes all your Apache Hudi configurations. You can download the HudiInitLoadNYTaxiData.py script to use.
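
The exact configuration lives in the downloadable script, but as a hedged sketch (the table name, record key, precombine field, and paths below are placeholders, not necessarily the values HudiInitLoadNYTaxiData.py uses), combinedConf merges the marketplace connection details with standard Apache Hudi write and Hive sync options along these lines:

connectionConfig = {
    "className": "org.apache.hudi",
    "connectionName": "hudi-connection",
    "path": "s3://<output_bucket>/ny_yellow_trip_data/"}

hudiWriteConfig = {
    "hoodie.table.name": "ny_yellow_trip_data",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.recordkey.field": "<record_key_column>",
    "hoodie.datasource.write.precombine.field": "<precombine_column>",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "ny_yellow_trip_data",
    "hoodie.datasource.hive_sync.use_jdbc": "false"}

# Merge the connection details and the Hudi options into one dictionary.
combinedConf = {**connectionConfig, **hudiWriteConfig}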

Querying the data

The ny_yellow_trip_data table is now visible in the default database, and you can query it through Athena.

If you have Lake Formation enabled in this Region, the role or user querying the table should have Select permissions on the table.

You can now run the following query:

select count(*) cnt, vendorid from default.ny_yellow_trip_data group by vendorid

The following screenshot shows our output.


If you have Lake Formation enabled in this Region, make sure that you give Drop permission to HudiConnectorExecuteLambdaFnsRole so the CloudFormation template can drop the default.ny_yellow_trip_data table when you delete the stack.


Running the upsert job

You can now run your second job, hudi-upsert-job. This job reads the newly written data and updates the vendor IDs of all the records that have vendorid=1. The new vendor ID for these records (over 78 million) is set as 9. You can download the HudiUpsertNYTaxiData.py script to use.
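The upsert behavior is driven by the Hudi write options rather than by custom merge logic. The following sketch assumes the variables from the init load script (glueContext, inputDf, and combinedConf) and simply switches the write operation to upsert; the column logic is illustrative only, and the HudiUpsertNYTaxiData.py script is the authoritative version.

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, lit

# Sketch: reuse the insert configuration, but switch the write operation to upsert
upsertConf = dict(combinedConf)
upsertConf["hoodie.datasource.write.operation"] = "upsert"

# Illustrative only: take the records with vendorid = 1 and set their vendor ID to 9;
# rows with the same Hudi record key replace the existing rows in the table
updatedDf = inputDf.where(col("vendorid") == 1).withColumn("vendorid", lit(9))

glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(updatedDf, glueContext, "updatedDf"),
    connection_type="marketplace.spark",
    connection_options=upsertConf,
)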

This job also finished in under 10 minutes.


Querying the updated data

You can now query the updated Hudi table in Athena. The following screenshot shows that the vendor ID of over 78 million records has been changed to 9.


Additional considerations

The AWS Glue Connector for Apache Hudi has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the AWS Glue job scripts. These options are set for the sample table that we create for this post. Update the options based on your workload.

Conclusion

In this post, we created an Apache Hudi table with AWS Glue Custom Connector and AWS Glue 2.0 jobs. We read over 200 million records from a public S3 bucket and created an Apache Hudi table using it. We then updated over 70 million of these records. With the new AWS Glue Custom Connector feature, we can now directly write an AWS Glue DynamicFrame to an Apache Hudi table.

You can also use Glue jobs to write to Apache Hudi MoR tables. The post Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift describes the process in detail. Although that post uses JARs as an external dependency, you can now use the AWS Glue Connector for Apache Hudi for the same operation. The post uses HudiJob.py to write to MoR tables and HudiMoRCompactionJob.scala to compact them. Because HudiMoRCompactionJob.scala is also implemented as a Glue job, you can use AWS Glue for the compaction job too.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.

Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry

Post Syndicated from Brian Likosar original https://aws.amazon.com/blogs/big-data/validate-evolve-and-control-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-aws-glue-schema-registry/

Data streaming technologies like Apache Kafka and Amazon Kinesis Data Streams capture and distribute data generated by thousands or millions of applications, websites, or machines. These technologies serve as a highly available transport layer that decouples the data-producing applications from data processors. However, the sheer number of applications producing, processing, routing, and consuming data can make it hard to coordinate and evolve data schemas, like adding or removing a data field, without introducing data quality issues and downstream application failures. Developers often build complex tools, write custom code, or rely on documentation, change management, and Wikis to protect against schema changes. This is quite error prone because it relies too heavily on human oversight. A common solution with data streaming technologies is a schema registry that provides for validation of schema changes to allow for safe evolution as business needs adjust over time.

AWS Glue Schema Registry, a serverless feature of AWS Glue, enables you to validate and reliably evolve streaming data against Apache Avro schemas at no additional charge. Through Apache-licensed serializers and deserializers, the Glue Schema Registry integrates with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Streams, Apache Flink, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post explains the benefits of using the Glue Schema Registry and provides examples of how to use it with both Apache Kafka and Kinesis Data Streams.

With the Glue Schema Registry, you can eliminate defensive coding and cross-team coordination, improve data quality, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve schemas. Additionally, the Glue Schema Registry can serialize data into a compressed format, helping you save on data transfer and storage costs.

Although there are many ways to leverage the Glue Schema Registry (including using the API to build your own integrations), in this post, we show two use cases. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. If you use Avro schemas, you should be using the Schema Registry to supplement your solutions built on Apache Kafka (including Amazon MSK) or Kinesis Data Streams. The following diagram illustrates this architecture.

AWS Glue Schema Registry features

Glue Schema Registry has the following features:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps to prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Glue Schema Registry serializers work to validate that the schema used during data production is compatible. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles, and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the producer of data can auto-register schema changes as they flow in the data stream. This is especially useful for use cases where the source of the data is change data capture from a database.
  • IAM support – Thanks to integrated AWS Identity and Access Management (IAM) support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the examples that follow, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs in. IAM roles can also be applied to any other AWS service that could contain this code, such as containers or Lambda functions.
  • Integrations and other support – The provided serializers and deserializers are currently for Java clients using Apache Avro for data serialization. The GitHub repo also contains support for Apache Kafka Streams, Apache Kafka Connect, and Apache Flink—all licensed using the Apache License 2.0. We’re already working on additional language and data serialization support, but we need your feedback on what you’d like to see next.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start anew. If the schema ID being used isn’t known to the Glue Schema Registry, it’s looked for in the secondary deserializer.
  • Compression – Using the Avro format already reduces message size due to its compact, binary format. Using a schema registry can further reduce data payload by no longer needing to send and receive schemas with each message. Glue Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.

Example schema

For this post, we use the following schema to begin each of our use cases:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"}
 ]
}
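Auto-registration (shown in the producer code that follows) can create the registry entries for you, but you can also create the registry and the first schema version up front with a few AWS SDK calls. The following is a minimal boto3 sketch; it reuses the registry name (liko-schema-registry) and schema name (customer) that appear in the examples below and sets the compatibility mode to FULL, which the schema evolution section later in this post relies on.

import boto3
import json

glue = boto3.client("glue", region_name="us-east-2")

customer_schema = {
    "namespace": "Customer.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
    ],
}

# Create the registry (skip this call if it already exists)
glue.create_registry(RegistryName="liko-schema-registry")

# Register the first version of the schema with FULL compatibility
glue.create_schema(
    RegistryId={"RegistryName": "liko-schema-registry"},
    SchemaName="customer",
    DataFormat="AVRO",
    Compatibility="FULL",
    SchemaDefinition=json.dumps(customer_schema),
)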

Using AWS Glue Schema Registry with Amazon MSK and Apache Kafka

You can use the following Apache Kafka producer code to produce Apache Avro formatted messages to a topic with the preceding schema:

package com.amazon.gsrkafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
// Needed for the Compatibility.FULL setting used below
import software.amazon.awssdk.services.glue.model.Compatibility;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.File;

public class gsrkafka {
private static final Properties properties = new Properties();
private static final String topic = "test";
public static void main(final String[] args) throws IOException {
// Set the default synchronous HTTP client to UrlConnectionHttpClient
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
// Customer.avsc must contain the Customer schema; because full_name is set below,
// it should be the evolved schema version with the optional full_name field (see later in this post)
Schema schema_customer = new Parser().parse(new File("Customer.avsc"));
GenericRecord customer = new GenericData.Record(schema_customer);

try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
final ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, customer);
customer.put("first_name", "Ada");
customer.put("last_name", "Lovelace");
customer.put("full_name", "Ada Lovelace");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Sue");
customer.put("last_name", "Black");
customer.put("full_name", "Sue Black");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Anita");
customer.put("last_name", "Borg");
customer.put("full_name", "Anita Borg");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Grace");
customer.put("last_name", "Hopper");
customer.put("full_name", "Grace Hopper");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Neha");
customer.put("last_name", "Narkhede");
customer.put("full_name", "Neha Narkhede");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);
producer.flush();
System.out.println("Successfully produced 5 messages to a topic called " + topic);
} catch (final InterruptedException | SerializationException e) {
e.printStackTrace();
}
}
}

Use the following Apache Kafka consumer code to look up the schema information while consuming from a topic to learn the schema details:

package com.amazon.gsrkafka;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.File;


public class gsrkafka {
private static final Properties properties = new Properties();
private static final String topic = "test";
public static void main(final String[] args) throws IOException {
// Set the default synchronous HTTP client to UrlConnectionHttpClient
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-client");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
final ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
for (final ConsumerRecord<String, GenericRecord> record : records) {
final GenericRecord value = record.value();
System.out.println("Received message: value = " + value);
}
			}
} catch (final SerializationException e) {
e.printStackTrace();
}
}
}

Using AWS Glue Schema Registry with Kinesis Data Streams

You can use the following Kinesis Producer Library (KPL) code to publish messages in Apache Avro format to a Kinesis data stream with the preceding schema:

private static final String SCHEMA_DEFINITION = "{\"namespace\": \"Customer.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Customer\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"first_name\", \"type\": \"string\"},\n"
+ " {\"name\": \"last_name\", \"type\": \"string\"}\n"
+ " ]\n"
+ "}";

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion("us-west-1");

//[Optional] configuration for Schema Registry.

GlueSchemaRegistryConfiguration schemaRegistryConfig = 
new GlueSchemaRegistryConfiguration("us-west-1");

schemaRegistryConfig.setCompression(true);

config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

///Optional configuration ends.

final KinesisProducer producer = 
new KinesisProducer(config);

final ByteBuffer data = getDataToSend();

com.amazonaws.services.schemaregistry.common.Schema gsrSchema = 
new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

ListenableFuture<UserRecordResult> f = producer.addUserRecord(
config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);

private static ByteBuffer getDataToSend() {
org.apache.avro.Schema avroSchema = 
new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);

// The field names must match the Customer schema defined in SCHEMA_DEFINITION
GenericRecord user = new GenericData.Record(avroSchema);
user.put("first_name", "Emily");
user.put("last_name", "Smith");

ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
new GenericDatumWriter<>(avroSchema).write(user, encoder);
encoder.flush();
return ByteBuffer.wrap(outBytes.toByteArray());
}

On the consumer side, you can use the Kinesis Client Library (KCL) (v2.3 or later) to look up schema information while retrieving messages from a Kinesis data stream:

GlueSchemaRegistryConfiguration schemaRegistryConfig = 
new GlueSchemaRegistryConfiguration(this.region.toString());

 GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = 
new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);

 RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
 retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
 
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
retrievalConfig
);

 public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Processing {} record(s)", 
processRecordsInput.records().size());
processRecordsInput.records()
.forEach(
r -> 
log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}", 
r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
} catch (Throwable t) {
log.error("Caught throwable while processing records. Aborting.");
Runtime.getRuntime().halt(1);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
 }
 
 private GenericRecord recordToAvroObj(KinesisClientRecord r) {
byte[] data = new byte[r.data().remaining()];
r.data().get(data, 0, data.length);
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
DatumReader datumReader = new GenericDatumReader<>(schema);

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
return (GenericRecord) datumReader.read(null, binaryDecoder);
 }

Example of schema evolution

As a producer, let’s say you want to add an additional field to our schema:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"},
 {"name": "full_name", "type": ["string", “null”], “default”: null}
]
}
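On the producer side this new version can be auto-registered, but you can also register it explicitly and let the registry enforce the compatibility check. The following is a minimal boto3 sketch, reusing the registry and schema names from the earlier examples; note that null is listed first in the union here so that the null default is valid Avro.

import boto3
import json

glue = boto3.client("glue", region_name="us-east-2")

evolved_schema = {
    "namespace": "Customer.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        # null listed first in the union so the null default is valid Avro
        {"name": "full_name", "type": ["null", "string"], "default": None},
    ],
}

# Register the evolved schema as a new version; the call fails if the change
# breaks the compatibility mode configured for the schema
response = glue.register_schema_version(
    SchemaId={"RegistryName": "liko-schema-registry", "SchemaName": "customer"},
    SchemaDefinition=json.dumps(evolved_schema),
)
print(response["VersionNumber"], response["Status"])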

Regardless of whether you’re following the Apache Kafka or Kinesis Data Streams example, you can use the previously provided producer code to publish new messages using this new schema version with the full_name field. This is simply a concatenation of first_name and last_name.

This schema change added an optional field (full_name), which is indicated by the type field having an option of null in addition to string with a default of null. In adding this optional field, we’ve created a schema evolution. This qualifies as a FORWARD compatible change because the producer has modified the schema and the consumer can read without updating its version of the schema. It’s a good practice to provide a default for a given field. This allows for its eventual removal if necessary. If it’s removed by the producer, the consumer uses the default that it knew for that field from before the removal.

This change is also a BACKWARD compatible change, because if the consumer changes the schema it expects to receive, it can use that default to fill in the value for the field it isn’t receiving. By being both FORWARD and BACKWARD compatible, it is therefore a FULL compatible change. The Glue Schema Registry serializers default to BACKWARD compatible, so we have to add a line declaring it as FULL.

In looking at the full option set, you may find FORWARD_ALL, BACKWARD_ALL, and FULL_ALL. These typically only come into play when you want to change data types for a field whose name you don’t change. The most common observed compatibility mode is BACKWARD, which is why it’s the default.

As a consumer application, however, you don’t want to have to recompile your application to handle the addition of a new field. If you want to reference the customer by full name, that’s your choice in your app instead of being forced to consume the new field and use it. When you consume the new messages you’ve just produced, your application doesn’t crash or have problems, because it’s still using the prior version of the schema, and that schema change is compatible with your application. To experience this in action, run the consumer code in one window and don’t interrupt it. As you run the producer application again, this time with messages following the new schema, you can still see output without issue, thanks to the Glue Schema Registry.

Conclusion

In this post, we discussed the benefits of using the Glue Schema Registry to register, validate, and evolve schemas for data streams as business needs change. We also provided examples of how to use Glue Schema Registry with Apache Kafka and Kinesis Data Streams.

For more information and to get started, see AWS Glue Schema Registry.


About the Authors

Brian Likosar is a Senior Streaming Specialist Solutions Architect at Amazon Web Services. Brian loves helping customers capture value from real-time streaming architectures, because he knows life doesn’t happen in batch. He’s a big fan of open-source collaboration, theme parks, and live music.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

 

 

Automating Recommendation Engine Training with Amazon Personalize and AWS Glue

Post Syndicated from Alexander Spivak original https://aws.amazon.com/blogs/architecture/automating-recommendation-engine-training-with-amazon-personalize-and-aws-glue/

Customers from startups to enterprises observe increased revenue when personalizing customer interactions. Still, many companies are not yet leveraging the power of personalization, or are relying solely on rule-based strategies, which are effort-intensive to maintain and not very effective. Common reasons for not launching machine learning (ML)-based personalization projects include the complexity of aggregating and preparing the datasets, gaps in data science expertise, and a lack of trust in the quality of ML recommendations.

This blog post demonstrates an approach for product recommendations to mitigate those concerns using historical datasets. To get started with your personalization journey, you don’t need ML expertise or a data lake. The following serverless end-to-end architecture involves aggregating and transforming the required data, as well as automatically training an ML-based recommendation engine.

I will outline the architectural production-ready setup for personalized product recommendations based on historical datasets. This is of interest to data analysts who look for ways to bring an existing recommendation engine to production, as well as solutions architects new to ML.

Solution Overview

The two core elements to create a proof-of-concept for ML-based product recommendations are:

  1. the recommendation engine and,
  2. the data set to train the recommendation engine.

Let’s start with the recommendation engine first, and work backwards to the corresponding data needs.

Product recommendation engine

To create the product recommendation engine, we use Amazon Personalize. Amazon Personalize differentiates three types of input data:

  1. user events called interactions (user events like views, signups or likes),
  2. item metadata (description of your items: category, genre or availability), and
  3. user metadata (age, gender, or loyalty membership).

An interactions dataset is typically the minimal requirement to build a recommendation system. Providing user and item metadata datasets improves recommendation accuracy, and enables cold starts, item discovery and dynamic recommendation filtering.

Most companies already have existing historical datasets to populate all three input types. In the case of retail companies, the product order history is a good fit for interactions. In the case of the media and entertainment industry, the customer’s consumption history maps to the interaction dataset. The product and media catalogs map to the items dataset and the customer profiles to the user dataset.

Amazon Personalize: from datasets to a recommendation API


The Amazon Personalize Deep Dive Series provides a great introduction into the service and explores the topics of training, inference and operations. There are also multiple blog posts available explaining how to create a recommendation engine with Amazon Personalize and how to select the right metadata for the engine training. Additionally, the Amazon Personalize samples repository in GitHub showcases a variety of topics: from getting started with Amazon Personalize, up to performing a POC in a Box using existing datasets, and, finally, automating the recommendation engine with MLOps. In this post, we focus on getting the data from the historical data sources into the structure required by Amazon Personalize.

Creating the dataset

While manual data exports are a quick way to get started with one-time datasets for experiments, we use AWS Glue to automate this process. The automated approach with AWS Glue speeds up the proof of concept (POC) phase and simplifies the process to production by:

  • easily reproducing dataset exports from various data sources. These exports are used to iterate with other feature sets for recommendation engine training.
  • adding additional data sources and using those to enrich existing datasets
  • efficiently performing transformation logic like column renaming and fuzzy matching out of the box with code generation support.

AWS Glue is a serverless data integration service that is scalable and simple to use. It provides all of the capabilities needed for data integration and supports a wide variety of data sources: Amazon S3 buckets, JDBC connectors, MongoDB databases, Kafka, and Amazon Redshift, the AWS data warehouse. You can even make use of data sources living outside of your AWS environment, e.g. on-premises data centers or other services outside of your VPC. This enables you to perform a data-driven POC even when the data is not yet in AWS.

Modern application environments usually combine multiple heterogeneous database systems, like operational relational and NoSQL databases, in addition to, the BI-powering data warehouses. With AWS Glue, we orchestrate the ETL (extract, transform, and load) jobs to fetch the relevant data from the corresponding data sources. We then bring it into a format that Amazon Personalize understands: CSV files with pre-defined column names hosted in an Amazon S3 bucket.

Each dataset consists of one or multiple CSV files, which can be uniquely identified by an Amazon S3 prefix. Additionally, each dataset must have an associated schema describing the structure of your data. Depending on the dataset type, there are required and pre-defined fields:

  • USER_ID (string) and one additional metadata field for the users dataset
  • ITEM_ID (string) and one additional metadata field for the items dataset
  • USER_ID (string), ITEM_ID (string), TIMESTAMP (long; as Epoch time) for the interactions dataset
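To make this concrete, the following is a minimal PySpark sketch of a Glue ETL job that could produce the interactions dataset. The database, table, and column names (retail_db, orders, customer_id, product_id, order_date) and the S3 path are placeholders for your own sources.

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, unix_timestamp

glueContext = GlueContext(SparkContext.getOrCreate())

# Placeholder source: an orders table already cataloged by an AWS Glue crawler
orders = glueContext.create_dynamic_frame.from_catalog(
    database="retail_db", table_name="orders"
).toDF()

# Rename columns to the field names Amazon Personalize expects for interactions
interactions = orders.select(
    col("customer_id").alias("USER_ID"),
    col("product_id").alias("ITEM_ID"),
    unix_timestamp(col("order_date")).cast("long").alias("TIMESTAMP"),
)

# Write a single CSV object under a dedicated prefix for the interactions dataset
(interactions.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("s3://<your-export-bucket>/personalize/interactions/"))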

The following graph presents a high-level architecture for a retail customer, who has a heterogeneous data store landscape.

Using AWS Glue to export datasets from heterogeneous data sources to Amazon S3


To understand how AWS Glue connects to the variety of data sources and allows transforming the data into the required format, we need to drill down into the AWS Glue concepts and components.

One of the key components of AWS Glue is the AWS Glue Data Catalog: a persistent metadata store containing table definitions, connection information, as well as the ETL job definitions.
The tables are metadata definitions representing the structure of the data in the defined data sources. They do not contain any data entries from the sources but solely the structure definition. You can create a table either manually or automatically by using AWS Glue Crawlers.

AWS Glue Crawlers scan the data in the data sources, extract the schema information from it, and store the metadata as tables in the AWS Glue Data Catalog. This is the preferred approach for defining tables. The crawlers use AWS Glue Connections to connect to the data sources. Each connection contains the properties that are required to connect to a particular data store. The connections will be also used later by the ETL jobs to fetch the data from the data sources.

AWS Glue Crawlers also help to overcome a challenge that frequently appears in microservice environments. Microservice architectures are frequently operated by fully independent and autonomous teams. This means that keeping track of changes to the data source format becomes a challenge. Based on a schedule, the crawlers can be triggered to update the metadata for the relevant data sources in the AWS Glue Data Catalog automatically. To detect cases when a schema change would break the ETL job logic, you can combine the CloudWatch Events emitted by AWS Glue on updating the Data Catalog tables with an AWS Lambda function or a notification sent via Amazon Simple Notification Service (Amazon SNS).

The AWS Glue ETL jobs use the defined connections and the table information from the Data Catalog to extract the data from the described sources, apply the user-defined transformation logic and write the results into a data sink. AWS Glue can automatically generate code for ETL jobs to help perform a variety of useful data transformation tasks. AWS Glue Studio makes the ETL development even simpler by providing an easy-to-use graphical interface that accelerates the development and allows designing jobs without writing any code. If required, the generated code can be fully customized.

AWS Glue supports Apache Spark jobs, written either in Python or in Scala, and Python Shell jobs. Apache Spark jobs are optimized to run in a highly scalable, distributed way dealing with any amount of data and are a perfect fit for data transformation jobs. The Python Shell jobs provide access to the raw Python execution environment, which is less scalable but provides a cost-optimized option for orchestrating AWS SDK calls.

The following diagram visualizes the interaction between the components described.

The basic concepts of populating your Data Catalog and processing ETL dataflow in AWS Glue


For each Amazon Personalize dataset type, we create a separate ETL job. Because those jobs are independent, they can also run in parallel. After all jobs have successfully finished, we can start the recommendation engine training. AWS Glue Workflows simplify data pipelines by visualizing and monitoring complex ETL activities involving multiple crawlers, jobs, and triggers as a single entity.

The following graph visualizes a typical dataset export workflow for training a recommendation engine, which consists of:

  • a workflow trigger being either manual or scheduled
  • a Python Shell job to remove the results of the previous export workflow from S3
  • a trigger firing when the removal job is finished and initiating the parallel execution of the dataset ETL jobs
  • the three Apache Spark ETL jobs, one per dataset type
  • a trigger firing when all three ETL jobs are finished and initiating the training notification job
  • a Python Shell job to initiate a new dataset import or a full training cycle in Amazon Personalize (e.g. by triggering the MLOps pipeline using the AWS SDK)

 

AWS Glue workflow for extracting the three datasets and triggering the training workflow of the recommendation engine


Combining the data export and the recommendation engine

In the previous sections, we discussed how to create an ML-based recommendation engine and how to create the datasets for the training of the engine. In this section, we combine both parts of the solution leveraging an adjusted version of the MLOps pipeline solution available on GitHub to speed up the iterations on new solution versions by avoiding manual steps. Moreover, automation means new items can be put faster into production.

The MLOps pipeline uses a JSON file hosted in an S3 bucket to describe the training parameters for Amazon Personalize. The creation of a new parameter file version triggers a new training workflow orchestrated in a serverless manner using AWS Step Functions and AWS Lambda.
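The final Python Shell job in the Glue workflow can therefore be as simple as writing a new version of that parameter file to Amazon S3. The following boto3 sketch uses placeholder bucket, key, and parameter values; the exact parameter structure depends on how you configured the MLOps solution.

import json
import boto3

s3 = boto3.client("s3")

# Placeholder training parameters; the exact structure depends on your MLOps pipeline setup
params = {
    "datasetGroupName": "retail-recommendations",
    "solutionName": "user-personalization",
}

# Uploading a new version of the parameter file triggers the Step Functions training workflow
s3.put_object(
    Bucket="<your-mlops-bucket>",
    Key="train/params.json",
    Body=json.dumps(params).encode("utf-8"),
)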

To integrate the Glue data export workflow described in the previous section, we also enable the Glue workflow to trigger the training pipeline. Additionally, we manipulate the pipeline to read the parameter file as the first pipeline step. The resulting architecture enables an automated end-to-end set up from dataset export up to the recommendation engine creation.

End-to-end architecture combining the data export with AWS Glue, the MLOps training workflow and Amazon Personalize


The architecture for the end-to-end data export and recommendation engine creation solution is completely serverless. This makes it highly scalable, reliable, easy to maintain, and cost-efficient. You pay only for what you consume. For example, in the case of the data export, you pay only for the duration of the AWS Glue crawler executions and ETL jobs, and these only need to run when you iterate with a new dataset.

The solution is also flexible in terms of the connected data sources. The architecture works just as well for use cases with a single data source: you can start with a single data store and enrich the datasets on demand with additional data sources in future iterations.

Testing the quality of the solution

A common approach to validate the quality of the solution is the A/B testing technique, which is widely used to measure the efficacy of generated recommendations. Based on the testing results, you can iterate on the recommendation engine by optimizing the underlying datasets and models. The high degree of automation increases the speed of iterations and the resiliency of the end-to-end process.

Conclusion

In this post, I presented a typical serverless architecture for a fully automated, end-to-end ML-based recommendation engine leveraging available historical datasets. As you begin to experiment with ML-based personalization, you will unlock value currently hidden in the data. This helps mitigate potential concerns like the lack of trust in machine learning and you can put the resulting engine into production.

Start your personalization journey today with the Amazon Personalize code samples and bring the engine to production with the architecture outlined in this blog. As a next step, you can record real-time events to update the generated recommendations automatically based on the event data.

 

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner. You can also use AWS Glue as a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.


The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.
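The components above map to a DAG roughly like the following heavily abridged sketch. The cluster configuration, state machine ARN, and task set are placeholders; the actual DAG in the accompanying GitHub repository also uses the custom EmrSubmitAndMonitorStepOperator, waits for the cluster to be ready, and monitors the state machine to completion.

import boto3
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path


def create_emr_cluster(**context):
    # Placeholder cluster configuration; the real DAG passes full instance and step details
    emr = boto3.client("emr")
    response = emr.run_job_flow(
        Name="mwaa-preprocessing-cluster",
        ReleaseLabel="emr-5.32.0",
        Instances={
            "MasterInstanceType": "m5.xlarge",
            "SlaveInstanceType": "m5.xlarge",
            "InstanceCount": 3,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
    )
    return response["JobFlowId"]


def start_state_machine(**context):
    # Placeholder ARN; the real DAG reads it from Systems Manager Parameter Store
    sfn = boto3.client("stepfunctions")
    sfn.start_execution(
        stateMachineArn="arn:aws:states:us-east-1:111122223333:stateMachine:glue-etl-sketch"
    )


with DAG("mwaa_movielens_demo_sketch", start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    create_cluster = PythonOperator(task_id="create_emr_cluster",
                                    python_callable=create_emr_cluster,
                                    provide_context=True)
    start_sfn = PythonOperator(task_id="start_step_functions",
                               python_callable=start_state_machine,
                               provide_context=True)
    # The real DAG adds EMR step submission and monitoring tasks between these two
    create_cluster >> start_sfn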

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.


Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.


The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.


In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.


  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.


  1. To view the DAG code, choose Code.


The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.


When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto provides an object-oriented API, as well as low-level access to AWS services.

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.


  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.


  1. You can also monitor ETL process completion from the Airflow UI.


  1. On the Airflow UI, verify the completion from the log entries.


Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.


Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar Ghosal is a Sr. Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He focuses on analytics and enjoys helping customers solve their unique use cases. When he's not working, he loves hiking with his wife and daughter.

Estimating scoring probabilities by preparing soccer matches data with AWS Glue DataBrew

Post Syndicated from Arash Rowshan original https://aws.amazon.com/blogs/big-data/estimating-scoring-probabilities-by-preparing-soccer-matches-data-with-aws-glue-databrew/

In soccer (or football outside of the US), players decide to take shots when they think they can score. But how do they make that determination vs. when to pass or dribble? In a fraction of a second, in motion, while chased from multiple directions by other professional athletes, they think about their distance from the goal, the speed they’re running, the ball movement, the number of defenders, the goalkeeper’s position, their own shot power, accuracy, angle, and more. For a moment, time stands still. A decision is made. To shoot or not to shoot.

This post was inspired by AWS's collaboration with Germany's Bundesliga, which led to Match Fact xGoals. We use representative sample datasets and walk through using AWS Glue DataBrew to prepare data prior to training a model that can predict the probability of a player scoring at any given moment in the game.

We start with two datasets that have the information about the events in the game and the players. We describe how a problem in this domain could be framed to benefit from data preparation and how that can lead us to making predictions.

No prior knowledge is required to enjoy this post. However, basic familiarity with data preparation and machine learning (ML) is beneficial. By the end of this post, you should have a good sense regarding what DataBrew offers as well as how you can apply the approaches offered here to other use cases.

Sample datasets

Let’s assume we have collected a lot of data from video records of soccer matches. We extracted the following dataset by taking a snapshot of every shot taken in every match. For each shot, we also recorded if it resulted in a goal or if it did not. The dataset is fictionalized and not based on historic soccer games.


The following list summarizes what each column contains:

  • event_id – Unique identifier for each record
  • game_minute – Minute of the game, between 0 and approximately 90
  • player_id – Unique identifier for each player
  • player_name – Name of the player who took the shot
  • defenders – Number of defenders between the player and the opponent's goalkeeper
  • player_position – [x, y] coordinate of the player taking the shot
  • player_angle – Angle of the player taking the shot
  • player_speed – Speed of the player at the moment of taking the shot, in km/h
  • goalkeeper_position – [x, y] coordinate of the opponent's goalkeeper
  • situation – Open play, free kick, or penalty kick
  • result – Goal or no goal

We also use the FIFA 20 complete player dataset, a dataset on soccer players across the globe that attempts to estimate their qualities quantitatively. The following screenshot shows some of the columns this dataset contains.


We’re interested in the following columns:

  • age
  • height
  • weight
  • overall
  • preferred_foot
  • weak_foot
  • pace
  • shooting
  • attacking_finishing
  • skill_fk_accuracy
  • movement_acceleration
  • movement_sprint_speed
  • power_shot_power
  • mentality_penalties 

We talk more about how each of these columns can be relevant later in this post.

As you can imagine, these datasets aren’t quite ready to train an ML model. We need to combine, clean, and extract values in numeric forms so we can start creating our training set. Normally that can take a lot of time and is somewhat tedious work. Data scientists prefer to focus on training their model, but they have to spend most of their time wrangling data to the shape that can be used for their ML flow.

But fear not! Enter AWS Glue DataBrew, your friendly neighborhood visual data prep tool. As businesswire puts it:

"AWS Glue DataBrew offers customers over 250 pre-built transformations to automate data preparation tasks that would otherwise require days or weeks writing hand-coded transformations."

Let’s get to work and see how we can prepare a dataset for training an ML model using DataBrew.

Preparing the data with DataBrew

Let’s start with the shots dataset. I first create a project on the DataBrew console and upload this dataset.


When my session is ready, I should see all the columns and some analytics that give me useful insights about my dataset.


The player_position column is recorded as [x,y] coordinates with x between 0–1000 and y between 0–500. The following image shows how a player is positioned based on this data.


In CSV, this is recorded as a string and has two values. We want to extract the values and combine them into a single value that can be represented as a number. This can help our model draw better conclusions from this feature. We can start preparing our data by completing the following steps:

  1. Using the Remove values transform, I remove the opening and closing brackets from the entries of the player_position column.


  1. I split this column by a comma to generate two columns that hold the x and y coordinates.


  1. We also need to convert these newly generated columns from strings to numbers.


Now we can generate a single value using the two columns that we generated. Imagine we want to break down the whole soccer field into 1×1 squares. If we think of the field in terms of rows and columns, we can calculate the number for each square as x * 500 + y.

The benefit of this approach is that squares with higher numbers tend to be closer to the opponent’s goal, and this single feature can help our model draw good correlations between a player’s location and event outcomes. Note that the squares in the following image aren’t drawn to scale.


We can calculate the square numbers by using the Functions transforms Multiply and then Sum.

  1. First, I multiply the x coordinate by 500.


  1. Using the ADD function, I sum this generated column with the y coordinate.


  1. We apply the same transforms to the goal_keeper position to achieve a single number for that feature as well.

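If you want to sanity-check the transform chain outside DataBrew, the same position-to-square mapping takes only a few lines of pandas; the sample coordinates below are made up.

import pandas as pd

shots = pd.DataFrame({"player_position": ["[850, 250]", "[120, 40]"]})  # sample values

# Strip the brackets, split on the comma, and convert to numbers
coords = (shots["player_position"]
          .str.strip("[]")
          .str.split(",", expand=True)
          .astype(float))

# Square number: x * 500 + y, so higher numbers are closer to the opponent's goal
shots["player_square"] = coords[0] * 500 + coords[1]
print(shots)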

Next, let’s take a look at the player_angle column.


When positioned on the lower half of the field, a positive angle likely presents a better opportunity to score, and when on the top half, a negative angle faces the players more toward the goal. Another point to consider is the player’s strong or weak foot. The bottom half presents a wide angle for a left-footed player and the top half does so for a right-footed player. Angle in combination with strong foot can help our model draw good conclusions. We add the information on players’ strong or weak foot later in this post.

We have three different situations recorded in our dataset.

  1. We apply one-hot-encoding to this column to refactor the situations as 1 (True) or 0 (False) values suitable for training our model.


  1. For the shots dataset, we change the results column to 0s and 1s using the custom flag transform.
  2. Because this is what we want to predict, I name the column y.


We can enrich the data further by joining this dataset with the player dataset to take each player’s qualities into consideration. You may have noticed that we generated a lot of extra columns while applying the previous transforms. We can address that in one step while we do the join and clean things up a bit.

  1. When applying the join, we can connect the players dataset.

  1. I use an inner join and choose player_id as my key.
  2. I also only select the columns that I’m interested in.

You can select more or fewer columns depending on what features you want to feed into your model. For instance, I’m not selecting the player’s nationality, but you may want your model to take that into consideration. That’s really up to you, so feel free to play around with your options.


  1. I deselect the extra columns from the shots dataset and only select the following from the players dataset:
    1. age
    2. overall
    3. preferred_foot
    4. weak_foot
    5. shooting
    6. attacking_finishing
    7. skill_fk_accuracy
    8. movement_acceleration
    9. movement_sprint_speed
    10. power_shot_power
    11. mentality_penalties 

We’re almost done. We just need to apply a few transforms to the newly added player columns.

  1. I one-hot-encode preferred foot.

We can normalize some of the columns depending on the ML model that we want to run on this dataset afterwards. I want to train a basic logistic regression model.

  1. I use min-max normalization on most columns to scale values between 0–1 (a short pandas sketch of these normalization steps follows this list).

Depending on your model, it may make more sense to center around 0, use a custom range, or apply z-score for your normalization.

  1. I also apply mean normalization to the player angle.

  1. Now that I have the normalized columns, I can delete all their source columns.

  1. Lastly, I move the result column all the way to the end because this is my output column (what I intend to predict).

  1. Now we have a complete recipe and it’s time to run a job to apply the steps on the full dataset and generate an output file to use to train our model.

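As mentioned in the normalization steps above, the following pandas sketch illustrates min-max and mean normalization; the values are made up for the example and aren't taken from the shots dataset.

import pandas as pd

df = pd.DataFrame({
    "power_shot_power": [55, 78, 91, 62],
    "player_angle": [-35.0, 10.0, 42.5, -5.0],
})

# Min-max normalization: rescale values into the 0-1 range
df["power_shot_power_norm"] = (
    (df["power_shot_power"] - df["power_shot_power"].min())
    / (df["power_shot_power"].max() - df["power_shot_power"].min())
)

# Mean normalization: center values around 0, bounded by the value range
df["player_angle_norm"] = (
    (df["player_angle"] - df["player_angle"].mean())
    / (df["player_angle"].max() - df["player_angle"].min())
)

print(df)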

Training the model

When the job is finished, I retrieve the output from my Amazon Simple Storage Service (Amazon S3) bucket. This rich dataset is now ready to be fed into a model. Logistic regression or Support Vector Machines (SVM) could be good candidates for our dataset. You could use Amazon SageMaker to train a model and generate a probability of scoring per event. The following screenshot shows a basic logistic regression model created using scikit-learn.

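A minimal scikit-learn sketch along those lines could look like the following; the file name is a placeholder for the DataBrew job output downloaded from Amazon S3, and we assume the label column y is the last column, as arranged in the recipe.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Load the prepared output of the DataBrew job (hypothetical local copy)
data = pd.read_csv("shots_prepared.csv")

# The recipe moved the label column (y) to the end
X = data.iloc[:, :-1]
y = data.iloc[:, -1]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

print(f"Test accuracy: {model.score(X_test, y_test):.2f}")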

The model correctly predicts scoring opportunities approximately 80% of the time. You may get even higher accuracy using SVM. Feel free to try it, or edit one of your data preparation steps and see how it affects your model accuracy.

Conclusion

In this post, we started with some raw fictionalized data of soccer shots and players. We framed the problem based on our domain knowledge and the data available. We used DataBrew to rapidly and visually connect the dots and forge the original datasets into an enriched form that could be used to train an ML model.

I encourage you to apply the same methodology to a problem domain that interests you and see how DataBrew can speed up your workflow.


About the Author

Arash Rowshan is a Software Engineer at Amazon Web Services. He’s passionate about big data and applications of ML. Most recently he was part of the team that launched AWS Glue DataBrew. Any time he’s not at his desk, he’s likely playing soccer somewhere. You can follow him on Twitter @OSO_Arash.

 

 

Orchestrating an AWS Glue DataBrew job and Amazon Athena query with AWS Step Functions

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/orchestrating-an-aws-glue-databrew-job-and-amazon-athena-query-with-aws-step-functions/

As the industry grows with more data volume, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Also, as we start building complex data engineering or data analytics pipelines, we look for a simpler orchestration mechanism with graphical user interface-based ETL (extract, transform, load) tools.

Recently, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Regarding orchestration or workflow management, AWS provides AWS Step Functions, a serverless function orchestrator that makes it easy to build a workflow by integrating different AWS services like AWS Lambda, Amazon Simple Notification Service (Amazon SNS), AWS Glue, and more. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and states, removing a significant operational burden from your team.

Today, we’re launching Step Functions support for DataBrew, which means you can now invoke DataBrew jobs in your Step Functions workflow to build an end-to-end ETL pipeline. Recently, Step Functions also started supporting Amazon Athena integration, which means that you can submit SQL queries to the Athena engine through a Step Functions state.

In this post, we walk through a solution where we integrate a DataBrew job for data preparation, invoke a series of Athena queries for data refresh, and integrate Amazon QuickSight for business reporting. The whole solution is orchestrated through Step Functions and is invoked through Amazon EventBridge.

Use case overview

For our use case, we use two public datasets. The first dataset is a sales pipeline dataset, which contains a list of over 20,000 sales opportunity records for a fictitious business. Each record has fields that specify the following:

  • A date, potentially when an opportunity was identified
  • The salesperson’s name
  • A market segment to which the opportunity belongs
  • Forecasted monthly revenue

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this post, we assume that they’re related.

For our use case, these sales and marketing CSV files are maintained by your organization’s Marketing team, which uploads the updated full CSV file to Amazon Simple Storage Service (Amazon S3) every month. The aggregated output data is created through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

Architecture overview

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration, DataBrew for data preparation, Athena for data analysis with standard SQL, and QuickSight for business reporting. In addition, we use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.

The workflow includes the following steps:

  • Step 1 – The Marketing team uploads the full CSV file to an S3 input bucket every month.
  • Step 2 – An EventBridge rule, scheduled to run every month, triggers the Step Functions state machine.
  • Steps 3 and 4 – We receive two separate datasets (sales data and marketing data), so Step Functions triggers two parallel DataBrew jobs, which create additional year, month, and day columns from the existing date field and use those three columns for partitioning. The jobs write the final output to our S3 output bucket.
  • Steps 5, 6, 7, 8 – After the output data is written, we can create external tables on top of it with Athena create table statements and then load partitions with MSCK REPAIR commands. After the AWS Glue Data Catalog tables are created for sales and marketing, we run an additional query through Athena, which merges these two tables by year and month to create another table with aggregated output.
  • Steps 9 and 10 – As the last step of the Step Functions workflow, we send a notification to end-users through Amazon SNS to notify them that the data refresh job completed successfully.
  • Steps 11, 12, 13 – After the aggregated table data is refreshed, business users can use QuickSight for BI reporting, which fetches data through Athena. Data analysts can also use Athena to analyze the complete refreshed dataset.

Prerequisites

Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

Additionally, create the S3 input and output buckets with the required subfolders to capture the sales and marketing data, and upload the input data into their respective folders.

Creating a DataBrew project

To create a DataBrew project for the marketing data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Project name, enter a name (for this post, marketing-data-etl).
  4. For Select a dataset, select New dataset.

  1. For Enter your source from S3, enter the S3 path of the marketing input CSV.

  1. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

You can choose a role if you already created one, or create a new one. For the steps to create an IAM role for DataBrew, see the AWS Glue DataBrew documentation.

  1. After the dataset is loaded, on the Functions menu, choose Date functions.
  2. Choose YEAR.

  1. Apply the year function on the date column to create a new column called year.

  1. Repeat these steps to create month and day columns.

For our use case, we created a few new columns that we plan to use for partitioning, but you can integrate additional transformations as needed.

  1. After you have finished applying all your transformations, choose Publish on the recipe.
  2. Provide a description of the recipe version and choose Publish.

Creating a DataBrew job

Now that our recipe is ready, we can create a job for it, which gets invoked through our Step Functions state machine.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name, enter a name (for example, marketing-data-etl).

Your recipe is already linked to the job.

  1. Under Job output settings, for File type, choose your final storage format (for this post, we choose PARQUET).
  2. For S3 location, enter your final S3 output bucket path.
  3. For Compression, choose the compression type you want to apply (for this post, we choose Snappy).
  4. Under Additional configurations, for Custom partition by column values, choose year, month, and day.
  5. For File output storage, select Replace output files for each job run.

We choose this option because our use case is to do a full refresh.

  1. Under Permissions, for Role name, choose your IAM role.
  2. Choose Create job.

We choose this because we don’t want to run it now; we plan to invoke it through Step Functions.

  1. When your marketing job is ready, repeat the same steps for your sales data, using the sales data output file location as needed.

Creating a Step Functions state machine

We’re now ready to create a Step Functions state machine for the complete flow.

  1. On the Step Functions console, choose Create state machine.
  2. For Define state machine, select Author with code snippets.
  3. For Type, choose Standard.

In the Definition section, Step Functions provides a list of service actions that you can use to automatically generate a code snippet for your state machine’s state. The following screenshot shows that we have options for Athena and DataBrew, among others.

  1. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.

  1. For Job name, choose Select job name from a list and choose your DataBrew job.

The JSON snippet appears in the Preview pane.

  1. Select Wait for DataBrew job runs to complete.
  2. Choose Copy to clipboard.

  1. Integrate the code into the final state machine JSON code:
    {
       "Comment":"Monthly Refresh of Sales Marketing Data",
       "StartAt":"Refresh Sales Marketing Data",
       "States":{
          "Refresh Sales Marketing Data":{
             "Type":"Parallel",
             "Branches":[
                {
                   "StartAt":"Sales DataBrew ETL Job",
                   "States":{
                      "Sales DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"sales-data"
                         },
                         "Next":"Drop Old Sales Table"
                      },
                      "Drop Old Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Sales Table"
                      },
                      "Create Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `sales_data_output`(`date` string, `salesperson` string, `lead_name` string, `segment` string, `region` string, `target_close` string, `forecasted_monthly_revenue` int,   `opportunity_stage` string, `weighted_revenue` int, `closed_opportunity` boolean, `active_opportunity` boolean, `latest_status_entry` boolean) PARTITIONED BY (`year` string,`month` string, `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Sales Table Partitions"
                      },
                      "Load Sales Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                },
                {
                   "StartAt":"Marketing DataBrew ETL Job",
                   "States":{
                      "Marketing DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"marketing-data-etl"
                         },
                         "Next":"Drop Old Marketing Table"
                      },
                      "Drop Old Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Marketing Table"
                      },
                      "Create Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `marketing_data_output`(`date` string, `new_visitors_seo` int, `new_visitors_cpc` int, `new_visitors_social_media` int, `return_visitors` int, `twitter_mentions` int,   `twitter_follower_adds` int, `twitter_followers_cumulative` int, `mailing_list_adds_` int,   `mailing_list_cumulative` int, `website_pageviews` int, `website_visits` int, `website_unique_visits` int,   `mobile_uniques` int, `tablet_uniques` int, `desktop_uniques` int, `free_sign_up` int, `paid_conversion` int, `events` string) PARTITIONED BY (`year` string, `month` string, `day` string) ROW FORMAT SERDE   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Marketing Table Partitions"
                      },
                      "Load Marketing Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                }
             ],
             "Next":"Drop Old Summerized Table"
          },
          "Drop Old Summerized Table":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"DROP TABLE default.sales_marketing_revenue",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Create Summerized Output"
          },
          "Create Summerized Output":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Notify Users"
          },
          "Notify Users":{
             "Type":"Task",
             "Resource":"arn:aws:states:::sns:publish",
             "Parameters":{
                "Message":{
                   "Input":"Monthly sales marketing data refreshed successfully!"
                },
                "TopicArn":"arn:aws:sns:us-east-1:<account-id>:<sns-topic-name>"
             },
             "End":true
          }
       }
    }

The following diagram is the visual representation of the state machine flow. With the Step Functions parallel task type, we created two parallel job runs for the sales and marketing data. When both flows are complete, they join to create an aggregated table in Athena and send an SNS notification to the end-users.

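If you prefer to create the state machine programmatically instead of through the console, a minimal boto3 sketch could look like the following; the definition file name and the role ARN are placeholders for your own values.

import boto3

sfn = boto3.client("stepfunctions")

# Load the state machine definition shown above (hypothetical local file)
with open("sales-marketing-refresh.json") as f:
    definition = f.read()

response = sfn.create_state_machine(
    name="sales-marketing-data-refresh",
    definition=definition,
    roleArn="arn:aws:iam::<account-id>:role/<step-functions-execution-role>",
    type="STANDARD",
)
print(response["stateMachineArn"])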

Creating an EventBridge scheduling rule

Now let’s integrate EventBridge to schedule the invocation of our Step Functions state machine on the first day of every month.

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose Create a rule.
  3. For Name, enter a name (for example, trigger-step-function-rule).
  4. Under Define pattern, select Schedule.
  5. Select Cron expression.
  6. Enter 0 0 1 * ? * to specify that the job runs at midnight on the first day of every month.

  1. In the Select targets section, for Target, choose Step Functions state machine.
  2. For State machine, choose your state machine.

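If you'd rather define the same schedule from code, a rough boto3 sketch could look like the following; the rule name, ARNs, and IAM role (which must allow EventBridge to start the state machine) are placeholders.

import boto3

events = boto3.client("events")

# Run at midnight UTC on the first day of every month
events.put_rule(
    Name="trigger-step-function-rule",
    ScheduleExpression="cron(0 0 1 * ? *)",
    State="ENABLED",
)

events.put_targets(
    Rule="trigger-step-function-rule",
    Targets=[
        {
            "Id": "sales-marketing-refresh",
            "Arn": "arn:aws:states:us-east-1:<account-id>:stateMachine:<state-machine-name>",
            "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-invoke-role>",
        }
    ],
)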

Now when the state machine is invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.

When the job is complete, all the steps should be green.

You also receive the notification “Monthly sales marketing data refreshed successfully!”

Running an Athena query

Let’s validate the aggregated table output in Athena by running a simple SELECT query. The following screenshot shows the output.

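If you prefer to validate from code instead of the Athena console, a small boto3 sketch could run a similar query; the query results bucket path is a placeholder.

import time

import boto3

athena = boto3.client("athena")

execution = athena.start_query_execution(
    QueryString="SELECT * FROM sales_marketing_revenue ORDER BY year DESC, month DESC LIMIT 10",
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://<your-aws-athena-query-results-bucket-path>/"},
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes, then print the result rows
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows:
        print([col.get("VarCharValue") for col in row["Data"]])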

Creating reports in QuickSight

Now let’s do our final step of the architecture, which is creating BI reports through QuickSight by connecting to the Athena aggregated table.

  1. On the QuickSight console, choose Athena as your data source.

  1. Select the database and table name you have in Athena.

Now you can create a quick report to visualize your output, as shown in the following screenshot.

 

If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive notification about the completion of data refresh. If the QuickSight report is running an Athena query for every request, you might see a “table not found” error when data refresh is in progress. We recommend leveraging SPICE storage to get better performance.

Conclusion

This post explains how to integrate a DataBrew job and Athena queries with Step Functions to implement a simple ETL pipeline that refreshes aggregated sales and marketing data for BI reporting.

I hope this gives you a great starting point for using this solution with your datasets and applying business rules to build a complete serverless data analytics pipeline.


About the Author

Sakti Mishra

Sakti Mishra is a Senior Data Lab Solution Architect at AWS, where he helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places.

Testing data quality at scale with PyDeequ

Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException)
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions

In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ provides a fluid interface with Pandas DataFrames instead of restricting you to Apache Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

Deequ at Amazon

Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

  • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
  • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.

Use case overview

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

During the data exploration phase, you want to easily answer some basic questions about the data:

  • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
  • How many distinct categories are there in the categorical fields?
  • Are there correlations between some key features?
  • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.

Starting a PySpark session in a SageMaker notebook

To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repository onto the SageMaker notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory in the PyDeequ repository.

Let’s install our dependencies first in a terminal window:

$ pip install pydeequ

Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

import sagemaker_pyspark
import pydeequ
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Loading data

Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

The following table summarizes our findings.

Name Instance Value
ApproxCountDistinct review_id 3010972
Completeness review_id 1
Compliance top star_rating 0.74941
Correlation helpful_votes,total_votes 0.99365
Correlation total_votes,star_rating -0.03451
Mean star_rating 4.03614
Size * 3120938

From this, we learn the following:

  • review_id has no missing values and approximately 3,010,972 unique values
  • Approximately 74.9% of reviews have a star_rating of 4 or higher
  • total_votes and star_rating are not correlated
  • helpful_votes and total_votes are strongly correlated
  • The average star_rating is 4.0
  • The dataset contains 3,120,938 reviews

Defining and running tests for data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

  • At least 3 million rows in total
  • review_id is never NULL
  • review_id is unique
  • star_rating has a minimum of 1.0 and maximum of 5.0
  • marketplace only contains US, UK, DE, JP, or FR
  • year does not contain negative values

This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

Constraint constraint_status constraint_message
SizeConstraint(Size(None)) Success
MinimumConstraint(Minimum(star_rating,None)) Success
MaximumConstraint(Maximum(star_rating,None)) Success
CompletenessConstraint(Completeness(review_id,None)) Success
UniquenessConstraint(Uniqueness(List(review_id))) Failure Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) Success
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN (‘US’,’UK’,’DE’,’JP’,’FR’),None)) Success
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) Success

Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following table summarizes our findings.

Name Instance Value
Completeness review_id 1
Completeness marketplace 1
Compliance marketplace contained in US,UK,DE,JP,FR 1
Compliance year is non-negative 1
Maximum star_rating 5
Minimum star_rating 1
Size * 3120938
Uniqueness review_id 0.99266

Automated constraint suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

import json
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

The result contains a list of constraints with descriptions and Python code, so that you can directly apply them in your data quality checks. The print statement in the preceding code outputs the suggested constraints; the following table shows a subset, and the sketch after the table shows one way to apply them.

Column Constraint Python code
customer_id customer_id is not null .isComplete("customer_id")
customer_id customer_id has type Integral .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id customer_id has no negative values .isNonNegative("customer_id")
helpful_votes helpful_votes is not null .isComplete("helpful_votes")
helpful_votes helpful_votes has no negative values .isNonNegative("helpful_votes")
marketplace marketplace has value range “US”, “UK”, “DE”, “JP”, “FR” .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
product_title product_title is not null .isComplete("product_title")
star_rating star_rating is not null .isComplete("star_rating")
star_rating star_rating has no negative values .isNonNegative("star_rating")
vine vine has value range “N”, “Y” .isContainedIn("vine", ["N", "Y"])
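As referenced above, because the suggestions come back as plain Python dictionaries, you can review them programmatically and fold the ones you agree with into a verification run. The following sketch assumes the result layout used in the PyDeequ tutorials, where a constraint_suggestions list carries column_name, description, and code_for_constraint entries; the constraints added to the check are examples picked from the preceding table.

from pydeequ.checks import *
from pydeequ.verification import *

# Print each suggestion with the ready-to-use constraint code
for suggestion in suggestionResult["constraint_suggestions"]:
    print(f"{suggestion['column_name']}: {suggestion['description']}")
    print(f"    {suggestion['code_for_constraint']}")

# Apply a few of the suggestions we agree with
suggested_check = Check(spark, CheckLevel.Warning, "Suggested constraints") \
    .isComplete("customer_id") \
    .isNonNegative("helpful_votes") \
    .isContainedIn("vine", ["N", "Y"])

suggestedResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(suggested_check) \
    .run()

VerificationResult.checkResultsAsDataFrame(spark, suggestedResult).show()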

You can explore the other tutorials in the PyDeequ GitHub repo.

Scaling to production

So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.

Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ’s data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

Data exploration from a SageMaker notebook via an EMR cluster

As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
          "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
         }
}

Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (default named sc) with the following command:

sc.install_pypi_package('pydeequ')

Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

Running a transient EMR cluster

Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

  1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
    #!/bin/bash
    
    sudo python3 -m pip install --no-deps pydeequ
    sudo python3 -m pip install pandas 

  1. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
    $ aws emr create-cluster \
    --name 'my-pydeequ-cluster' \
    --release-label emr-5.31.0 --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions \
        Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --visible-to-all-users \
    --enable-debugging \
    --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
    --auto-scaling-role EMR_AutoScaling_DefaultRole

  1. Create your PySpark PyDeequ run script and upload into Amazon S3. The following code is our example of pydeequ-test.py:
    import sys
    import pydeequ
    from pydeequ.checks import *
    from pydeequ.verification import *
    from pyspark.sql import SparkSession, Row
    
    if __name__ == "__main__":
    
        with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
    
            df = spark.sparkContext.parallelize([
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None)]).toDF()
    
            check = Check(spark, CheckLevel.Error, "Integrity checks")
    
            checkResult = VerificationSuite(spark) \
                .onData(df) \
                .addCheck(
                    check.hasSize(lambda x: x >= 3) \
                    .hasMin("b", lambda x: x == 0) \
                    .isComplete("c")  \
                    .isUnique("a")  \
                    .isContainedIn("a", ["foo", "bar", "baz"]) \
                    .isNonNegative("b")) \
                .run()
    
            checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
            checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

  1. When your cluster is running and in the WAITING stage, submit your Spark job to Amazon EMR via the AWS CLI:
    $ aws emr add-steps \
    --cluster-id <MY_CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT

Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

$ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.

More examples on GitHub

You can find examples of more advanced features on the Deequ GitHub page:

  • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case (a short PyDeequ sketch follows this list).
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
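As a quick taste of the first two items in PyDeequ, the following sketch stores metrics in a file-based repository and adds an anomaly check on the dataset size; the repository path and the rate threshold are illustrative only, and spark and df are the session and DataFrame from the earlier examples.

from pydeequ.analyzers import Size
from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.verification import VerificationResult, VerificationSuite

# Persist metrics to a JSON file (an S3 path also works)
repository = FileSystemMetricsRepository(spark, "s3a://<your-bucket>/deequ-metrics/metrics.json")
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"dataset": "electronics_reviews"})

# Flag an anomaly if the dataset more than doubles between runs
anomalyResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(repository) \
    .saveOrAppendResult(result_key) \
    .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size()) \
    .run()

VerificationResult.checkResultsAsDataFrame(spark, anomalyResult).show()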

Conclusion

This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

Stay tuned for another post demonstrating production workflows on AWS Glue.


About the Authors

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

 

 

Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.

 

 

 

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Building a serverless data quality and analysis framework with Deequ and AWS Glue

Post Syndicated from Vara Bonthu original https://aws.amazon.com/blogs/big-data/building-a-serverless-data-quality-and-analysis-framework-with-deequ-and-aws-glue/

With ever-increasing amounts of data at their disposal, large organizations struggle to cope with not only the volume but also the quality of the data they manage. Indeed, alongside volume and velocity, veracity is an equally critical issue in data analysis, often seen as a precondition to analyzing data and guaranteeing its value. High-quality data is commonly described as fit for purpose and a fair representation of the real-world constructs it depicts. Ensuring data sources meet these requirements is an arduous task that is best addressed through an automated approach and adequate tooling.

Challenges when running data quality at scale can include choosing the right data quality tools, managing the rules and constraints to apply on top of the data, and taking on the large upfront cost of setting up infrastructure in production.

Deequ, an open-source data quality library developed internally at Amazon, addresses these requirements by defining unit tests for data that it can then scale to datasets with billions of records. It provides multiple features, like automatic constraint suggestions and verification, metrics computation, and data profiling. For more information about how Deequ is used at Amazon, see Test data quality at scale with Deequ.

You need to follow several steps to implement Deequ in production, including building the infrastructure, writing custom AWS Glue jobs, profiling the data, and generating rules before applying them. In this post, we introduce an open-source Data Quality and Analysis Framework (DQAF) that simplifies this process and its orchestration. Built on top of Deequ, this framework makes it easy to create the data quality jobs that you need, manage the associated constraints through a web UI, and run them on the data as you ingest it into your data lake.

Architecture

As illustrated in the following architecture diagram, the DQAF exclusively uses serverless AWS technology. It takes a database and tables in the AWS Glue Data Catalog as inputs to AWS Glue jobs, and outputs various data quality metrics into Amazon Simple Storage Service (Amazon S3). Additionally, it saves time by automatically generating constraints on previously unseen data. The resulting suggestions are stored in Amazon DynamoDB tables and can be reviewed and amended at any point by data owners in the AWS Amplify managed UI. Amplify makes it easy to create, configure, and implement scalable web applications on AWS. The orchestration of these operations is carried out by an AWS Step Functions workflow. The code, artifacts, and an installation guide are available in the GitHub repository.

In this post, we walk through a deployment of the DQAF using some sample data. We assume you have a database in the AWS Glue Data Catalog hosting one or more tables in the same Region where you deploy the framework. We use a legislators database with two tables (persons_json and organizations_json) referencing data about United States legislators. For more information about this database, see Code Example: Joining and Relationalizing Data.

Deploying the solution

Click on the button below to launch an AWS CloudFormation stack that deploys the solution in your AWS account in the last Region that was used:

The process takes 10–15 minutes to complete. You can verify that the framework was successfully deployed by checking that the CloudFormation stacks show the status CREATE_COMPLETE.

Testing the data quality and analysis framework

The next step is to understand (profile) your test data and set up data quality constraints. Constraints can be defined as a set of rules to validate whether incoming data meets specific requirements along various dimensions (such as completeness, consistency, or contextual accuracy). Creating these rules can be a painful process if you have lots of tables with multiple columns, but DQAF makes it easy by sampling your data and suggesting the constraints automatically.

On the Step Functions console, locate the data-quality-sm state machine, which represents the entry point to data quality in the framework. When you provide a valid input, it starts a series of AWS Glue jobs running Deequ. This step function can be called on demand, on a schedule, or based on an event. You run the state machine by entering a value in JSON format.

First pass and automatic suggestion of constraints

After the step function is triggered, it calls the AWS Glue controller job, which is responsible for determining the data quality checks to perform. Because the submitted tables were never checked before, a first step is to generate data quality constraints on attributes of the data. In Deequ, this is done through an automatic suggestion of constraints, a process where data is profiled and a set of heuristic rules is applied to suggest constraints. It’s particularly useful when dealing with large multi-column datasets. In the framework, this operation is performed by the AWS Glue suggestions job, which logs the constraints into the DataQualitySuggestions DynamoDB table and outputs preliminary quality check results based on those suggestions into Amazon S3 in Parquet file format.

AWS Glue suggestions job

The Deequ suggestions job generates constraints based on three major dimensions:

  • Completeness – Measures the presence of null values, for example isComplete("gender") or isComplete("name")
  • Consistency – Consistency of data types and value ranges, for example .isUnique("id") or isContainedIn("gender", Array("female", "male"))
  • Statistics – Univariate dimensions in the data, for example .hasMax("Salary", “90000”) or .hasSize(_>=10)
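The framework generates and runs these checks inside its AWS Glue jobs, but as an illustration of how the three dimensions map onto Deequ checks, a PyDeequ sketch could look like the following; the column names and thresholds are examples, and spark and df stand for a SparkSession and a DataFrame as in the previous post.

from pydeequ.checks import *
from pydeequ.verification import *

dimensions_check = Check(spark, CheckLevel.Warning, "Dimension examples") \
    .isComplete("gender") \
    .isComplete("name") \
    .isUnique("id") \
    .isContainedIn("gender", ["female", "male"]) \
    .hasMax("salary", lambda x: x <= 90000) \
    .hasSize(lambda rows: rows >= 10)

dimensionsResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(dimensions_check) \
    .run()

VerificationResult.checkResultsAsDataFrame(spark, dimensionsResult).show()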

The following table lists the available constraints that can be manually added in addition to the automatically suggested ones.

Constraints Argument Semantics
Dimension Completeness
isComplete column Check that there are no missing values in a column
hasCompleteness column, udf Custom validation of missing values in a column
Dimension Consistency
isUnique column Check that there are no duplicates in a column
hasUniqueness column, udf Custom validation of the unique value ratio in a column
hasDistinctness column, udf Custom validation of the unique row ratio in a column
isInRange column, value range Validation of the fraction of values that are in a valid range
hasConsistentType column Validation of the largest fraction of values that have the same type
isNonNegative column Validation whether all the values in a numeric column are non-negative
isLessThan column pair Validation whether all the values in the first column are always less than the second column
satisfies predicate Validation whether all the rows match predicate
satisfiesIf predicate pair Validation whether all the rows matching first predicate also match the second predicate
hasPredictability column, column(s), udf User-defined validation of the predictability of a column
Statistics (can be used to verify dimension consistency)
hasSize udf Custom validation of the number of records
hasTypeConsistency column, udf Custom validation of the maximum fraction of the values of the same datatype
hasCountDistinct column Custom validation of the number of distinct non-null values in a column
hasApproxCountDistinct column, udf Custom validation of the approximate number of distinct non-null values
hasMin column, udf Custom validation of the column’s minimum value
hasMax column, udf Custom validation of the column’s maximum value
hasMean column, udf Custom validation of the column’s mean value
hasStandardDeviation column, udf Custom validation of the column’s standard deviation value
hasApproxQuantile column,quantile,udf Custom validation of a particular quantile of a column (approximate)
hasEntropy column, udf Custom validation of the column’s entropy
hasMutualInformation column pair,udf Custom validation of the column pair’s mutual information
hasHistogramValues column, udf Custom validation of the column’s histogram
hasCorrelation column pair,udf Custom validation of the column pair’s correlation

The following screenshot shows the DynamoDB table output with suggested constraints generated by the AWS Glue job.

AWS Glue data profiler job

Deequ also supports single-column profiling of data, and its implementation scales to large datasets with billions of rows. As a result, we get a profile for each column in the data, which allows us to inspect the completeness of the column, the approximate number of distinct values, and the inferred datatype.

The controller triggers an AWS Glue data profiler job in parallel to the suggestions job. This profiler Deequ process runs three passes over the data and avoids any shuffles in order to easily scale to large datasets. Results are stored as Parquet files in the S3 data quality bucket.

When the controller job is complete, the second step in the data quality state machine is to crawl the Amazon S3 output data into a data_quality_db database in the AWS Glue Data Catalog, which is then immediately available to be queried in Amazon Athena. The following screenshot shows the list of tables created by this AWS Glue framework and a sample output from the data profiler results.

Reviewing and verifying data quality constraints

As good as Deequ is at suggesting data quality rules, the data stewards should first review the constraints before applying them in production. Because it may be cumbersome to edit large tables in DynamoDB directly, we have created a web app that enables you to add or amend the constraints. The changes are updated in the relevant DynamoDB tables in the background.

Accessing the web front end

To access the user interface, on the AWS Amplify console, choose the deequ-constraints app. Choosing the URL (listed as https://<env>.<appsync_app_id>.amplifyapp.com) opens the data quality constraints front end. After you complete the registration process with Amazon Cognito (create an account) and sign in, you see a UI similar to the following screenshot.

It lists the data quality constraint suggestions produced by the AWS Glue job in the previous step. Data owners can add, remove, enable, or disable these constraints at any point via the UI. Suggestions are not enabled by default, which ensures that every constraint is reviewed by a human before it is processed. Selecting the check box enables a constraint.

Data analyzer (metric computations)

Alongside profiling, Deequ can also generate column-level statistics called data analyzer metrics (such as completeness, maximum, and correlation). They can help uncover data quality problems, for example by highlighting the share of null values in a primary key or the correlation between two columns.

The following table lists the metrics that you can apply to any column.

Metric Semantics
Dimension Completeness
Completeness Fraction of non-missing values in a column
Dimension Consistency
Size Number of records
Compliance Ratio of rows matching a predicate
Uniqueness Unique value ratio in a column
Distinctness Unique row ratio in a column
ValueRange Value range verification for a column
DataType Data type inference for a column
Predictability Predictability of values in a column
Statistics (can be used to verify dimension consistency)
Minimum Minimal value in a column
Maximum Maximal value in a column
Mean Mean value in a column
StandardDeviation Standard deviation of the value distribution in a column
CountDistinct Number of distinct values in a column
ApproxCountDistinct Approximate number of distinct values in a column
ApproxQuantile Approximate quantile of the value distribution in a column
Correlation Correlation between two columns
Entropy Entropy of the value distribution in a column
Histogram Histogram of an optionally binned column
MutualInformation Mutual information between two columns
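Inside the framework, these analyzers are configured through the web UI and DynamoDB, but conceptually each metric maps to a PyDeequ analyzer. The following is a minimal, illustrative sketch of computing a few of these metrics directly with PyDeequ; the DataFrame and the column names ("id", "num_terms", "age") are assumptions:

from pydeequ.analyzers import (AnalysisRunner, AnalyzerContext, Completeness,
                               ApproxCountDistinct, Mean, Correlation)

# spark is a SparkSession configured with the Deequ jar, as in the profiler sketch above
result = (AnalysisRunner(spark)
          .onData(df)                                      # hypothetical DataFrame
          .addAnalyzer(Completeness("id"))                 # fraction of non-missing values
          .addAnalyzer(ApproxCountDistinct("id"))          # approximate distinct count
          .addAnalyzer(Mean("num_terms"))                  # hypothetical numeric column
          .addAnalyzer(Correlation("num_terms", "age"))    # hypothetical column pair
          .run())

# Collect the computed metrics as a Spark DataFrame that can be inspected or persisted
metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, result)
metrics_df.show()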

In the web UI, you can add these metrics on the Analyzers tab. In the following screenshot, we add an ApproxCountDistinct metric on an id column. Choosing Create analyzer inserts the record into the DataQualityAnalyzer table in DynamoDB and enables the constraint.

AWS Glue verification job

We’re now ready to put our rules into production and can use Athena to look at the results. You can start the Step Functions state machine with the same JSON as input:

{
  "glueDatabase": "legislators",
  "glueTables": "persons_json, organizations_json"
}
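If you prefer to start the state machine programmatically rather than from the Step Functions console, a minimal boto3 sketch might look like the following; the state machine ARN is a placeholder:

import json
import boto3

sfn = boto3.client("stepfunctions")

response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:data-quality",  # placeholder ARN
    input=json.dumps({
        "glueDatabase": "legislators",
        "glueTables": "persons_json, organizations_json",
    }),
)
print(response["executionArn"])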

This time, the AWS Glue verification job is triggered by the controller. This job performs two actions: it verifies the suggested constraints and performs the metric computations. You can immediately query the results in Athena under the constraints_verification_results table.
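As a sketch of querying those results programmatically, you could submit the Athena query with boto3 as follows; the query results location is a placeholder, while the database and table names come from this framework:

import boto3

athena = boto3.client("athena")

query = athena.start_query_execution(
    QueryString="SELECT * FROM constraints_verification_results LIMIT 20",
    QueryExecutionContext={"Database": "data_quality_db"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},  # placeholder bucket
)

# When the query completes, retrieve the rows with get_query_results
print(query["QueryExecutionId"])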

The following screenshot shows the verification output.

The following screenshot shows the metric computation results.

Summary

Dealing with large, real-world datasets requires a scalable and automated approach to data quality. Deequ is the tool of choice at Amazon when it comes to measuring the quality of large production datasets. It’s used to compute data quality metrics, suggest and verify constraints, and profile data.

This post introduced an open-source, serverless Data Quality and Analysis Framework that aims to simplify the process of deploying Deequ in production by setting up the necessary infrastructure and making it easy to manage data quality constraints. It enables data owners to generate automated data quality suggestions on previously unseen data that can then be reviewed and amended in a UI. These constraints serve as inputs to various AWS Glue jobs in order to produce data quality results queryable via Athena. Try this framework on your data and leave suggestions on how to improve it on our open-source GitHub repo.


About the Authors

Vara Bonthu is a Senior BigData/DevOps Architect for ProServe working with the Global Accounts team. He is passionate about big data and Kubernetes. He helps customers all over the world design, build, and migrate end-to-end data analytics and container-based solutions. In his spare time, he develops applications to help educate his 7-year-old autistic son.

 

Abdel Jaidi is a Data & Machine Learning Engineer for AWS Professional Services. He works with customers on Data & Analytics projects, helping them shorten their time to value and accelerate business outcomes. In his spare time, he enjoys participating in triathlons and walking dogs in parks in and around London.

7 most common data preparation transformations in AWS Glue DataBrew

Post Syndicated from Shilpa Mohan original https://aws.amazon.com/blogs/big-data/7-most-common-data-preparation-transformations-in-aws-glue-databrew/

For all analytics and ML modeling use cases, data analysts and data scientists spend a bulk of their time running data preparation tasks manually to get clean, formatted data that meets their needs. We ran a survey among data scientists and data analysts to understand the most frequently used transformations in their data preparation workflows. AWS Glue DataBrew provides more than 250 built-in transformations, which make most of these tasks 80% faster. This blog covers use case-based walkthroughs of how you can achieve the top 7 of those transformations in AWS Glue DataBrew.

#1 Handling/Imputing missing values

Missing data is prevalent in nearly all datasets and can have a significant impact on the analytics or ML models that use the data. Missing values can skew or bias the data and lead to invalid conclusions. Handling missing values is one of the most frequently used data preparation steps.

In a DataBrew project, you can get a quick view of missing values in your sample data under Data quality in the Schema view and in Column statistics.

For any data column, you can choose to either remove the rows with missing values or fill them with an empty string, null, the last valid value, the most frequent value, or a custom value. For numerical columns, you can also fill missing values with a numerical aggregate such as the average, mode, sum, or median of the column’s values.

Here’s an example of how we filled missing values in the “sub-product” column with the custom value “None” for the Consumer complaints dataset.
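DataBrew applies these fills entirely through the console. For readers who think in code, the equivalent logic in pandas looks roughly like the following sketch; the file name, the numeric column, and the exact column spellings are illustrative:

import pandas as pd

df = pd.read_csv("consumer_complaints.csv")  # illustrative input file

# Fill missing values in a text column with a custom value
df["sub_product"] = df["sub_product"].fillna("None")

# Fill missing values in a numeric column with an aggregate, such as the median
df["amount"] = df["amount"].fillna(df["amount"].median())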

#2 Combining datasets

Data analysis is often performed on a single dataset; however, the key information required to arrive at useful insights might be spread across multiple datasets. Joins are a commonly used data preparation step to bring together information from different datasets. In many large data scenarios, a single dataset can be split or partitioned into multiple files; Union is the data preparation step used in this case to consolidate all parts of the dataset. DataBrew supports Union and Join to combine data from multiple datasets. You can union multiple files into one at the beginning of a project or as a recipe step, or join datasets based on one or more join keys.

Multiple files as an input dataset

You can union multiple files together as a single input for any DataBrew project. Here is an example of how we union four files for the NYC Parking Tickets dataset.

You can select all files in a folder as an input for your project to union all the data in those files. DataBrew supports a parameterized input path to customize the files you would like to combine.

Union as a transformation

In a project, you can add a union as a recipe step to combine multiple files. You need to pre-create all the required datasets in DataBrew before you can add this step. Union is available as a transformation in the project toolbar.

You can select multiple datasets, with preview, for the Union transform. You can then specify whether to map columns by name or by position in the dataset, and also order the datasets to control the order of rows in the data after the union.

Detailed column mapping allows you to customize the column mapping, the resulting column name, and the column type.

You can preview the resulting dataset before applying the transformation, which is then added to your recipe.

Joining datasets

The Join transform allows you to bring in data from a secondary dataset. The example covers joining of UN General Assembly Votes – Resolutions data with UN General Assembly Votes – States data using “resolution session” as the join key.

You can choose from the different join types; the visual representation helps you identify the right type of join for your scenario. You can add one or more join keys.

The columns list allows you to search the entire list of columns from both the datasets and choose which columns you want to retain as part of the Join operation.

You can also preview your joined table before you complete the transform.
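For comparison, the union and join above are conceptually equivalent to the following pandas sketch; the file names, the number of parts, and the join key column are illustrative:

import pandas as pd

# Union: stack the partitioned files of one dataset on top of each other
parts = [pd.read_csv(f"nyc_parking_tickets_part_{i}.csv") for i in range(1, 5)]
tickets = pd.concat(parts, ignore_index=True)

# Join: enrich the resolutions data with the states data on a shared key
resolutions = pd.read_csv("un_votes_resolutions.csv")
states = pd.read_csv("un_votes_states.csv")
joined = resolutions.merge(states, on="resolution_session", how="inner")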

#3 Creating columns

Often, the data available in a dataset isn’t represented as the values you need for downstream data analysis or data modeling. As part of data preparation, data analysts and data scientists create custom data columns in a format that suits their analysis needs.

You can create columns with extracted values or flagged values from an existing column. DataBrew also provides a collection of functions that help you create new columns. It covers math, aggregate, text, date, window, web, and other functions.

For example, in the Netflix Movies and TV Shows dataset, you can create a column that tells you how many years a title has been available on Netflix by using a DATEDIFF function on the “date_added” column.

Create a column using a function

You can select DATEDIFF from the date functions in the toolbar. You can calculate the number of years by taking the difference between the “date_added” value and the current date, and set the output to be calculated in years.
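For illustration, the equivalent calculation in pandas could look like the following sketch; the file name is illustrative and the column name follows the post:

import pandas as pd

df = pd.read_csv("netflix_titles.csv")
df["date_added"] = pd.to_datetime(df["date_added"])

# Difference between today and date_added, expressed in whole years
df["years_on_netflix"] = (pd.Timestamp.now() - df["date_added"]).dt.days // 365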

As with all transformations, you can preview the transform before applying it.

Creating a Flag column

From the same Netflix Movies and TV Shows dataset, you can create a flag column from the create column option in the toolbar.

You can flag titles that starred Kate Hudson by flagging the custom value “Kate Hudson” in the “Cast” column. You have different options for the values of the flag column; in this case, we go with Yes and No.
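A rough pandas equivalent of this flag, for illustration (the "cast" column name and file name are assumptions):

import pandas as pd

df = pd.read_csv("netflix_titles.csv")

# Yes/No flag depending on whether the cast includes Kate Hudson
df["stars_kate_hudson"] = (df["cast"]
                           .str.contains("Kate Hudson", na=False)
                           .map({True: "Yes", False: "No"}))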

#4 Filtering data

You can filter values in a dataset as a transformation, or simply filter the data in your grid view.

If you select “Apply as a step”, the filter is added to your recipe as a step.

You can use “Filter values” to filter the values shown in the project view. All applied filters are shown in the toolbar, and you can choose to apply them as a step from the toolbar at any time.

You can also apply transformations conditionally, to only the filtered values.

For example, in the Netflix Movies and TV Shows dataset, we can replace the word “deformed” with “physically different” but only for Kids-related titles.

We can filter the “listed_on” column by “Kid’s TV” and “Children & Family Movies”. Now, on the “description” column, we can replace values from the Clean menu on the toolbar. On the transformation panel, under “Apply transform to”, you can choose to apply the transformation only to the filtered rows. This replaces the term “deformed” with “physically different” only for Kids titles in the entire dataset.
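As an illustrative pandas sketch of the same conditional replacement (column names here are approximate and may differ in the actual dataset):

import pandas as pd

df = pd.read_csv("netflix_titles.csv")

# Restrict the replacement to Kids-related titles only
kids = df["listed_in"].str.contains("Kid's TV|Children & Family Movies", na=False)
df.loc[kids, "description"] = (df.loc[kids, "description"]
                               .str.replace("deformed", "physically different", regex=False))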

#5 Aggregating data

You can aggregate data in DataBrew by using the Group by transformation. For the New York City Airbnb Open Data example dataset, we can create aggregated minimum and maximum prices by neighborhood.

You can select the Group by transformation from the toolbar. We first select the column to group by, “Neighborhood”. We can then add two aggregated columns based on the “Price” column, for the “min” and “max” of its values.

By default, the aggregates are added as new columns in the existing dataset. You can instead choose to create a table with only the specified columns.
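The same aggregation in pandas, as an illustrative sketch (file and column names are illustrative):

import pandas as pd

df = pd.read_csv("nyc_airbnb_open_data.csv")

# Minimum and maximum price per neighborhood
price_by_neighborhood = (df.groupby("neighborhood")["price"]
                           .agg(["min", "max"])
                           .reset_index())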

#6 Handling Categorical values

For most ML modeling algorithms, categorical values like gender, product category, or education level need to be converted to a numerical format. DataBrew supports Categorical mapping and One-Hot Encoding.

Categorical or label mapping

Ordinal categorical values are ordered or hierarchical, like education level or T-shirt sizes; for example, Large is greater than Small, so Small can be labeled as 1 and Large as 2 in numerical format. The easiest way to handle such variables is to use Categorical mapping in DataBrew, which can be accessed from the toolbar under Mapping.

For the New York City Airbnb Open Data example dataset, the “room_type” column has values that are ordered. On the Categorical mapping form, you can choose “Map all values” under the mapping options. You can then select the “Map values to numeric values” check box, or enter custom values as required. Apply, and you have a new column with numerical values mapped from the original column.

One-Hot Encoding 

For all non-ordinal categorical values like gender or product category, one-hot encoding is the most common way to convert them to a numerical format. It can be accessed in a DataBrew project from the toolbar under Encoding.

For the “neighborhood_group” column in the New York City Airbnb Open Data, all you have to do is open the one-hot encoding form and choose Apply. All the required columns with the encoded numerical values are generated instantly. This would usually take a data scientist hours to perform manually.
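For illustration, the two encodings map to the following pandas sketch; the ordinal mapping chosen for room_type and the column spellings are assumptions:

import pandas as pd

df = pd.read_csv("nyc_airbnb_open_data.csv")

# Categorical (label) mapping for an ordinal column; the ordering here is an assumption
room_order = {"Shared room": 1, "Private room": 2, "Entire home/apt": 3}
df["room_type_mapped"] = df["room_type"].map(room_order)

# One-hot encoding for a non-ordinal column
df = pd.get_dummies(df, columns=["neighborhood_group"], prefix="ng")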

#7 Handling Numerical values

Columns with numerical values are often not streamlined enough to be processed by an ML algorithm. A dataset may have columns with numerical values that are on very different scales; for example, capacity might range from 0 to 100, whereas price could range from 10 to 10,000. In such cases, the columns need to be rescaled to a common scale, like 0 to 1. Along with scaling issues, we may also have data with outliers that need to be handled, so scaling, normalizing, and standardizing are common transformations performed on the numerical values of datasets used for ML models.

You can handle a column with numerical values like “Price” in New York City Airbnb Open Data using any of the transformations under Scale in the toolbar.

DataBrew provides multiple techniques to rescale your data, such as min-max normalization, scaling between specified values, mean normalization or standardization, and Z-score normalization or standardization. In this use case, because price has outliers, you can select Z-score normalization or standardization to best scale the values for your ML model.
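As a quick illustration of what these rescaling options compute, here is a pandas sketch of Z-score standardization and min-max normalization for the price column (file and column names are illustrative):

import pandas as pd

df = pd.read_csv("nyc_airbnb_open_data.csv")

# Z-score standardization: (x - mean) / standard deviation
df["price_zscore"] = (df["price"] - df["price"].mean()) / df["price"].std()

# Min-max normalization: rescale values to the 0-1 range
df["price_minmax"] = (df["price"] - df["price"].min()) / (df["price"].max() - df["price"].min())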

These are some of the most frequently used data preparation transformations, demonstrated in AWS Glue DataBrew. With more than 250 built-in transformations, you can find one that meets your data preparation use case and reduce the time and effort that goes into cleaning data.

Jump in and try out AWS Glue DataBrew today.

Datasets used in this blog:

Consumer complaints : https://www.consumerfinance.gov/data-research/consumer-complaints/

NYC Parking tickets : https://www.kaggle.com/new-york-city/nyc-parking-tickets

UN General Assembly Votes – Resolutions: Sample dataset available in AWS Glue DataBrew

UN General Assembly Votes – States: Sample dataset available in AWS Glue DataBrew

Netflix Movies and TV Shows : https://www.kaggle.com/shivamb/netflix-shows

New York City Airbnb Open Data : https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data


About the Authors

Shilpa Mohan is a Sr. UX designer at AWS and leads the design of AWS Glue DataBrew. With over 13 years of experience across multiple enterprise domains, she currently crafts products for Database, Analytics, and AI services at AWS. Shilpa is a passionate creator; she spends her time creating anything from content and photographs to crafts.

 

 

Romi Boimer is a Sr. Software Development Engineer at AWS and a technical lead for AWS Glue DataBrew. She designs and builds solutions that enable customers to efficiently prepare and manage their data. Romi has a passion for aerial arts; in her spare time, she enjoys fighting gravity and hanging from fabric.