Tag Archives: AWS Glue

Orchestrating AWS Glue crawlers using AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/orchestrating-aws-glue-crawlers-using-aws-step-functions/

This blog post is written by Justin Callison, General Manager, AWS Workflow.

Organizations generate terabytes of data every day in a variety of semistructured formats. AWS Glue and Amazon Athena can give you a simpler and more cost-effective way to analyze this data with no infrastructure to manage. AWS Glue crawlers identify the schema of your data and manage the metadata required to analyze the data in place, without the need to transform this data and load into a data warehouse.

The timing of when your crawlers run and complete is important. You must ensure the crawler runs after your data has updated and before you query it with Athena or analyze with an AWS Glue job. If not, your analysis may experience errors or return incomplete results.

In this blog, you learn how to use AWS Step Functions, a low-code visual workflow service that integrates with over 220 AWS services, to orchestrate your crawlers: you control when they start, confirm their completion, and combine them into end-to-end, serverless data processing workflows.

Using Step Functions to orchestrate multiple AWS Glue crawlers provides a number of benefits compared to implementing a solution directly with code. First, the workflow provides an instant visual understanding of the application and of any errors that might occur during execution. Second, Step Functions’ ability to run nested workflows inside a Map state helps to decouple and reuse application components with native array iteration. Finally, the Step Functions Wait state lets the workflow periodically poll the status of a crawl job without incurring additional cost for idle wait time.

Deploying the example

With this example, you create three datasets in Amazon S3, then use Step Functions to orchestrate AWS Glue crawlers to analyze the datasets and make them available to query using Athena.

You deploy the example with AWS CloudFormation using the following steps:

  1. Download the template.yaml file from here.
  2. Log in to the AWS Management Console and go to AWS CloudFormation.
  3. Navigate to Stacks -> Create stack and select With new resources (standard).
  4. Select Template is ready and Upload a template file, then Choose File and select the template.yaml file that you downloaded in Step 1 and choose Next.
  5. Enter a stack name, such as glue-stepfunctions-demo, and choose Next.
  6. Choose Next, check the acknowledgement boxes in the Capabilities and transforms section, then choose Create stack.
  7. After deployment, the status updates to CREATE_COMPLETE.

Create your datasets

Navigate to Step Functions in the AWS Management Console and select the create-dataset state machine from the list. This state machine uses Express Workflows and the Parallel state to build three datasets concurrently in S3. The first two datasets include information by user and location respectively and include files per day over the 5-year period from 2016 to 2020. The third dataset is a simpler, all-time summary of data by location.

To create the datasets, you choose Start execution from the toolbar for the create-dataset state machine, then choose Start execution again in the dialog box. This runs the state machine and creates the datasets in S3.

Navigate to the S3 console and view the glue-demo-databucket created for this example. In this bucket, in a folder named data, there are three subfolders, each containing a dataset.

The all-time-location-summaries folder contains a set of JSON files, one for each location.

The daily-user-summaries and daily-location-summaries contain a folder structure with nested folders for each year, month, and date. In addition to making this data easier to navigate via the console, this folder structure provides hints to AWS Glue that it can use to partition this dataset and make it more efficient to query.

Crawling

You now use AWS Glue crawlers to analyze these datasets and make them available to query. Navigate to the AWS Glue console and select Crawlers to see the list of crawlers that you created when you deployed this example. Select the daily-user-summaries crawler to view its details, and note that it has tags assigned to indicate metadata, such as the datatype of the data and whether the dataset is-partitioned.

Now, return to the Step Functions console and view the run-crawlers-with-tags state machine. This state machine uses AWS SDK service integrations to get a list of all crawlers matching the tag criteria you enter. It then uses the Map state and the optimized service integration for Step Functions to run the run-crawler state machine for each of the matching crawlers concurrently. The run-crawler state machine starts a crawler and monitors its status until the crawler completes. Once each of the individual crawlers has completed, the run-crawlers-with-tags state machine also completes.
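
The state machines implement this with AWS SDK service integrations, a Map state, and Wait states rather than custom code. Purely as a hedged illustration of the same start-and-poll logic, the following Python sketch uses boto3 to list crawlers by tag, start them, and wait for each to return to the READY state (the tag values and polling interval are illustrative, not taken from the example templates):

import time
import boto3

glue = boto3.client('glue')

def run_crawlers_with_tags(tags):
    # Find crawlers whose tags match the given criteria (equivalent to the
    # ListCrawlers call the state machine makes through the AWS SDK integration).
    crawler_names = glue.list_crawlers(Tags=tags)['CrawlerNames']

    # Start every matching crawler.
    for name in crawler_names:
        glue.start_crawler(Name=name)

    # Poll until every crawler has returned to the READY state
    # (the state machine does this with a Wait state instead of sleeping).
    pending = set(crawler_names)
    while pending:
        time.sleep(30)
        for name in list(pending):
            state = glue.get_crawler(Name=name)['Crawler']['State']
            if state == 'READY':
                pending.remove(name)

run_crawlers_with_tags({'datatype': 'json'})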

To initiate the crawlers:

  1. Choose Start execution from the top of the page when viewing the run-crawlers-with-tags state machine
  2. Provide the following as Input
    {"tags": {"datatype": "json"}}
  3. Choose Start execution.

After 2-3 minutes, the execution finishes with a Succeeded status once all three crawlers have completed. During this time, you can navigate to the run-crawler state machine to view the individual, nested executions per crawler or to the AWS Glue console to see the status of the crawlers.

Querying the data using Amazon Athena

Now, navigate to the Athena console where you can see the database and tables created by your crawlers. Note that AWS Glue recognized the partitioning scheme and included fields for year, month, and date in addition to user and usage fields for the data contained in the JSON files.

If you have not used Athena in this account before, you see a message instructing you to set a query result location. Choose View settings -> Manage -> Browse S3 and select the athena-results bucket that you created when you deployed the example. Choose Save then return to the Editor to continue.

You can now run queries such as the following, to calculate the total usage for all users over 5 years.

SELECT SUM(usage) all_time_usage FROM "daily_user_summaries"

You can also add filters, as shown in the following example, which limit results to those from 2016.

SELECT SUM(usage) all_time_usage FROM "daily_user_summaries" WHERE year = '2016'

Note this second query scanned only 17% as much data (133 KB vs 797 KB) and completed faster. This is because Athena used the partitioning information to avoid querying the full dataset. While the differences in this example are small, for real-world datasets with terabytes of data, your cost and latency savings from partitioning data can be substantial.

The disadvantage of a partitioning scheme is that new folders are not included in query results until you add new partitions. Re-running your crawler identifies and adds the new partitions and using Step Functions to orchestrate these crawlers makes that task simpler.

Extending the example

You can use these example state machines as they are in your AWS accounts to manage your existing crawlers. You can use Amazon S3 event notifications with Amazon EventBridge to trigger crawlers based on data changes. With the Optimized service integration for Amazon Athena, you can extend your workflows to execute queries against these crawled datasets. And you can use these examples to integrate crawler execution into your end-to-end data processing workflows, creating reliable, auditable workflows from ingestion through to analysis.

Conclusion

In this blog post, you learn how to use Step Functions to orchestrate AWS Glue crawlers. You deploy an example that generates three datasets, then uses Step Functions to start and coordinate crawler runs that analyze this data and make it available to query using Athena.

To learn more about Step Functions, visit Serverless Land.

Accelerate Amazon DynamoDB data access in AWS Glue jobs using the new AWS Glue DynamoDB Export connector

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/accelerate-amazon-dynamodb-data-access-in-aws-glue-jobs-using-the-new-aws-glue-dynamodb-elt-connector/

Modern data architectures encourage the integration of data lakes, data warehouses, and purpose-built data stores, enabling unified governance and easy data movement. With a modern data architecture on AWS, you can store data in a data lake and use a ring of purpose-built data services around the lake, allowing you to make decisions with speed and agility.

To achieve a modern data architecture, AWS Glue is the key service that integrates data over a data lake, data warehouse, and purpose-built data stores. AWS Glue simplifies data movement patterns such as inside-out, outside-in, and around the perimeter. A powerful purpose-built data store is Amazon DynamoDB, which is widely used by hundreds of thousands of companies, including Amazon.com. It’s common to move data from DynamoDB to a data lake built on top of Amazon Simple Storage Service (Amazon S3). Many customers move data from DynamoDB to Amazon S3 using AWS Glue extract, transform, and load (ETL) jobs.

Today, we’re pleased to announce the general availability of a new AWS Glue DynamoDB export connector. It’s built on top of the DynamoDB table export feature. It’s a scalable and cost-efficient way to read large DynamoDB table data in AWS Glue ETL jobs. This post describes the benefit of this new export connector and its use cases.

The following are typical use cases to read from DynamoDB tables using AWS Glue ETL jobs:

  • Move the data from DynamoDB tables to different data stores
  • Integrate the data with other services and applications
  • Retain historical snapshots for auditing
  • Build an S3 data lake from the DynamoDB data and analyze the data from various services, such as Amazon Athena, Amazon Redshift, and Amazon SageMaker

The new AWS Glue DynamoDB export connector

The old version of the AWS Glue DynamoDB connector reads DynamoDB tables through the DynamoDB Scan API. In contrast, the new AWS Glue DynamoDB export connector reads DynamoDB data from a snapshot that is exported from the DynamoDB table. This approach has the following benefits:

  • It doesn’t consume read capacity units of the source DynamoDB tables
  • The read performance is consistent for large DynamoDB tables

Especially for large DynamoDB tables of more than 100 GB, this new connector is significantly faster than the traditional connector.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance.
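
You can enable PITR from the DynamoDB console or programmatically. The following is a hedged boto3 sketch of turning on PITR for a source table (the table name is a placeholder):

import boto3

dynamodb = boto3.client('dynamodb')

# Enable point-in-time recovery on the source table so that
# the export-based connector can be used against it.
dynamodb.update_continuous_backups(
    TableName='your-source-table',
    PointInTimeRecoverySpecification={'PointInTimeRecoveryEnabled': True}
)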

How to use the new connector on AWS Glue Studio Visual Editor

AWS Glue Studio Visual Editor is a graphical interface that makes it easy to create, run, and monitor AWS Glue ETL jobs in AWS Glue. The new DynamoDB export connector is available on AWS Glue Studio Visual Editor. You can choose Amazon DynamoDB as the source.

After you choose Create, you see the visual Directed Acyclic Graph (DAG). Here, you can choose your DynamoDB table that exists in this account or Region. This allows you to select DynamoDB tables (with PITR enabled) directly as a source in AWS Glue Studio. This provides a one-click export from any of your DynamoDB tables to Amazon S3. You can also easily add any data sources and targets or transformations to the DAG. For example, it allows you to join two different DynamoDB tables and export the result to Amazon S3, as shown in the following screenshot.

The following two connection options are automatically added. They specify the location that is used to store temporary data during the DynamoDB export phase. You can set S3 bucket lifecycle policies to expire the temporary data.

  • dynamodb.s3.bucket – The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – The S3 prefix to store temporary data during DynamoDB export

How to use the new connector on the job script code

You can use the new export connector when you create an AWS Glue DynamicFrame in the job script code by configuring the following connection options:

  • dynamodb.export – (Required) You need to set this to ddb or s3
  • dynamodb.tableArn – (Required) Your source DynamoDB table ARN
  • dynamodb.unnestDDBJson – (Optional) If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is false.
  • dynamodb.s3.bucket – (Optional) The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – (Optional) The S3 prefix to store temporary data during DynamoDB export

The following is the sample Python code to create a DynamicFrame using the new export connector:

# Assumes a standard AWS Glue job setup in which glue_context is a GlueContext
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the source DynamoDB table through the new export connector
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",               # trigger a new export of the live table
        "dynamodb.tableArn": "test_source",     # replace with your source table ARN
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "bucket name",    # replace with your bucket name
        "dynamodb.s3.prefix": "bucket prefix"   # replace with your prefix
    }
)

The new export connector doesn’t require configurations related to AWS Glue job parallelism, unlike the old connector. Now you no longer need to change the configuration when you scale out the AWS Glue job. It also doesn’t require any configuration regarding DynamoDB table read/write capacity and its capacity mode (on demand or provisioned).

DynamoDB table schema handling

By default, the new export connector reads data in DynamoDB JSON structure that is present in exports. The following is an example schema of the frame using the Amazon Customer Review Dataset:

root
|-- Item: struct (nullable = true)
| |-- product_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- total_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_title: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- star_rating: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- customer_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- marketplace: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- helpful_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- review_headline: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- review_date: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- vine: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_body: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- verified_purchase: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- product_category: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- year: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_parent: struct (nullable = true)
| | |-- S: string (nullable = true)

To read DynamoDB item columns without handling nested data, you can set dynamodb.unnestDDBJson to True. The following is an example of the schema of the same data where dynamodb.unnestDDBJson is set to True:

root
|-- product_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- review_headline: string (nullable = true)
|-- review_date: string (nullable = true)
|-- vine: string (nullable = true)
|-- review_body: string (nullable = true)
|-- verified_purchase: string (nullable = true)
|-- product_category: string (nullable = true)
|-- year: string (nullable = true)
|-- product_parent: string (nullable = true)

Data freshness

Data freshness is the measure of staleness of the data from the live tables in the original source. In the new export connector, the dynamodb.export option impacts data freshness.

When dynamodb.export is set to ddb, the AWS Glue job invokes a new export and then reads the export placed in an S3 bucket into DynamicFrame. It reads exports of the live table, so data can be fresh. On the other hand, when dynamodb.export is set to s3, the AWS Glue job skips invoking a new export and directly reads an export already placed in an S3 bucket. It reads exports of the past table, so data can be stale, but you can reduce overhead to trigger the exports.

The following table explains the data freshness and pros and cons of each option.

New export connector with dynamodb.export set to s3
  • Data freshness: Stale
  • Data source: Export of the past table
  • Pros: RCU is not consumed; can skip triggering exports
  • Cons: Data can be stale

New export connector with dynamodb.export set to ddb
  • Data freshness: Fresh
  • Data source: Export of the live table
  • Pros: Data can be fresh; RCU is not consumed
  • Cons: Overhead to trigger exports and wait for completion

Old connector (dynamodb.export not applicable)
  • Data freshness: Most fresh
  • Data source: Scan of the live tables
  • Pros: Data can be fresh
  • Cons: Read capacity units (RCUs) are consumed
Performance

The following benchmark shows the performance improvements between the old version of the AWS Glue DynamoDB connector and the new export connector. The comparison uses the DynamoDB tables storing the TPC-DS benchmark dataset with different scales from 10 MB to 2 TB. The sample Spark job reads from the DynamoDB table and calculates the count of the items. All the Spark jobs are run on AWS Glue 3.0, G.2X, 60 workers.

The following chart compares AWS Glue job duration between the old connector and the new export connector. For small DynamoDB tables, the old connector is faster. For large tables of more than 80 GB, the new export connector is faster. In other words, the DynamoDB export connector is recommended for jobs that take more than 5–10 minutes to run with the old connector. The chart also shows that the duration of the new export connector increases slowly as data size increases, whereas the duration of the old connector increases rapidly. This means that the new export connector is especially suitable for larger tables.

The following chart compares dollar cost between the old connector and the new export connector. It combines the AWS Glue DPU-hour cost with the cost of reading data from DynamoDB. For the old connector, we include the read request cost. For the new export connector, we include the cost of the DynamoDB data export to Amazon S3. Both are calculated with DynamoDB on-demand capacity mode.

With AWS Glue Auto Scaling

AWS Glue Auto Scaling is a new feature to automatically resize computing resources for better performance at lower cost. You can take advantage of AWS Glue Auto Scaling with the new DynamoDB export connector.

As the following chart shows, with AWS Glue Auto Scaling, the duration of the new export connector is shorter than the old connector when the size of the source DynamoDB table is 100 GB or more. It shows a similar trend without AWS Glue Auto Scaling.

You also get cost benefits because only the Spark driver is active for most of the duration of the DynamoDB export (which is nearly 30% of the total job duration time with the old scan-based connector).

Conclusion

AWS Glue is a key service to integrate with multiple data stores. At AWS, we keep improving the performance and cost-efficiency of our services. In this post, we announced the availability of the new AWS Glue DynamoDB export connector. With this new connector, you can easily integrate your large data on DynamoDB tables with different data stores. It helps you read the large tables faster from AWS Glue jobs at lower cost.

The new AWS Glue DynamoDB export connector is now generally available in all supported Glue Regions. Let’s start using the new AWS Glue DynamoDB export connector today! We are looking forward to your feedback and stories on how you utilize the connector for your needs.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Neil Gupta is a Software Development Engineer on the AWS Glue team. He enjoys tackling big data problems and learning more about distributed systems.

Andrew Kim is a Software Development Engineer on the AWS Glue team. His passion is to build scalable and effective solutions to challenging problems and working with distributed systems.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizing Apache Spark for performance and reliability.

Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/

Nowadays, many customers have built their data lakes as the core of their data analytic systems. In a typical use case of data lakes, many concurrent queries run to retrieve consistent snapshots of business insights by aggregating query results. A large volume of data constantly comes from different data sources into the data lakes. There is also a common demand to reflect the changes occurring in the data sources into the data lakes. This means that not only inserts but also updates and deletes need to be replicated into the data lakes.

Apache Iceberg provides the capability of ACID transactions on your data lakes, which allows concurrent queries to add or delete records isolated from any existing queries with read-consistency for queries. Iceberg is an open table format designed for large analytic workloads on huge datasets. You can perform ACID transactions against your data lakes by using simple SQL expressions. It also enables time travel, rollback, hidden partitioning, and schema evolution changes, such as adding, dropping, renaming, updating, and reordering columns.

AWS Glue is one of the key elements to building data lakes. It extracts data from multiple sources and ingests your data to your data lake built on Amazon Simple Storage Service (Amazon S3) using both batch and streaming jobs. To expand the accessibility of your AWS Glue extract, transform, and load (ETL) jobs to Iceberg, AWS Glue provides an Apache Iceberg connector. The connector allows you to build Iceberg tables on your data lakes and run Iceberg operations such as ACID transactions, time travel, rollbacks, and so on from your AWS Glue ETL jobs.

In this post, we give an overview of how to set up the Iceberg connector for AWS Glue and configure the relevant resources to use Iceberg with AWS Glue jobs. We also demonstrate how to run typical Iceberg operations on AWS Glue interactive sessions with an example use case.

Apache Iceberg connector for AWS Glue

With the Apache Iceberg connector for AWS Glue, you can take advantage of the following Iceberg capabilities:

  • Basic operations on Iceberg tables – This includes creating Iceberg tables in the AWS Glue Data Catalog and inserting, updating, and deleting records with ACID transactions in the Iceberg tables
  • Inserting and updating records – You can run UPSERT (update and insert) queries for your Iceberg table
  • Time travel on Iceberg tables – You can read a specific version of an Iceberg table from table snapshots that Iceberg manages
  • Rollback of table versions – You can revert an Iceberg table back to a specific version of the table

Iceberg offers additional useful capabilities such as hidden partitioning; schema evolution with add, drop, update, and rename support; automatic data compaction; and more. For more details about Iceberg, refer to the Apache Iceberg documentation.

Next, we demonstrate how the Apache Iceberg connector for AWS Glue works for each Iceberg capability based on an example use case.

Overview of example customer scenario

Let’s assume that an ecommerce company sells products on its online platform. Customers can buy products and write reviews for each product. Customers can add, update, or delete their reviews at any time. The customer reviews are an important source for analyzing customer sentiment and business trends.

In this scenario, we have the following teams in our organization:

  • Data engineering team – Responsible for building and managing data platforms.
  • Data analyst team – Responsible for analyzing customer reviews and creating business reports. This team queries the reviews daily, creates a business intelligence (BI) report, and shares it with the sales team.
  • Customer support team – Responsible for replying to customer inquiries. This team queries the reviews when they get inquiries about the reviews.

Our solution has the following requirements:

  • Query scalability is important because the website is huge.
  • Individual customer reviews can be added, updated, and deleted.
  • The data analyst team needs to use both notebooks and ad hoc queries for their analysis.
  • The customer support team sometimes needs to view the history of the customer reviews.
  • Customer reviews can always be added, updated, and deleted, even while one of the teams is querying the reviews for analysis. This means that any result in a query isn’t affected by uncommitted customer review write operations.
  • Any changes in customer reviews that are made by the organization’s various teams need to be reflected in BI reports and query results.

In this post, we build a data lake of customer review data on top of Amazon S3. To meet these requirements, we introduce Apache Iceberg to enable adding, updating, and deleting records; ACID transactions; and time travel queries. We also use an AWS Glue Studio notebook to integrate and query the data at scale. First, we set up the connector so we can create an AWS Glue connection for Iceberg.

Set up the Apache Iceberg connector and create the Iceberg connection

We first set up the Apache Iceberg connector for AWS Glue so that we can use Apache Iceberg with AWS Glue jobs. Specifically, in this section, we subscribe to the connector in AWS Marketplace and create an AWS Glue connection with it. Complete the following steps:

  1. Navigate to the Apache Iceberg connector for AWS Glue page in AWS Marketplace.
  2. Choose Continue to Subscribe.

  1. Review the information under Terms and Conditions, and choose Accept Terms to continue.

  1. When the subscription is complete, choose Continue to Configuration.

  1. For Fulfillment option, choose Glue 3.0. (1.0 and 2.0 are also available options.)
  2. For Software version, choose the latest software version.

As of this writing, 0.12.0-2 is the latest version of the Apache Iceberg connector for AWS Glue.

  1. Choose Continue to Launch.

  1. Choose Usage instructions.
  2. Choose Activate the Glue connector from AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, iceberg-connection).

  1. Choose Create connection and activate connector.

A message appears that the connection was successfully added, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

We use a provided AWS CloudFormation template to set up Iceberg configuration for AWS Glue. AWS CloudFormation creates the following resources:

  • An S3 bucket to store an Iceberg configuration file and actual data
  • An AWS Lambda function to generate an Iceberg configuration file based on parameters provided by a user for the CloudFormation template, and to clean up the resources created through this post
  • AWS Identity and Access Management (IAM) roles and policies with necessary permissions
  • An AWS Glue database in the Data Catalog to register Iceberg tables

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

Launch Button

  1. For DynamoDBTableName, enter a name for an Amazon DynamoDB table that is created automatically when AWS Glue creates an Iceberg table.

This table is used for an AWS Glue job to obtain a commit lock to avoid concurrently modifying records in Iceberg tables. For more details about commit locking, refer to DynamoDB for Commit Locking. Note that you shouldn’t specify the name of an existing table.

  1. For IcebergDatabaseName, enter a name for the AWS Glue database that is created in the Data Catalog and used for registering Iceberg tables.
  2. Choose Next.

  1. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  2. Choose Create stack.

Start an AWS Glue Studio notebook to use Apache Iceberg

After you launch the CloudFormation stack, you create an AWS Glue Studio notebook to perform Iceberg operations. Complete the following steps:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Under Create job, select Jupyter Notebook.

  1. Select Upload and edit an existing notebook and upload iceberg-with-glue.ipynb.

  1. Choose Create.
  2. For Job name, enter a name.
  3. For IAM role, choose IcebergConnectorGlueJobRole, which was created via the CloudFormation template.
  4. Choose Start notebook job.

The process takes a few minutes to complete, after which you can see an AWS Glue Studio notebook view.

  1. Choose Save to save the notebook.

Set up the Iceberg configuration

To set up the Iceberg configuration, complete the following steps:

  1. Run the following cells with multiple options (magics). Note that you set your connection name for the %connections magic in the cell.
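
The exact magics cell is included in the downloaded notebook. As a rough, hedged sketch, it typically looks something like the following, assuming AWS Glue 3.0 and a connection named iceberg-connection (the worker settings and timeout are illustrative):

%glue_version 3.0
%connections iceberg-connection
%worker_type G.1X
%number_of_workers 3
%idle_timeout 60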

For more information, refer to Configuring AWS Glue Interactive Sessions for Jupyter and AWS Glue Studio notebooks.

A message Session <session-id> has been created appears when your AWS Glue Studio notebook is ready.

In the last cell in this section, you load your Iceberg configuration, which you specified when launching the CloudFormation stack. The Iceberg configuration includes a warehouse path for Iceberg actual data, a DynamoDB table name for commit locking, a database name for your Iceberg tables, and more.

To load the configuration, set the S3 bucket name that was created via the CloudFormation stack.

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose the stack you created.
  3. On the Outputs tab, copy the S3 bucket name.

  1. Set the S3 name as the S3_BUCKET parameter in your notebook.

  1. Run the cell and load the Iceberg configuration that you set.

Initialize the job with Iceberg configurations

We continue to run cells to initiate a SparkSession in this section.

  1. Set an Iceberg warehouse path and a DynamoDB table name for Iceberg commit locking from the user_config parameter.
  2. Initialize a SparkSession by setting the Iceberg configurations.
  3. With the SparkSession object, create SparkContext and GlueContext objects.

The following screenshot shows the relevant section in the notebook.

We provide the details of each parameter that you configure for the SparkSession in the appendix of this post.

For this post, we demonstrate setting the Spark configuration for Iceberg. You can also set the configuration as AWS Glue job parameters. For more information, refer to the Usage Information section in the Iceberg connector product page.
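
As a hedged sketch of what these initialization cells do, the following PySpark snippet sets the same properties that are listed in the appendix; the catalog name, warehouse path, and DynamoDB lock table name are placeholders that, in the notebook, come from the user_config you loaded earlier:

from pyspark.sql import SparkSession
from awsglue.context import GlueContext

# Placeholder values; in the notebook these come from the loaded user_config
CATALOG = "glue_catalog"
WAREHOUSE_PATH = "s3://your-iceberg-bucket/warehouse/"
DYNAMODB_TABLE = "myGlueLockTable"

spark = (
    SparkSession.builder
    # Register an Iceberg catalog backed by the AWS Glue Data Catalog
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{CATALOG}.warehouse", WAREHOUSE_PATH)
    .config(f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # DynamoDB-based commit locking
    .config(f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager")
    .config(f"spark.sql.catalog.{CATALOG}.lock.table", DYNAMODB_TABLE)
    # Iceberg SQL extensions and a UTC session time zone for time travel queries
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

# With the SparkSession ready, create the SparkContext and GlueContext objects
sc = spark.sparkContext
glue_context = GlueContext(sc)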

Use case walkthrough

To walk through our use case, we use two tables: acr_iceberg and acr_iceberg_report. The table acr_iceberg contains the customer review data. The table acr_iceberg_report contains BI analysis results based on the customer review data. All changes to acr_iceberg also impact acr_iceberg_report. The table acr_iceberg_report needs to be updated daily, right before sharing business reports with stakeholders.

To demonstrate this use case, we walk through the following typical steps:

  1. A data engineering team registers the acr_iceberg and acr_iceberg_report tables in the Glue Data Catalog.
  2. Customers (ecommerce users) add reviews to products in the Industrial_Supplies category. These reviews are added to the Iceberg table.
  3. A customer requests to update their reviews. We simulate updating the customer review in the acr_iceberg table.
  4. We reflect the customer’s request of the updated review in acr_iceberg into acr_iceberg_report.
  5. We revert the customer’s request of the updated review for the customer review table acr_iceberg, and reflect the reversion in acr_iceberg_report.

1. Create Iceberg tables of customer reviews and BI reports

In this step, the data engineering team creates the acr_iceberg Iceberg table for customer reviews data (based on the Amazon Customer Reviews Dataset), and the team creates the acr_iceberg_report Iceberg table for BI reports.

Create the acr_iceberg table for customer reviews

The following code initially extracts the Amazon customer reviews, which are stored in a public S3 bucket. Then it creates an Iceberg table of the customer reviews and loads these reviews into your specified S3 bucket (created via CloudFormation stack). Note that the script loads partial datasets to avoid taking a lot of time to load the data.

# Loading the dataset and creating an Iceberg table. This will take about 3-5 minutes.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

Regarding the tableProperty parameter, we specify format version 2 to make the table version compatible with Amazon Athena. For more information about Athena support for Iceberg tables, refer to Considerations and limitations. To learn more about the difference between Iceberg table versions 1 and 2, refer to Appendix E: Format version changes.

Let’s run the following cells. Running the second cell takes around 3–5 minutes.

After you run the cells, the acr_iceberg table is available in your specified database in the Glue Data Catalog.

You can also see the actual data and metadata of the Iceberg table in the S3 bucket that is created through the CloudFormation stack. Iceberg creates the table and writes actual data and relevant metadata that includes table schema, table version information, and so on. See the following objects in your S3 bucket:

$ aws s3 ls 's3://your-bucket/data/' --recursive
YYYY-MM-dd hh:mm:ss   83616660 data/iceberg_blog_default.db/acr_iceberg/data/00000-44-c2983230-c43a-4f4a-9b89-1f7c13e59645-00001.parquet
YYYY-MM-dd hh:mm:ss   83247771 
...
YYYY-MM-dd hh:mm:ss       5134 data/iceberg_blog_default.db/acr_iceberg/metadata/00000-bc5d3ea2-280f-4e28-a71f-4c2b749ed637.metadata.json
YYYY-MM-dd hh:mm:ss     116950 data/iceberg_blog_default.db/acr_iceberg/metadata/411308cd-1f4d-4535-9444-f6b56a56697f-m0.avro
YYYY-MM-dd hh:mm:ss       3821 data/iceberg_blog_default.db/acr_iceberg/metadata/snap-6122957686233868728-1-411308cd-1f4d-4535-9444-f6b56a56697f.avro

The job tries to create a DynamoDB table, which you specified in the CloudFormation stack (in the following screenshot, its name is myGlueLockTable), if it doesn’t exist already. As we discussed earlier, the DynamoDB table is used for commit locking for Iceberg tables.

Create the acr_iceberg_report Iceberg table for BI reports

The data engineer team also creates the acr_iceberg_report table for BI reports in the Glue Data Catalog. This table initially has the following records.

comment_count | avg_star | product_category
1240 | 4.20729367860598 | Camera
95 | 4.80167540490342 | Industrial_Supplies
663 | 3.80123467540571 | PC

To create the table, run the following cell.
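
The cell body is provided in the downloaded notebook. As a hedged sketch, it might create the report table from the three initial records shown above along these lines (the CATALOG and DATABASE variables are assumed to come from the Iceberg configuration you loaded earlier):

# A minimal sketch of creating acr_iceberg_report with its initial records
initial_report = spark.createDataFrame(
    [
        (1240, 4.20729367860598, 'Camera'),
        (95, 4.80167540490342, 'Industrial_Supplies'),
        (663, 3.80123467540571, 'PC'),
    ],
    ['comment_count', 'avg_star', 'product_category']
)

initial_report.writeTo(f'{CATALOG}.{DATABASE}.acr_iceberg_report') \
    .tableProperty('format-version', '2') \
    .create()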

The two Iceberg tables have been created. Let’s check the acr_iceberg table records by running a query.

Determine the average star rating for each product category by querying the Iceberg table

You can see the Iceberg table records by using a SELECT statement. In this section, we query the acr_iceberg table to simulate seeing a current BI report data by running an ad hoc query.

Run the following cell in the notebook to get the aggregated number of customer comments and mean star rating for each product_category.
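
The cell itself is in the notebook; a hedged sketch of an equivalent aggregation query, again assuming the CATALOG and DATABASE variables from the configuration, is:

# Aggregate the number of comments and the mean star rating per product category
spark.sql(f"""
    SELECT product_category,
           count(*)         AS comment_count,
           avg(star_rating) AS avg_star
    FROM {CATALOG}.{DATABASE}.acr_iceberg
    GROUP BY product_category
    ORDER BY product_category
""").show()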

The cell output has the following results.

Another way to query Iceberg tables is using Amazon Athena (when you use the Athena with Iceberg tables, you need to set up the Iceberg environment) or Amazon EMR.

2. Add customer reviews in the Iceberg table

In this section, customers add comments for some products in the Industrial Supplies product category, and we add these comments to the acr_iceberg table. To demonstrate this scenario, we create a Spark DataFrame based on the following new customer reviews and then add them to the table with an INSERT statement.

  • Review 1 – marketplace: US; customer_id: 12345689; review_id: ISB35E4556F144; product_id: I00EDBY7X8; product_parent: 989172340; product_title: plastic containers; star_rating: 5; helpful_votes: 0; total_votes: 0; vine: N; verified_purchase: Y; review_headline: Five Stars; review_body: Great product!; review_date: 2022-02-01; year: 2022; product_category: Industrial_Supplies
  • Review 2 – marketplace: US; customer_id: 78901234; review_id: IS4392CD4C3C4; product_id: I00D7JFOPC; product_parent: 952000001; product_title: battery tester; star_rating: 3; helpful_votes: 0; total_votes: 0; vine: N; verified_purchase: Y; review_headline: nice one, but it broke some days later; review_body: nope; review_date: 2022-02-01; year: 2022; product_category: Industrial_Supplies
  • Review 3 – marketplace: US; customer_id: 12345123; review_id: IS97B103F8B24C; product_id: I002LHA74O; product_parent: 818426953; product_title: spray bottle; star_rating: 2; helpful_votes: 1; total_votes: 1; vine: N; verified_purchase: N; review_headline: Two Stars; review_body: the bottle isn’t as big as pictured.; review_date: 2022-02-01; year: 2022; product_category: Industrial_Supplies
  • Review 4 – marketplace: US; customer_id: 23000093; review_id: ISAB4268D46F3X; product_id: I00ARPLCGY; product_parent: 562945918; product_title: 3d printer; star_rating: 5; helpful_votes: 3; total_votes: 3; vine: N; verified_purchase: Y; review_headline: Super great; review_body: very useful; review_date: 2022-02-01; year: 2022; product_category: Industrial_Supplies
  • Review 5 – marketplace: US; customer_id: 89874312; review_id: ISAB4268137V2Y; product_id: I80ARDQCY; product_parent: 564669018; product_title: circuit board; star_rating: 4; helpful_votes: 0; total_votes: 0; vine: Y; verified_purchase: Y; review_headline: Great, but a little bit expensive; review_body: you should buy this, but note the price; review_date: 2022-02-01; year: 2022; product_category: Industrial_Supplies

Run the following cells in the notebook to insert the customer comments to the Iceberg table. The process takes about 1 minute.
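
The notebook cells contain the complete insert. As an abbreviated, hedged sketch that shows only the first of the five reviews, the new records could be appended as follows (the column types are assumed to match the existing table schema):

# Build a DataFrame of new reviews and append it to the Iceberg table
new_reviews = spark.createDataFrame(
    [
        ('US', '12345689', 'ISB35E4556F144', 'I00EDBY7X8', '989172340',
         'plastic containers', 5, 0, 0, 'N', 'Y', 'Five Stars',
         'Great product!', '2022-02-01', 2022, 'Industrial_Supplies'),
        # ... the remaining four reviews follow the same column order
    ],
    ['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent',
     'product_title', 'star_rating', 'helpful_votes', 'total_votes', 'vine',
     'verified_purchase', 'review_headline', 'review_body', 'review_date',
     'year', 'product_category']
)

new_reviews.writeTo(f'{CATALOG}.{DATABASE}.acr_iceberg').append()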

Run the next cell to see that the aggregated results now include the product category Industrial_Supplies with 5 under comment_count.

3. Update a customer review in the Iceberg table

In the previous section, we added new customer reviews to the acr_iceberg Iceberg table. In this section, a customer requests an update of their review. Specifically, customer 78901234 requests the following update of the review ID IS4392CD4C3C4.

  • Change star_rating from 3 to 5
  • Update the review_headline from nice one, but it broke some days later to very good

We update the customer comment by running an UPDATE query in the following cell.
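
A hedged sketch of that UPDATE cell is:

# Apply the customer's requested changes to review IS4392CD4C3C4
spark.sql(f"""
    UPDATE {CATALOG}.{DATABASE}.acr_iceberg
    SET star_rating = 5,
        review_headline = 'very good'
    WHERE review_id = 'IS4392CD4C3C4'
""")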

We can review the updated record by running the next cell as follows.

Also, when you rerun the aggregation query, you can see the updated avg_star column value for the Industrial_Supplies product category. Specifically, the avg_star value has been updated from 3.8 to 4.2 as a result of the star_rating changing from 3 to 5:

4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query

In this section, we reflect the changes in the acr_iceberg table into the BI report table acr_iceberg_report. To do so, we run the MERGE INTO query and combine the two tables based on the condition of the product_category column in each table. This query works as follows:

  • When the product_category column in each table is the same, the query returns the sum of each column record
  • When the column in each table is not the same, the query just inserts a new record

This MERGE INTO operation is also referred to as an UPSERT (update and insert).

Run the following cell to reflect the update of customer reviews in the acr_iceberg table into the acr_iceberg_report BI table.
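
The exact MERGE INTO query is in the notebook; a hedged sketch that follows the logic described above (summing comment_count and averaging avg_star when product_category matches, inserting otherwise) is:

# Merge the aggregated review data into the BI report table
spark.sql(f"""
    MERGE INTO {CATALOG}.{DATABASE}.acr_iceberg_report AS report
    USING (
        SELECT product_category,
               count(*)         AS comment_count,
               avg(star_rating) AS avg_star
        FROM {CATALOG}.{DATABASE}.acr_iceberg
        GROUP BY product_category
    ) AS agg
    ON report.product_category = agg.product_category
    WHEN MATCHED THEN UPDATE SET
        report.comment_count = report.comment_count + agg.comment_count,
        report.avg_star      = (report.avg_star + agg.avg_star) / 2
    WHEN NOT MATCHED THEN INSERT *
""")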

After the MERGE INTO query is complete, you can see the updated acr_iceberg_report table by running the following cell.

The MERGE INTO query performed the following changes:

  • In the Camera, Industrial_Supplies, and PC product categories, each comment_count is the sum between the initial value of the acr_iceberg_report table and the aggregated table value. For example, in the Industrial_Supplies product category row, the comment_count 100 is calculated by 95 (in the initial version of acr_iceberg_report) + 5 (in the aggregated report table).
  • In addition to comment_count, the avg_star in the Camera, Industrial_Supplies, or PC product category row is also computed by averaging between each avg_star value in acr_iceberg_report and in the aggregated table.
  • In other product categories, each comment_count and avg_star is the same as each value in the aggregated table, which means that each value in the aggregated table is inserted into the acr_iceberg_report table.

5. Roll back the Iceberg tables and reflect changes in the BI report table

In this section, the customer who requested the update of the review now requests to revert the updated review.

Iceberg keeps a version of the table for each operation performed on it. We can see the information about each table version by inspecting the table’s metadata, and we can also time travel or roll back to an older table version.

To complete the customer request to revert the updated review, we need to revert the table version of acr_iceberg to the earlier version when we first added the reviews. Additionally, we need to update the acr_iceberg_report table to reflect the rollback of the acr_iceberg table version. Specifically, we need to perform the following three steps to complete these operations:

  1. Check the history of table changes of acr_iceberg and acr_iceberg_report to get each table snapshot.
  2. Roll back acr_iceberg to the version when we first inserted records, and also roll back the acr_iceberg_report table to the initial version to reflect the customer review update.
  3. Merge the acr_iceberg table with the acr_iceberg_report table again.

Get the metadata of each report table

As a first step, we check table versions by inspecting the table. Run the following cells.
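
The inspection cells query Iceberg's metadata tables; a hedged sketch of equivalent queries against each table's history metadata table is:

# Inspect each table's snapshot history to find the snapshot IDs to roll back to
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg.history").show(truncate=False)
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg_report.history").show(truncate=False)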

Now you can see the following table versions in acr_iceberg and acr_iceberg_report:

  • acr_iceberg has three versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The second oldest one is the record insertion, which shows the append operation
    • The latest one is the update, which shows the overwrite operation
  • acr_iceberg_report has two versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The other one is from the MERGE INTO query in the previous section, which shows the overwrite operation

As shown in the following screenshot, we roll back the acr_iceberg table to the version at which the records were inserted, based on the customer’s revert request. We also roll back the acr_iceberg_report table to its initial version to discard the MERGE INTO operation from the previous section.

Roll back the acr_iceberg and acr_iceberg_report tables

Based on your snapshot IDs, you can roll back each table version:

  • For acr_iceberg, use the second-oldest snapshot_id (in this example, 5440744662350048750) and replace <Type snapshot_id in acr_iceberg table> in the following cell with this snapshot_id.
  • For the acr_iceberg_report table, use the initial snapshot_id (in this example, 7958428388396549892) and replace <Type snapshot_id in acr_iceberg_report table> in the following cell with this snapshot_id.

After you specify the snapshot_id for each rollback query, run the following cells.
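
The rollback cells call Iceberg's rollback_to_snapshot Spark procedure. A hedged sketch, using the example snapshot IDs above, is:

# Roll back each table to the chosen snapshot (replace the IDs with your own)
spark.sql(f"CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg', 5440744662350048750)")
spark.sql(f"CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg_report', 7958428388396549892)")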

When this step is complete, you can see the previous and current snapshot IDs of each table.

Each Iceberg table has now been reverted to the specified version.

Reflect changes in acr_iceberg into acr_iceberg_report again

We reflect the acr_iceberg table reversion into the current acr_iceberg_report table. To complete this, run the following cell.

After you rerun the MERGE INTO query, run the following cell to see the new table records. When we compare the table records, we observe that the avg_star value for Industrial_Supplies is now lower than its previous value.

You were able to reflect the customer’s request to revert their updated review in the BI report table. Specifically, you can see the updated avg_star record in the Industrial_Supplies product category.

Clean up

To clean up all resources that you created, delete the CloudFormation stack.

Conclusion

In this post, we walked through using the Apache Iceberg connector with AWS Glue ETL jobs. We created an Iceberg table built on Amazon S3, and ran queries such as reading the Iceberg table data, inserting a record, merging two tables, and time travel.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the operations Iceberg supports. Refer to the Apache Iceberg documentation for information about more operations.

Appendix: Spark configurations to use Apache Iceberg on AWS Glue

As we mentioned earlier, the notebook sets up a Spark configuration to integrate Iceberg with AWS Glue. The following table shows what each parameter defines.

Spark configuration key | Value | Description
spark.sql.catalog.{CATALOG} | org.apache.iceberg.spark.SparkCatalog | Specifies a Spark catalog interface that communicates with Iceberg tables.
spark.sql.catalog.{CATALOG}.warehouse | {WAREHOUSE_PATH} | A warehouse path for jobs to write Iceberg metadata and actual data.
spark.sql.catalog.{CATALOG}.catalog-impl | org.apache.iceberg.aws.glue.GlueCatalog | The implementation of the Spark catalog class to communicate between Iceberg tables and the AWS Glue Data Catalog.
spark.sql.catalog.{CATALOG}.io-impl | org.apache.iceberg.aws.s3.S3FileIO | Used for Iceberg to communicate with Amazon S3.
spark.sql.catalog.{CATALOG}.lock-impl | org.apache.iceberg.aws.glue.DynamoLockManager | Used for Iceberg to manage table locks.
spark.sql.catalog.{CATALOG}.lock.table | {DYNAMODB_TABLE} | A DynamoDB table name to store table locks.
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions | The implementation that enables Spark to run Iceberg-specific SQL commands.
spark.sql.session.timeZone | UTC | Sets the time zone of the Spark environment to UTC for further Iceberg time travel queries. The epoch time is in the UTC time zone.

About the Author

Tomohiro Tanaka is a Cloud Support Engineer at Amazon Web Services. He builds AWS Glue connectors such as the Apache Iceberg connector and the TPC-DS connector. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he also enjoys coffee breaks with his colleagues and making coffee at home.

Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as Amazon Athena for interactive queries, Amazon EMR for Apache Spark applications, Amazon SageMaker for machine learning, and Amazon QuickSight for data visualization.

Apache Iceberg is an open-source table format for data stored in data lakes. It is optimized for data access patterns in Amazon Simple Storage Service (Amazon S3) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to process safe table schema updates as the table data evolves
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volumes change, without relying on physical directories
  • Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)
  • Provide versioned tables and support time travel queries to query historical data and verify changes between updates
  • Roll back tables to prior versions to return tables to a known good state in case of any issues

In 2021, AWS teams contributed the Apache Iceberg integration with the AWS Glue Data Catalog to open source, which enables you to use open-source compute engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg and Amazon EMR added support of Iceberg starting with version 6.5.0.

In this post, we show you how to use Amazon EMR Spark to create an Iceberg table, load sample books review data, and use Athena to query, perform schema evolution, row-level update and delete, and time travel, all coordinated through the AWS Glue Data Catalog.

Solution overview

We use the Amazon Customer Reviews public dataset as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.

Architecture that shows the flow from Amazon EMR loading data into Amazon S3, and queried by Amazon Athena through AWS Glue Data Catalog.

To set up and test this solution, we complete the following high-level steps:

  1. Create an S3 bucket.
  2. Create an EMR cluster.
  3. Create an EMR notebook.
  4. Configure a Spark session.
  5. Load data into the Iceberg table.
  6. Query the data in Athena.
  7. Perform a row-level update in Athena.
  8. Perform a schema evolution in Athena.
  9. Perform time travel in Athena.
  10. Consume Iceberg data across Amazon EMR and Athena.

Prerequisites

To follow along with this walkthrough, you must have the following:

  • An AWS Account with a role that has sufficient access to provision the required resources.

Create an S3 bucket

To create an S3 bucket that holds your Iceberg data, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name (for this post, we enter aws-lake-house-iceberg-blog-demo).

Because S3 bucket names are globally unique, choose a different name when you create your bucket.

  1. For AWS Region, choose your preferred Region (for this post, we use us-east-1).

Create a new Amazon S3 bucket. Choose us-east-1 as region

  1. Complete the remaining steps to create your bucket.
  2. If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.

Create an EMR cluster

Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose your Amazon EMR release version.

Iceberg requires release 6.5.0 and above.

  1. Select JupyterEnterpriseGateway and Spark as the software to install.
  2. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  3. Leave other settings at their default and choose Next.

Choose Amazon EMR release 6.6.0 and JupyterEnterpriseGateway and Spark. Enter configuration information.

  1. You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.
  2. Choose Next.
  3. For Cluster name, enter Iceberg Spark Cluster.
  4. Leave the remaining settings unchanged and choose Next.

Provide Iceberg Spark Cluster as the Cluster name

  1. You can configure security settings such as adding an EC2 key pair to access your EMR cluster locally. In this demo, we use the default setting.
  2. Choose Create cluster.

You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from Starting to Waiting.

Create an EMR notebook

When the cluster is active and in the Waiting state, we’re ready to run Spark programs in the cluster. For this demo, we use an EMR notebook to run Spark commands.

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. For Notebook name, enter a name (for this post, we enter iceberg-spark-notebook).
  4. For Cluster, select Choose an existing cluster and choose Iceberg Spark Cluster.
  5. For AWS service role, choose Create a new role to create EMR_Notebook_DefaultRole or choose a different role to access resources in the notebook.
  6. Choose Create notebook.

Create an Amazon EMR notebook. Use EMR_Notebooks_DefaultRole

You’re redirected to the notebook detail page.

  1. Choose Open in JupyterLab next to your notebook.
  2. Choose to create a new notebook.
  3. Under Notebook, choose Spark.

Choose Spark from the options provided in the Launcher

Configure a Spark session

In your notebook, run the following code:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://<your-iceberg-blog-demo-bucket>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path s3://<your-iceberg-blog-demo-bucket>
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)

Load data into the Iceberg table

In our Spark session, run the following commands to load data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews related to books
val book_reviews_location = "s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet"
val book_reviews = spark.read.parquet(book_reviews_location)

// write book reviews data to an Iceberg v2 table
book_reviews.writeTo("demo.reviews.book_reviews").tableProperty("format-version", "2").createOrReplace()

Iceberg format v2 is needed to support row-level updates and deletes. See Format Versioning for more details.

It may take up to 15 minutes for the commands to complete. When it’s complete, you should be able to see the table on the AWS Glue console, under the reviews database, with the table_type property shown as ICEBERG.

Shows the table properties for book_reviews table

The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using Spark SQL, Athena SQL, or Iceberg Java and Python SDKs.

Query in Athena

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure to use the S3 bucket you created earlier to store the query results.

The table book_reviews is available for querying. Run the following query:

SELECT * FROM reviews.book_reviews LIMIT 5;

The following screenshot shows the first five records from the table being displayed.

Amazon Athena query the first 5 rows and show the results

Perform a row-level update in Athena

In the next few steps, let’s focus on a record in the table with review ID RZDVOUQG1GBG7. Currently, it has no total votes when we run the following query:

SELECT total_votes FROM reviews.book_reviews 
WHERE review_id = 'RZDVOUQG1GBG7'

Query total_votes for a particular review which shows a value of 0

Let’s update the total_votes value to 2 using the following query:

UPDATE reviews.book_reviews
SET total_votes = 2
WHERE review_id = 'RZDVOUQG1GBG7'

Update query to set the total_votes for the previous review_id to 2

After your update command runs successfully, run the following query and note the updated result showing a total of two votes:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Athena enforces ACID transaction guarantees for all write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.

Concurrent updates cause a failure. This shows the TRANSACTION_CONFLICT error in this scenario.

Delete queries work in a similar way; see DELETE for more details.
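For example, a DELETE against the same table follows the same pattern as the preceding UPDATE statements (shown here for illustration only; running it removes the record used throughout this walkthrough):

DELETE FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'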

Perform a schema evolution in Athena

Suppose the review suddenly goes viral and gets 10 billion votes:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Based on the AWS Glue table information, the total_votes is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.

Updating to a very large value greater than maximum allowed integer value results in an error

Iceberg supports most schema evolution features as metadata-only operations, which don't require a table rewrite. These include adding, dropping, renaming, and reordering columns, and promoting column types. To solve this issue, you can change the integer column total_votes to a BIGINT type by running the following DDL:

ALTER TABLE reviews.book_reviews
CHANGE COLUMN total_votes total_votes BIGINT;

You can now update the value successfully:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Querying the record now gives us the expected result in BIGINT:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Perform time travel in Athena

In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel to a point in time after the first version was committed:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Query an earlier snapshot using time travel feature

Consume Iceberg data across Amazon EMR and Athena

One of the most important features of a data lake is that different systems can seamlessly work together through the Iceberg open table format. After all the operations are performed in Athena, let's go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.

First, run the same Spark SQL and see if you get the same result for the review used in the example:

val select_votes = """SELECT total_votes FROM demo.reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'"""

spark.sql(select_votes).show()

Spark shows 10 billion total votes for the review.

Shows the latest value of total_votes when querying using the Amazon EMR notebook

Check the transaction history of the operations performed in Athena through Iceberg's history system table in Spark:

val select_history = "SELECT * FROM demo.reviews.book_reviews.history"

spark.sql(select_history).show()

This shows three snapshots: one for the initial data load and one for each of the two updates you ran in Athena.

Shows snapshots corresponding to the two updates you ran in Athena

Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run an expire_snapshots procedure to remove old snapshots, and free up storage space in Amazon S3:

import java.util.Calendar
import java.text.SimpleDateFormat

val now = Calendar.getInstance().getTime()
val form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val now_formatted = form.format(now.getTime())
val procedure = s"""CALL demo.system.expire_snapshots(
  table => 'reviews.book_reviews',
  older_than => TIMESTAMP '$now_formatted',
  retain_last => 1)"""

spark.sql(procedure)

Note that, after running this procedure, time travel can no longer be performed against expired snapshots.

Examine the history system table again and notice that it shows you only the most recent snapshot.

Running the following query in Athena results in the error “No table snapshot found before timestamp…” because the older snapshots were deleted and you can no longer time travel to them:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following code in your notebook to drop the AWS Glue table and database:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.book_reviews") 
// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the Amazon EMR console, choose Notebooks in the navigation pane.
  3. Select the notebook iceberg-spark-notebook and choose Delete.
  4. Choose Clusters in the navigation pane.
  5. Select the cluster Iceberg Spark Cluster and choose Terminate.
  6. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.

With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as SageMaker Athena integration for machine learning, or QuickSight Athena integration for dashboard and reporting. AWS Glue also offers the Iceberg connector, which you can use to author and run Iceberg data pipelines.

In addition, Iceberg supports a variety of other open-source compute engines that you can choose from. For example, you can use Apache Flink on Amazon EMR for streaming and change data capture (CDC) use cases. The strong transaction guarantees and efficient row-level update, delete, time travel, and schema evolution capabilities offered by Iceberg provide a sound foundation and broad possibilities for users to unlock the power of big data.


About the Authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Mohit Mehta is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Jared Keating is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.

Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/implement-a-cdc-based-upsert-in-a-data-lake-using-apache-iceberg-and-aws-glue/

As the implementation of data lakes and modern data architecture increases, customers’ expectations around its features also increase, which include ACID transaction, UPSERT, time travel, schema evolution, auto compaction, and many more. By default, Amazon Simple Storage Service (Amazon S3) objects are immutable, which means you can’t update records in your data lake because it supports append-only transactions. But there are use cases where you might be receiving incremental updates with change data capture (CDC) from your source systems, and you might need to update existing data in Amazon S3 to have a golden copy. Previously, you had to overwrite the complete S3 object or folders, but with the evolution of frameworks such as Apache Hudi, Apache Iceberg, Delta Lake, and governed tables in AWS Lake Formation, you can get database-like UPSERT features in Amazon S3.

Apache Hudi integration is already supported with AWS analytics services, and recently AWS Glue, Amazon EMR, and Amazon Athena announced support for Apache Iceberg. Apache Iceberg is an open table format originally developed at Netflix, which got open-sourced as an Apache project in 2018 and graduated from incubator mid-2020. It’s designed to support ACID transactions and UPSERT on petabyte-scale data lakes, and is getting popular because of its flexible SQL syntax for CDC-based MERGE, full schema evolution, and hidden partitioning features.

In this post, we walk you through a solution to implement CDC-based UPSERT or MERGE in an S3 data lake using Apache Iceberg and AWS Glue.

Configure Apache Iceberg with AWS Glue

You can integrate Apache Iceberg JARs into AWS Glue through its AWS Marketplace connector. The connector supports AWS Glue versions 1.0, 2.0, and 3.0, and is free to use. Configuring this connector is as easy as clicking a few buttons on the user interface.

The following steps guide you through the setup process:

  1. Navigate to the AWS Marketplace connector page.
  2. Choose Continue to Subscribe and then Accept Terms.
  3. Choose Continue to Configuration.
  4. Choose the AWS Glue version and software version.
  5. Choose Continue to Launch.
  6. Choose Usage Instruction, which opens a page that has a link to activate the connector.
  7. Create a connection by providing a name and choosing Create connection and activate connector.

You can confirm your new connection on the AWS Glue Studio Connectors page.

To use this connector, when you create an AWS Glue job, make sure you add this connector to your job. Later in the implementation steps, when you create an AWS Glue job, we show how to use the connector you just configured.

Solution overview

Let's assume you have a relational database that has product inventory data, and you want to move it into an S3 data lake on a continuous basis, so that your downstream applications or consumers can use it for analytics. After the initial data movement to Amazon S3, you receive incremental updates from the source database as CSV files using AWS DMS or equivalent tools, where each record has an additional column that represents an insert, update, or delete operation. While processing the incremental CDC data, one of the primary requirements is merging the CDC data in the data lake and providing the capability to query previous versions of the data.

To solve this use case, we present the following simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL (extract, transform, and load), and Athena for querying the data using standard SQL. Athena helps in querying the latest product inventory data from the Iceberg table’s latest snapshot, and Iceberg’s time travel feature helps in identifying a product’s price at any previous date.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1.1 and 1.2 use AWS Database Migration Service (AWS DMS), which connects to the source database and moves incremental data (CDC) to Amazon S3 in CSV format.
    • Steps 1.3 and 1.4 consist of the AWS Glue PySpark job, which reads incremental data from the S3 input bucket, performs deduplication of the records, and then invokes Apache Iceberg’s MERGE statements to merge the data with the target UPSERT S3 bucket.
  • Data access:
    • Steps 2.1 and 2.2 represent Athena integration to query data from the Iceberg table using standard SQL and validate the time travel feature of Iceberg.
  • Data Catalog:
    • The AWS Glue Data Catalog is treated as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema.

We have referenced AWS DMS as part of the architecture, but while showcasing the solution steps, we assume that the AWS DMS output is already available in Amazon S3, and focus on processing the data using AWS Glue and Apache Iceberg.

To demo the implementation steps, we use sample product inventory data that has the following attributes:

  • op – Represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. AWS DMS enables you to include this attribute, but if you’re using other mechanisms to move data, make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
  • product_id – This is the primary key column in the source database’s products table.
  • category – This column represents the product’s category, such as Electronics or Cosmetics.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory for a product. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to showcase the functionality.
  • last_update_time – This is the time when the product record was updated at the source database.

If you’re using AWS DMS to move data from your relational database to Amazon S3, then by default AWS DMS includes the op attribute for incremental CDC data, but it’s not included by default for the initial load. If you’re using CSV as your target file format, you can include IncludeOpForFullLoad as true in your S3 target endpoint setting of AWS DMS to have the op attribute included in your initial full load file. To learn more about the Amazon S3 settings in AWS DMS, refer to S3Settings.

To implement the solution, we create AWS resources such as an S3 bucket and an AWS Glue job, and integrate the Iceberg code for processing. Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process it with AWS Glue PySpark code for the output.

Prerequisites

Before getting started on the implementation, make sure you have the required permissions to perform the following in your AWS account:

  • Create AWS Identity and Access Management (IAM) roles as needed
  • Read or write to an S3 bucket
  • Create and run AWS Glue crawlers and jobs
  • Manage a database, table, and workgroups, and run queries in Athena

For this post, we use the us-east-1 Region, but you can integrate it in your preferred Region if the AWS services included in the architecture are available in that Region.

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as glue-iceberg-demo, and leave the remaining fields as default.
    S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name while implementing the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.
  4. Choose Create bucket.
  5. On the bucket details page, choose Create folder.
  6. Create two subfolders: raw-csv-input and iceberg-output.
  7. Upload the LOAD00000001.csv file into the raw-csv-input folder of the bucket.

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena console and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_demo;
-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_demo.raw_csv_input(
  op string, 
  product_id bigint, 
  category string, 
  product_name string, 
  quantity_available bigint, 
  last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',', 
  'typeOfData'='file');
-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_demo.iceberg_output (
  product_id bigint,
  category string,
  product_name string,
  quantity_available bigint,
  last_update_time timestamp) 
PARTITIONED BY (category, bucket(16,product_id)) 
LOCATION 's3://glue-iceberg-demo/iceberg-output/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912' 
);
-- Validate the input data
SELECT * FROM iceberg_demo.raw_csv_input;

Alternatively, you can integrate an AWS Glue crawler on top of the input to create the table. Next, let’s create the AWS Glue PySpark job to process the input data.

Create the AWS Glue job

Complete the following steps to create an AWS Glue job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Spark script editor.
  4. For Options, select Create a new script with boilerplate code.
  5. Choose Create.
  6. Replace the script with the following script:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    from pyspark.sql.functions import *
    from awsglue.dynamicframe import DynamicFrame
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, max
    
    from pyspark.conf import SparkConf
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'iceberg_job_catalog_warehouse'])
    conf = SparkConf()
    
    ## Please make sure to pass runtime argument --iceberg_job_catalog_warehouse with value as the S3 path 
    conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
    conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")
    
    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    ## Read Input Table
    IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_demo", table_name = "raw_csv_input", transformation_ctx = "IncrementalInputDyF")
    IncrementalInputDF = IncrementalInputDyF.toDF()
    
    if not IncrementalInputDF.rdd.isEmpty():
        ## Apply de-duplication logic on the input data to pick up the latest record based on timestamp and operation
        IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

        # Add a column that captures the latest update timestamp for each product_id
        inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))
        
        # Filter out new records that are inserted, then select latest record from existing records and merge both to get deduplicated output 
        NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
        UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
        finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)
    
        # Register the deduplicated input as temporary table to use in Iceberg Spark SQL statements
        finalInputDF.createOrReplaceTempView("incremental_input_data")
        finalInputDF.show()
        
        ## Perform merge operation on incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a Merge operation
        IcebergMergeOutputDF = spark.sql("""
        MERGE INTO job_catalog.iceberg_demo.iceberg_output t
        USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
        ON t.product_id = s.product_id
        WHEN MATCHED AND s.op = 'D' THEN DELETE
        WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time 
        WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
        """)
    
        job.commit()

  7. On the Job details tab, specify the job name.
  8. For IAM Role, assign an IAM role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 3.0.
  10. For Language, choose Python 3.
  11. Make sure Job bookmark has default value of Enable.
  12. Under Connections, choose the Iceberg connector.
  13. Under Job parameters, specify Key as --iceberg_job_catalog_warehouse and Value as your S3 path (e.g. s3://<bucket-name>/<iceberg-warehouse-path>).
  14. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.

Because the target table is empty in the first run, the Iceberg MERGE statement runs an INSERT statement for all records.
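If you want to confirm this, an optional quick check is to compare row counts between the staging table and the Iceberg table; after the first run, the counts should match:

SELECT count(*) AS input_rows FROM iceberg_demo.raw_csv_input;
SELECT count(*) AS output_rows FROM iceberg_demo.iceberg_output;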

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_demo.iceberg_output limit 10;

The output of the query should match the input, with one difference: The Iceberg output table doesn’t have the op column.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload the following two incremental files, which include insert, update, and delete records for a few products.

The following is a snapshot of first incremental file (20220302-1134010000.csv).

The following is a snapshot of the second incremental file (20220302-1135010000.csv), which shows that record 102 has another update transaction before the next ETL job processing.

After you upload both incremental files, you should see them in the S3 bucket.

Run the AWS Glue job again to process incremental files

Because we enabled bookmarks on the AWS Glue job, the next job picks up only the two new incremental files and performs a merge operation on the Iceberg table.

To run the job again, complete the following steps:

  • On the AWS Glue console, choose Jobs in the navigation pane.
  • Select the job and choose Run.

As explained earlier, the PySpark script is expected to deduplicate the input data before merging it into the target Iceberg table, which means it only picks up the latest record for product 102.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena, after incremental data processing

After incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for record 102 and product record 103 is deleted.

The following screenshot shows the output.
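To narrow the validation to just the affected records, you can also run a query like the following (a sketch using the product IDs from this walkthrough); product 102 should show the reduced quantity, and product 103 should return no rows:

SELECT product_id, product_name, quantity_available, last_update_time
FROM iceberg_demo.iceberg_output
WHERE product_id IN (102, 103);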

Query the previous version of data with Iceberg’s time travel feature

You can run the following SQL query in Athena that uses the AS OF TIME statement of Iceberg to query the previous version of the data:

SELECT * FROM iceberg_demo.iceberg_output FOR SYSTEM_TIME AS OF TIMESTAMP '2022-03-23 18:56:00'

The following screenshot shows the output. As you can see, the quantity value of product ID 102 is 30, which was available during the initial load.

Note that you have to change the AS OF TIMESTAMP value based on your runtime.

This concludes the implementation steps.

Considerations

The following are a few considerations you should keep in mind while integrating Apache Iceberg with AWS Glue:

  • Athena support for Iceberg became generally available recently, so make sure you review the considerations and limitations of using this feature.
  • AWS Glue provides DynamicFrame APIs to read from different source systems and write to different targets. For this post, we integrated Spark DataFrame instead of AWS Glue DynamicFrame because Iceberg’s MERGE statements aren’t supported with AWS Glue DynamicFrame APIs.
    To learn more about AWS integration, refer to Iceberg AWS Integrations.

Conclusion

This post explains how you can use the Apache Iceberg framework with AWS Glue to implement UPSERT on an S3 data lake. It provides an overview of Apache Iceberg, its features and integration approaches, and explains how you can implement it through a step-by-step guide.

I hope this gives you a great starting point for using Apache Iceberg with AWS analytics services and that you can build on top of it to implement your solution.

Appendix: AWS Glue DynamicFrame sample code to interact with Iceberg tables

  • The following code sample demonstrates how you can integrate the DynamicFrame method to read from an Iceberg table:
IcebergDyF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "path": "job_catalog.iceberg_demo.iceberg_output",
            "connectionName": "Iceberg Connector for Glue 3.0",
        },
        transformation_ctx="IcebergDyF",
    )
)

## Optionally, convert to Spark DataFrame if you plan to leverage Iceberg’s SQL based MERGE statements
InputIcebergDF = IcebergDyF.toDF()
  • The following sample code shows how you can integrate the DynamicFrame method to write to an Iceberg table for append-only mode:
## Use the following 2 lines to convert Spark DataFrame to DynamicFrame, if you plan to leverage DynamicFrame API to write to final target
from awsglue.dynamicframe import DynamicFrame 
finalDyF = DynamicFrame.fromDF(InputIcebergDF,glueContext,"finalDyF")

WriteIceberg = glueContext.write_dynamic_frame.from_options(
    frame= finalDyF,
    connection_type="marketplace.spark",
    connection_options={
        "path": "job_catalog.iceberg_demo.iceberg_output",
        "connectionName": "Iceberg Connector for Glue 3.0",
    },
    format="parquet",
    transformation_ctx="WriteIcebergDyF",
)

About the Author

Sakti Mishra is a Principal Data Lab Solution Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

How GE Proficy Manufacturing Data Cloud replatformed to improve TCO, data SLA, and performance

Post Syndicated from Jyothin Madari original https://aws.amazon.com/blogs/big-data/how-ge-proficy-manufacturing-data-cloud-replatformed-to-improve-tco-data-sla-and-performance/

This post is co-authored by Jyothin Madari, Madhusudhan Muppagowni and Ayush Srivastava from GE.

GE Proficy Manufacturing Data Cloud (MDC), part of GE Digital's Manufacturing Execution Systems (MES) suite of solutions, allows GE Digital's customers to quickly and easily increase the value derived from the MES by reliably bringing enterprise-wide manufacturing data into the cloud and transforming it into a structured dataset for advanced analytics and deeper insights into the manufacturing processes.

In this post, we share how MDC modernized its hybrid cloud strategy by replatforming. This solution improved scalability, the data availability service-level agreement (SLA), and performance.

Challenge

MDC v1 was built on industrial use case-optimized Predix services such as Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR). MDC evolved in both features and the underlying platform over the past year with a goal to improve TCO, data SLA, and performance. MDC's customer base grew, and the number of customer sites grew to over 100 in the past couple of years. The increased number of sites needed more compute and storage capacity. This increased infrastructure and operational costs significantly, while increasing data latency and reducing the freshness of data delivered from the cloud.

How we started

MDC evaluated several vendors for their storage and compute capabilities using various measurements: security, performance, scalability, ease of management and operation, reduction of overall cost and increase in ROI, partnership, and migration help (technology assistance). The MDC team saw opportunities to improve the product by using native AWS services such as Amazon Redshift, AWS Glue, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which made the product more performant and scalable while reducing operation costs and making it future-ready for advanced analytics and new customer use cases.

The GE Digital team, comprised of domain experts, developers, and QA, worked shoulder to shoulder with the AWS ProServe team, comprised of Solution Architects, Data Architects, and Big Data Experts, in determining the key architectural changes required and solutions to implementation challenges.

Overview of solution

The following diagram illustrates the high-level architecture of the solution.

This is a broad overview, and the specifics of networking and security between components are out of scope for this post.

The solution includes the following main steps and components:

  1. CDC and log collector – Compressed CSV data is collected from over 100 manufacturing data sources (Proficy Plant Applications) and delivered into an Amazon Simple Storage Service (Amazon S3) bucket.
  2. S3 raw bucket – Our data lands in Amazon S3 without any transformation, but appropriately partitioned (tenant, site, date, and so on) for the ease of future processing.
  3. AWS Lambda – When the file lands in the S3 raw bucket, it triggers an S3 event notification, which invokes AWS Lambda. Lambda extracts metadata (bucket name, key name, date, and so on) from the event and saves it in Amazon DynamoDB.
  4. AWS Glue – Our goal is now to take CSV files, with varying schemas, and convert them into Apache Parquet format. An AWS Glue extract, transform, and load (ETL) job reads a list of files to be processed from the DynamoDB table and fetches them from the S3 raw bucket. We have preconfigured unified AVRO schemas in the AWS Glue Schema Registry for schema conversion. Converted data lands in the S3 raw Parquet bucket.
  5. S3 raw Parquet bucket – Data in this bucket is still raw and unmodified; only the format was changed. This intermediary storage is required due to schema and column order mismatch in CSV files.
  6. Amazon Redshift – The majority of transformations and data enrichment happens in this step. Amazon Redshift Spectrum consumes data from the S3 raw Parquet bucket and external PostgreSQL dimension tables (through a federated query). Transformations are performed via stored procedures, where we encapsulate logic for data transformation, data validation, and business-specific logic. The Amazon Redshift cluster is configured with concurrency scaling, auto workload management (WLM) with caching, and the latest RA3 instance types.
  7. MDC API – These custom-built, web-based, REST API microservices talk on the backend with Amazon Redshift and expose data to external users, business intelligence (BI) tools, and partners.
  8. Amazon Redshift data export and archival – On a scheduled basis, Amazon Redshift exports (via the UNLOAD command) contextualized and business-defined aggregated data. Exports land in the S3 bucket as Apache Parquet files (see the example UNLOAD sketch after this list).
  9. S3 Parquet export bucket – This bucket stores the exported data (hundreds of TBs) used by external users who need to run extensive, heavy analytics and AI or machine learning (ML) with various tools (such as Amazon EMR, Amazon Athena, Apache Spark, and Dremio).
  10. End-users – External users consume data from the API. The main use case here is reporting and visual analytics.
  11. Amazon MWAA – The orchestrator of the solution, Amazon MWAA is used for scheduling Amazon Redshift stored procedures, AWS Glue ETL jobs, and Amazon Redshift exports at regular intervals with error handling and retries built in.
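The following is a minimal sketch of the kind of UNLOAD statement referenced in step 8; the table name, column names, S3 path, and IAM role are hypothetical placeholders rather than values from the GE implementation:

UNLOAD ('SELECT site_id, metric_name, metric_value, event_date FROM mdc.aggregated_metrics')
TO 's3://example-parquet-export-bucket/aggregated_metrics/'
IAM_ROLE 'arn:aws:iam::111122223333:role/RedshiftUnloadRole'
FORMAT AS PARQUET;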

Bringing it all together

MDC replaced both Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR) with Amazon Redshift for both storage of the MDC data models and compute (ELT). Amazon MWAA is used to schedule the workloads that do the bulk of the ELT. Lambda, AWS Glue, and DynamoDB are used to normalize the schema differences between sites. It was important not to disrupt MDC customers while replatforming. To achieve this, MDC used a phased approach to migrate the data models to Amazon Redshift. They used federated queries to query existing PostgreSQL for dimensional data, which facilitated having some of the data models in Amazon Redshift, while the others were in Cassandra with no interruption to MDC customers. Redshift Spectrum facilitated querying the raw data in Amazon S3 directly both for ETL and data validation.

75% of the MDC team along with the AWS ProServe team and AWS Solution Architects collaborated with the GE Digital Security Team and Platform Team to implement the architecture with AWS native services. It took approximately 9 months to implement, secure, and performance tune the architecture and migrate data models in three phases. Each phase has gone through a GE Digital internal security review. Amazon Redshift Auto WLM, short query acceleration, and tuning the sort keys to optimize querying patterns improved the Proficy MDC API performance. Because the unload of the data from Amazon Redshift was fast, Proficy MDC is now able to export the data much more frequently to our end customers.

Conclusion

With replatforming, Proficy MDC was able to improve ETL performance by approximately 75%. Data latency and freshness improved by approximately 87%. The solution reduced the TCO of the platform by approximately 50%. Proficy MDC was also able to reduce infrastructure and operational costs. Improved performance and reduced latency have allowed us to speed up the next steps in our journey to modernize the enterprise data architecture and hybrid cloud data platform.


About the Authors

Jyothin Madari leads the Manufacturing Data Cloud (MDC) engineering team; part of the manufacturing suite of products at GE Digital. He has 18 years of experience, 4 of which is with GE Digital. Most recently he has been working on data migration projects with an aim to reduce costs and improve performance. He is an AWS Certified Cloud Practitioner, a keen learner and loves solving interesting problems. Connect with him on LinkedIn.

Madhusudhan (Madhu) Muppagowni is a Technical Architect and Principal Software Developer based in Silicon Valley, Bay Area, California.  He is passionate about Software Development and Architecture. He thrives on producing Well-Architected and Secure SaaS Products, Data Pipelines that can make a real impact.  He loves outdoors and an avid hiker and backpacker. Connect with him on LinkedIn.

Ayush Srivastava is a Senior Staff Engineer and Technical Anchor based in Hyderabad, India. He is passionate about Software Development and Architecture. He has Demonstrated track record of successfully technical anchoring small to large Secure SaaS Products, Data Pipelines from start to finish. He loves exploring different places and he says “I’m in love with cities I have never been to and people I have never met.” Connect with him on LinkedIn.

Karen Grygoryan is Data Architect with AWS ProServe. Connect with him on LinkedIn.

Gnanasekaran Kailasam is a Data Architect at AWS. He has worked with building data warehouses and big data solutions for over 16 years. He loves to learn new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS. Connect with him on LinkedIn.

Optimize Federated Query Performance using EXPLAIN and EXPLAIN ANALYZE in Amazon Athena

Post Syndicated from Nishchai JM original https://aws.amazon.com/blogs/big-data/optimize-federated-query-performance-using-explain-and-explain-analyze-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. In 2019, Athena added support for federated queries to run SQL queries across data stored in relational, non-relational, object, and custom data sources.

In 2021, Athena added support for the EXPLAIN statement, which can help you understand and improve the efficiency of your queries. The EXPLAIN statement provides a detailed breakdown of a query’s run plan. You can analyze the plan to identify and reduce query complexity and improve its runtime. You can also use EXPLAIN to validate SQL syntax prior to running the query. Doing so helps prevent errors that would have occurred while running the query.

Athena also added EXPLAIN ANALYZE, which displays the computational cost of your queries alongside their run plans. Administrators can benefit from using EXPLAIN ANALYZE because it provides a scanned data count, which helps you reduce financial impact due to user queries and apply optimizations for better cost control.

In this post, we demonstrate how to use and interpret EXPLAIN and EXPLAIN ANALYZE statements to improve Athena query performance when querying multiple data sources.

Solution overview

To demonstrate using EXPLAIN and EXPLAIN ANALYZE statements, we use the following services and resources:

Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in your AWS account. The table metadata lets the Athena query engine know how to find, read, and process the data that you want to query. We use Athena data source connectors to connect to data sources external to Amazon S3.

Prerequisites

To deploy the CloudFormation template, you must have the following:

Provision resources with AWS CloudFormation

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

  1. Follow the prompts on the AWS CloudFormation console to create the stack.
  2. Note the key-value pairs on the stack’s Outputs tab.

You use these values when configuring the Athena data source connectors.

The CloudFormation template creates the following resources:

  • S3 buckets to store data and act as temporary spill buckets for Lambda
  • AWS Glue Data Catalog tables for the data in the S3 buckets
  • A DynamoDB table and Amazon RDS for MySQL tables, which are used to join multiple tables from different sources
  • A VPC, subnets, and endpoints, which are needed for Amazon RDS for MySQL and DynamoDB

The following figure shows the high-level data model for the data load.

Create the DynamoDB data source connector

To create the DynamoDB connector for Athena, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select Amazon DynamoDB.
  4. Choose Next.

  1. For Data source name, enter DDB.

  1. For Lambda function, choose Create Lambda function.

This opens a new tab in your browser.

  1. For Application name, enter AthenaDynamoDBConnector.
  2. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  3. For AthenaCatalogName, enter dynamodb-lambda-func.
  4. Leave the remaining values at their defaults.
  5. Select I acknowledge that this app creates custom IAM roles and resource policies.
  6. Choose Deploy.

You’re returned to the Connect data sources section on the Athena console.

  1. Choose the refresh icon next to Lambda function.
  2. Choose the Lambda function you just created (dynamodb-lambda-func).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. If you haven’t already set up the Athena query results location, choose View settings on the Athena query editor page.

  1. Choose Manage.
  2. For Location of query result, browse to the S3 bucket specified for the Athena spill bucket in the CloudFormation template.
  3. Add Athena-query to the S3 path.
  4. Choose Save.

  1. In the Athena query editor, for Data source, choose DDB.
  2. For Database, choose default.

You can now explore the schema for the sportseventinfo table; the data is the same in DynamoDB.

  1. Choose the options icon for the sportseventinfo table and choose Preview Table.
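Preview Table runs a query of roughly the following form against the new data source (a sketch of what the console generates for you):

SELECT * FROM "DDB"."default"."sportseventinfo" LIMIT 10;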

Create the Amazon RDS for MySQL data source connector

Now let’s create the connector for Amazon RDS for MySQL.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL.
  4. Choose Next.

  1. For Data source name, enter MySQL.

  1. For Lambda function, choose Create Lambda function.

  1. For Application name, enter AthenaMySQLConnector.
  2. For SecretNamePrefix, enter AthenaMySQLFederation.
  3. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  4. For DefaultConnectionString, enter the value from the CloudFormation stack for MySQLConnection.
  5. For LambdaFunctionName, enter mysql-lambda-func.
  6. For SecurityGroupIds, enter the value from the CloudFormation stack for RDSSecurityGroup.
  7. For SubnetIds, enter the value from the CloudFormation stack for RDSSubnets.
  8. Select I acknowledge that this app creates custom IAM roles and resource policies.
  9. Choose Deploy.

  1. On the Lambda console, open the function you created (mysql-lambda-func).
  2. On the Configuration tab, under Environment variables, choose Edit.

  1. Choose Add environment variable.
  2. Enter a new key-value pair:
    • For Key, enter MYSQL_connection_string.
    • For Value, enter the value from the CloudFormation stack for MySQLConnection.
  3. Choose Save.

  1. Return to the Connect data sources section on the Athena console.
  2. Choose the refresh icon next to Lambda function.
  3. Choose the Lambda function you created (mysql-lambda-func).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. In the Athena query editor, for Data Source, choose MYSQL.
  4. For Database, choose sportsdata.

  1. Choose the options icon by the tables and choose Preview Table to examine the data and schema.

In the following sections, we demonstrate different ways to optimize our queries.

Optimal join order using EXPLAIN plan

A join is a basic SQL operation to query data on multiple tables using relations on matching columns. Join operations affect how much data is read from a table, how much data is transferred to the intermediate stages through networks, and how much memory is needed to build up a hash table to facilitate a join.

If you have multiple join operations and these join tables aren't in the correct order, you may experience performance issues. To demonstrate this, we use the following tables from different sources and join them in a certain order. Then we observe the query runtime and improve performance by using the EXPLAIN feature from Athena, which provides suggestions for optimizing the query.

The CloudFormation template you ran earlier loaded data into the following services:

AWS Storage Table Name Number of Rows
Amazon DynamoDB sportseventinfo 657
Amazon S3 person 7,025,585
Amazon S3 ticketinfo 2,488

Let's construct a query to find all those who participated in the event, by ticket type. The query with the following join order took approximately 7 minutes to complete:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

Now let’s use EXPLAIN on the query to see its run plan. We use the same query as before, but add explain (TYPE DISTRIBUTED):

EXPLAIN (TYPE DISTRIBUTED)
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

Notice the cross-join in Fragment 1. The joins are converted to a Cartesian product for each table, where every record in a table is compared to every record in another table. Therefore, this query takes a significant amount of time to complete.

To optimize our query, we can rewrite it by reordering the joining tables as sportseventinfo first, ticketinfo second, and person last. The reason is that the WHERE clause, which is converted to a JOIN ON clause during the query plan stage, doesn't have a join relationship between the person table and the sportseventinfo table. Therefore, the query plan generator converted the join type to cross-joins (a Cartesian product), which is less efficient. Reordering the tables aligns the WHERE clause with the INNER JOIN type, which satisfies the JOIN ON clause, and the runtime is reduced from 7 minutes to 10 seconds.

The code for our optimized query is as follows:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following is the EXPLAIN output of our query after reordering the join clause:

EXPLAIN (TYPE DISTRIBUTED) 
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

The cross-join changed to INNER JOIN with joins on the columns (eventid, id, ticketholder_id), which results in the query running faster. The join between the ticketinfo and person tables is converted to the PARTITION distribution type, where both the left and right tables are hash-partitioned across all worker nodes due to the size of the person table. The join between the sportseventinfo and ticketinfo tables is converted to the REPLICATED distribution type, where one table is hash-partitioned across all worker nodes and the other table is replicated to all worker nodes to perform the join operation.

For more information about how to analyze these results, refer to Understanding Athena EXPLAIN statement results.

As a best practice, we recommend having a JOIN statement along with an ON clause, as shown in the following code:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"AwsDataCatalog"."athenablog"."person" p 
JOIN "AwsDataCatalog"."athenablog"."ticketinfo" t ON t.ticketholder_id = p.id 
JOIN "ddb"."default"."sportseventinfo" e ON t.sporting_event_id = cast(e.eventid as double)

Also, as a best practice when you join two tables, specify the larger table on the left side of the join and the smaller table on the right side. Athena distributes the table on the right to worker nodes, and then streams the table on the left to do the join. If the table on the right is smaller, then less memory is used and the query runs faster.

In the following sections, we present examples of how to optimize pushdowns for filter predicates and projection filter operations for the Athena data source using EXPLAIN ANALYZE.

Pushdown optimization for the Athena connector for Amazon RDS for MySQL

A pushdown is an optimization to improve the performance of a SQL query by moving its processing as close to the data as possible. Pushdowns can drastically reduce SQL statement processing time by filtering data before transferring it over the network and filtering data before loading it into memory. The Athena connector for Amazon RDS for MySQL supports pushdowns for filter predicates and projection pushdowns.

The following table summarizes the services and tables we use to demonstrate a pushdown using Aurora MySQL.

Table Name Number of Rows Size in KB
player_partitioned 5,157 318.86
sport_team_partitioned 62 5.32

We use the following query as an example of a filtering predicate and projection filter:

SELECT full_name,
name 
FROM "sportsdata"."player_partitioned" a 
JOIN "sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

This query selects the players and their team based on their ID. It serves as an example of both filter operations in the WHERE clause and projection because it selects only two columns.

We use EXPLAIN ANALYZE to get the cost of running this query:

EXPLAIN ANALYZE 
SELECT full_name,
name 
FROM "MYSQL"."sportsdata"."player_partitioned" a 
JOIN "MYSQL"."sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

The following screenshot shows the output in Fragment 2 for the table player_partitioned, in which we observe that the connector has a successful pushdown filter on the source side, so it tries to scan only one record out of the 5,157 records in the table. The output also shows that the query scan has only two columns (full_name as the projection column and sport_team_id as the join column) and uses SELECT and JOIN, which indicates the projection pushdown is successful. This helps reduce the data scanned when using Athena data source connectors.

Now let’s look at the conditions in which a filter predicate pushdown doesn’t work with Athena connectors.

LIKE statement in filter predicates

We start with the following example query to demonstrate using the LIKE statement in filter predicates:

SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

We then add EXPLAIN ANALYZE:

EXPLAIN ANALYZE 
SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

The EXPLAIN ANALYZE output shows that the query performs the table scan (scanning the table player_partitioned, which contains 5,157 records) for all the records even though the WHERE clause only has 30 records matching the condition %Aar%. Therefore, the data scan shows the complete table size even with the WHERE clause.

We can optimize the same query by selecting only the required columns:

EXPLAIN ANALYZE 
SELECT sport_team_id,
full_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

From the EXPLAIN ANALYZE output, we can observe that the connector supports the projection filter pushdown, because we select only two columns. This brought the data scan size down to half of the table size.

OR statement in filter predicates

We start with the following query to demonstrate using the OR statement in filter predicates:

SELECT id,
first_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

We use EXPLAIN ANALYZE with the query (this time selecting all columns) as follows:

EXPLAIN ANALYZE 
SELECT * 
FROM 
"MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

Similar to the LIKE statement, the following output shows that the query scanned the entire table instead of pushing down to only the records that matched the WHERE clause. This query outputs only 16 records, but the data scan indicates a complete scan.
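As with the LIKE example, the projection pushdown still applies if you select only the required columns, which reduces the amount of data scanned even though the OR condition prevents the filter predicate pushdown. The following is a sketch of that variation:

EXPLAIN ANALYZE 
SELECT id,
first_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'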

Pushdown optimization for the Athena connector for DynamoDB

For our example using the DynamoDB connector, we use the following data:

Table Number of Rows Size in KB
sportseventinfo 657 85.75

Let’s test the filter predicate and project filter operation for our DynamoDB table using the following query. This query tries to get all the events and sports for a given location. We use EXPLAIN ANALYZE for the query as follows:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE Location = 'Chase Field'

The output of EXPLAIN ANALYZE shows that the filter predicate retrieved only 21 records, and the project filter selected only two columns to push down to the source. Therefore, the data scan for this query is less than the table size.

Now let's see where filter predicate pushdown doesn't work. If you apply the TRIM() function to the Location column in the WHERE clause and then filter, the filter predicate pushdown optimization doesn't apply, but the projection filter optimization still does. See the following code:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE trim(Location) = 'Chase Field'

The output of EXPLAIN ANALYZE for this query shows that the query scans all the rows but is still limited to only two columns, which shows that the filter predicate doesn’t work when the TRIM function is applied.

We've seen from the preceding examples that the Athena data source connectors for Amazon RDS for MySQL and DynamoDB support filter predicate and projection pushdown optimization, but operations such as LIKE, OR, and TRIM in the filter predicate aren't pushed down to the source. Therefore, if you encounter unexplained charges in your federated Athena query, we recommend using EXPLAIN ANALYZE with the query to determine whether your Athena connector supports the pushdown operation.

Please note that running EXPLAIN ANALYZE incurs cost because it scans the data.

Conclusion

In this post, we showcased how to use EXPLAIN and EXPLAIN ANALYZE to analyze Athena SQL queries for data sources on Amazon S3 and Athena federated SQL queries for data sources like DynamoDB and Amazon RDS for MySQL. You can use this approach to optimize your queries, which can also result in cost savings.


About the Authors

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web Services. He specializes in building big data applications and helps customers modernize their applications on the cloud. He thinks data is the new oil and spends most of his time deriving insights out of it.

Varad Ram is a Senior Solutions Architect at Amazon Web Services. He likes to help customers adopt cloud technologies and is particularly interested in artificial intelligence. He believes deep learning will power future technology growth. In his spare time, he likes to be outdoors with his daughter and son.

Simplify and optimize Python package management for AWS Glue PySpark jobs with AWS CodeArtifact

Post Syndicated from Ashok Padmanabhan original https://aws.amazon.com/blogs/big-data/simplify-and-optimize-python-package-management-for-aws-glue-pyspark-jobs-with-aws-codeartifact/

Data engineers use various Python packages to meet their data processing requirements while building data pipelines with AWS Glue PySpark jobs. Languages like Python and Scala are commonly used in data pipeline development. Developers can take advantage of open-source packages or even customize their own to make it easier and faster to handle use cases such as data manipulation and analysis. However, managing standardized packages can be cumbersome when multiple teams use different versions of packages, install non-approved packages, and duplicate development effort due to the lack of visibility into what is available at the enterprise level. This can be especially challenging in large enterprises with multiple data engineering teams.

ETL developers often need additional packages for their AWS Glue ETL jobs. With security being job zero for customers, many restrict egress traffic from their VPC to the public internet, and they need a way to manage the packages used by applications, including their data processing pipelines.

Our proposed solution enables organizations with network egress restrictions to manage packages centrally with AWS CodeArtifact and lets developers use their favorite libraries in their AWS Glue ETL PySpark code. In this post, we describe how CodeArtifact can be used to manage packages and modules for AWS Glue ETL jobs, and we demo a solution using Glue PySpark jobs that run within VPC subnets that have no internet access.

Solution overview

The solution uses CodeArtifact as a tool to make it easier for organizations of any size to securely store, publish, and share software packages used in their ETL with AWS Glue. VPC endpoints for CodeArtifact and AWS Glue provide private connectivity. AWS Step Functions makes it easy to coordinate the orchestration of components used in the data processing pipeline. Native integrations with both CodeArtifact and AWS Glue enable the workflow to both authenticate the request to CodeArtifact and start the AWS Glue ETL job.

The following architecture shows an implementation of a solution using AWS Glue, CodeArtifact, and Step Functions to use additional Python modules without egress internet access. The solution is deployed using AWS Cloud Development Kit (AWS CDK), an open-source software development framework to define your cloud application resources using familiar programming languages.

Solution Architecture for the blog post

Fig 1: Architecture Diagram for the Solution

To illustrate how to set up this architecture, we’ll walk you through the following steps:

  1. Deploying an AWS CDK stack to provision the following AWS Resources
    1. CodeArtifact
    2. An AWS Glue job
    3. Step Functions workflow
    4. Amazon Simple Storage Service (Amazon S3) bucket
    5. A VPC with a private Subnet and VPC Endpoints to Amazon S3 and CodeArtifact
  2. Validate the Deployment.
  3. Run a Sample Workflow – This workflow will run an AWS Glue PySpark job that uses a custom Python library, and an upgraded version of boto3.
  4. Cleaning up your resources.

Prerequisites

Make sure that you complete the following steps as prerequisites:

The solution

Launching your AWS CDK Stack

Step 1: Using your device’s command line, check out our Git repository to a local directory on your device:

git clone https://github.com/aws-samples/python-lib-management-without-internet-for-aws-glue-in-private-subnets.git

Step 2: Change directories to the new directory Amazon S3 script location:

cd python-lib-management-without-internet-for-aws-glue-in-private-subnets/scripts/s3

Step 3: Download the following CSV, which contains New York City Taxi and Limousine Commission (TLC) weekly trip data. This will serve as the input source for the AWS Glue job:

aws s3 cp s3://nyc-tlc/misc/FOIL_weekly_trips_apps.csv .

Step 4: Change directories to the path where the app.py file is located (relative to the previous step):

cd ../..

Step 5: Create a virtual environment:

macOS/Linux:
python3 -m venv .env

Windows:
python -m venv .env

Step 6: Activate the virtual environment after the init process completes and the virtual environment is created:

macOS/Linux:
source .env/bin/activate

Windows:
.env\Scripts\activate.bat

Step 7: Install the required dependencies:

pip3 install -r requirements.txt

Step 8: Make sure that your AWS profile is setup along with the region that you want to deploy as mentioned in the prerequisite. Synthesize the templates. AWS CDK apps use code to define the infrastructure, and when run they produce or “synthesize” a CloudFormation template for each stack defined in the application:

cdk synthesize

Step 9: Bootstrap the AWS CDK app using the following command:

cdk bootstrap aws://<AWS_ACCOUNTID>/<AWS_REGION>

Replace the placeholders AWS_ACCOUNTID and AWS_REGION with your AWS account ID and the Region to be deployed.

This step provisions the initial resources, including an Amazon S3 bucket for storing files and IAM roles that grant permissions needed to perform deployments.

Step 10: Deploy the solution. By default, some actions that could potentially make security changes require approval. In this deployment, you’re creating an IAM role. The following command overrides the approval prompts, but if you would like to manually accept the prompts, then omit the --require-approval never flag:

cdk deploy "*" --require-approval never

While the AWS CDK deploys the CloudFormation stacks, you can follow the deployment progress in your terminal:

AWS CDK Deployment progress in terminal

Fig 2: AWS CDK Deployment progress in terminal

Once the deployment is successful, you’ll see the successful status as follows:

AWS CDK Deployment completion success

Fig 3: AWS CDK Deployment completion success

Step 11: Log in to the AWS Console, go to CloudFormation, and see the output of the ApplicationStack stack:

AWS CloudFormation stack output

Fig 4: AWS CloudFormation stack output

Note the values of the DomainName and RepositoryName variables. We’ll use them in the next step to upload our artifacts.

Step 12: We will upload a custom library into the repo that we created. This will be used by our Glue ETL job.

  • Install twine using pip:
python3 -m pip install twine

The custom python package glueutils-0.2.0.tar.gz can be found under this folder of the cloned repo:

cd scripts/custom_glue_library
  • Configure twine with the login command (additional details here). Refer to Step 11 for the DomainName and RepositoryName from the CloudFormation output:
aws codeartifact login --tool twine --domain <DomainName> --domain-owner <AWS_ACCOUNTID> --repository <RepositoryName>
  • Publish Python package assets:
twine upload --repository codeartifact glueutils-0.2.0.tar.gz
Python package publishing using twine

Fig 5: Python package publishing using twine

Validate the Deployment

The AWS CDK stack will deploy the following AWS resources:

  1. Amazon Virtual Private Cloud (Amazon VPC)
    1. One Private Subnet
  2. AWS CodeArtifact
    1. CodeArtifact Repository
    2. CodeArtifact Domain
    3. CodeArtifact Upstream Repository
  3. AWS Glue
    1. AWS Glue Job
    2. AWS Glue Database
    3. AWS Glue Connection
  4. AWS Step Function
  5. Amazon S3 Bucket for AWS CDK and also for storing scripts and CSV file
  6. IAM Roles and Policies
  7. Amazon Elastic Compute Cloud (Amazon EC2) Security Group

Step 1: Browse to the AWS account and region via the AWS Console to which the resources are deployed.

Step 2: Browse to the Subnets page (https://<region>.console.aws.amazon.com/vpc/home?region=<region>#subnets:). (Replace <region> with the actual AWS Region to which your resources are deployed.)

Step 3: Select the Subnet with name as ApplicationStack/enterprise-repo-vpc/Enterprise-Repo-Private-Subnet1

Step 4: Select the route table and validate that there are no internet gateway or NAT gateway routes to the internet, and that it’s similar to the following image:

Route table validation

Fig 6: Route table validation

Step 5: Navigate to the CodeArtifact console and review the repositories created. The enterprise-repo is your local repository, and pypi-store is the upstream repository connected to PyPI, which provides artifacts from pypi.org.

AWS CodeArtifact repositories created

Fig 7: AWS CodeArtifact repositories created

Step 6: Navigate to enterprise-repo and search for glueutils. This is the custom python package that we published.

AWS CodeArtifact custom Python package published

Fig 8: AWS CodeArtifact custom Python package published

Step 7: Navigate to Step Functions Console and review the enterprise-repo-step-function as follows:

AWS Step Functions workflow

Fig 9: AWS Step Functions workflow

The diagram shows how the Step Functions workflow will orchestrate the pattern.

  1. The first step, CodeArtifactGetAuthorizationToken, calls the getAuthorizationToken API to generate a temporary authorization token for accessing repositories in the domain (this token is valid for 15 minutes).
  2. The next step, GenerateCodeArtifactURL, takes the authorization token from the response and generates the CodeArtifact URL (see the boto3 sketch after this list).
  3. Then, this will move into the GlueStartJobRun state, which makes a synchronous API call to run the AWS Glue job.
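For reference, the first two steps of the workflow can be reproduced with the boto3 CodeArtifact client, as in the sketch below. The domain, repository, account, and Region values are placeholders, and the index URL format is an assumption based on the standard CodeArtifact PyPI endpoint shape.

import boto3

# Placeholder values -- use the DomainName and RepositoryName from your stack outputs.
DOMAIN = "enterprise-domain"
DOMAIN_OWNER = "123456789012"
REPOSITORY = "enterprise-repo"
REGION = "eu-west-3"

codeartifact = boto3.client("codeartifact", region_name=REGION)

# Equivalent of the CodeArtifactGetAuthorizationToken state: request a short-lived token.
token = codeartifact.get_authorization_token(
    domain=DOMAIN,
    domainOwner=DOMAIN_OWNER,
    durationSeconds=900,  # 15 minutes
)["authorizationToken"]

# Equivalent of the GenerateCodeArtifactURL state: build a pip-compatible index URL.
index_url = (
    f"https://aws:{token}@{DOMAIN}-{DOMAIN_OWNER}"
    f".d.codeartifact.{REGION}.amazonaws.com/pypi/{REPOSITORY}/simple/"
)
print(index_url)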

Step 8: Navigate to the AWS Glue Console and select the Jobs tab, then select enterprise-repo-glue-job.

The AWS Glue job is created with the following script and AWS Glue Connection enterprise-repo-glue-connection. The AWS Glue connection is a Data Catalog object that enables the job to connect to sources and APIs from within the VPC. The network type connection runs the job from within the private subnet to make requests to Amazon S3 and CodeArtifact over the VPC endpoint connection. This enables the job to run without any traffic through the internet.

Note the connections section in the AWS Glue PySpark Job, which makes the Glue job run on the private subnet in the VPC provisioned.

AWS Glue network connections

Fig 10: AWS Glue network connections

The job takes an Amazon S3 bucket, Glue Database, Python Job Installer Option, and Additional Python Modules as job parameters. The parameters --additional-python-modules and --python-modules-installer-option are passed to install the selected Python module from a PyPI repository hosted in AWS CodeArtifact.
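For illustration, starting the same job outside the workflow with boto3 and those two parameters might look like the following sketch; the job name and index URL are placeholders.

import boto3

glue = boto3.client("glue")

# Placeholder index URL -- in the deployed solution this is generated by the workflow.
index_url = "https://aws:<token>@<domain>-<account>.d.codeartifact.<region>.amazonaws.com/pypi/<repo>/simple/"

response = glue.start_job_run(
    JobName="enterprise-repo-glue-job",
    Arguments={
        # Packages to install before the job starts
        "--additional-python-modules": "boto3,glueutils==0.2.0",
        # Extra pip options; point pip at the private CodeArtifact repository
        "--python-modules-installer-option": f"--index-url {index_url}",
    },
)
print(response["JobRunId"])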

The script itself first reads the Amazon S3 input path of the taxi data in CSV format. A light transformation to sum the total trips by year, week, and app is performed. Then the output is written to an Amazon S3 path in Parquet format. A partitioned table in the AWS Glue Data Catalog is either created, or updated if it already exists.

You can find the Glue PySpark script here.
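To make that description concrete, the following is a minimal PySpark sketch of the same shape of transformation. The column names (year, week, app, trips) and S3 paths are assumptions; the script linked above is the authoritative version.

from pyspark.context import SparkContext
from pyspark.sql import functions as F

from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read the raw taxi trips CSV from S3 (placeholder path).
taxi_df = spark.read.option("header", "true").csv("s3://<data-bucket>/FOIL_weekly_trips_apps.csv")

# Light transformation: sum total trips by year, week, and app.
summary_df = (
    taxi_df.groupBy("year", "week", "app")
    .agg(F.sum("trips").alias("total_trips"))
)

# Write the aggregated output as Parquet; the partitioned Data Catalog table is
# created or updated separately (for example, by the job or a crawler).
summary_df.write.mode("overwrite").parquet("s3://<data-bucket>/output/weekly_trips/")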

Run a sample workflow

The following steps will demonstrate how to run a sample workflow:

Step 1: Navigate to the Step Functions Console and select the enterprise-repo-step-function.

Step 2: Select Start execution and enter the following input. We’re including the glueutils and latest boto3 libraries as part of the job run. It’s always recommended to pin your Python dependencies to avoid breaking changes caused by future versions of a dependency. In the example below, the latest available version of boto3 and version 0.2.0 of glueutils are installed. To pin boto3 to a specific release, you can specify, for example, boto3==1.24.2 (the latest release at the time of publishing this post).

{"pythonmodules": "boto3,glueutils==0.2.0"}

Step 3: Select Start execution and wait until Execution Status is Succeeded. This may take a few minutes.

Step 4: Navigate to the CodeArtifact console to review the enterprise-repo repository. You’ll see the cached PyPI packages and all of their dependencies pulled down from PyPI.

Step 5: In the Glue Console under the Runs section of the enterprise-glue-job, you’ll see the parameters passed:

AWS Glue job execution history

Fig 11: AWS Glue job execution history

Note the --index-url that was passed as a parameter to the Glue ETL job. The token is valid only for 15 minutes.

Step 6: Navigate to the Amazon CloudWatch Console and go to the /aws/glue-jobs log group to verify that the packages were installed from the local repo.

You will see that the two package names passed as parameters are installed with the corresponding versions.

Amazon CloudWatch logs details for the Glue job

Fig 12: Amazon CloudWatch logs details for the Glue job

Step 7: Navigate to the Amazon Athena console and select Query Editor.

Step 8: Run the following query to validate the output of the AWS Glue job:

SELECT year, app, SUM(total_trips) as sum_of_total_trips 
FROM 
"codeartifactblog_glue_db"."taxidataparquet" 
GROUP BY year, app;

Clean up

Make sure that you clean up all of the other AWS resources that you created in the AWS CDK stack deployment. You can delete these resources using the AWS CDK destroy command as follows, or via the CloudFormation console.

To destroy the resources using AWS CDK, follow these steps:

  1. Follow Steps 1-6 from the ‘Launching your AWS CDK Stack’ section.
  2. Destroy the app by executing the following command:
    cdk destroy

Conclusion

In this post, we demonstrated how CodeArtifact can be used to manage Python packages and modules for AWS Glue jobs that run within VPC subnets that have no internet access. We also demonstrated how the versions of existing packages can be updated (for example, boto3), and how a custom Python library (glueutils) developed locally can also be managed through CodeArtifact.

This post enables you to use your favorite Python packages with AWS Glue ETL PySpark jobs by modifying the input to the AWS Step Functions workflow (Step 2 in the Run a sample workflow section).


About the Authors

Bret Pontillo is a Data & ML Engineer with AWS Professional Services. He works closely with enterprise customers building data lakes and analytical applications on the AWS platform. In his free time, Bret enjoys traveling, watching sports, and trying new restaurants.

Gaurav Gundal is a DevOps consultant with AWS Professional Services, helping customers build solutions on the customer platform. When not building, designing, or developing solutions, Gaurav spends time with his family, plays guitar, and enjoys traveling to different places.

Ashok Padmanabhan is a Sr. IOT Data Architect with AWS Professional Services, helping customers build data and analytics platform and solutions. When not helping customers build and design data lakes, Ashok enjoys spending time at the beach near his home in Florida.

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web; they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This, coupled with multiple fulfillment options like buy online and pick up at store, ship from store, or ship from distribution centers, increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.
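As an illustration only, a simplified order-generator Lambda might look like the following sketch. The stream name, environment variable, and payload fields are assumptions, not the exact code deployed by the CloudFormation template.

import json
import os
import random
import uuid
from datetime import datetime, timezone

import boto3

kinesis = boto3.client("kinesis")

STREAM_NAME = os.environ.get("ORDER_STREAM_NAME", "orders-stream")  # placeholder
STATUSES = ["Created", "Scheduled", "Released", "Shipped", "Delivered"]


def lambda_handler(event, context):
    # Build a synthetic order event.
    order = {
        "ordernumber": str(uuid.uuid4()),
        "status": random.choice(STATUSES),
        "channel": random.choice(["web", "mobile", "store"]),
        "orderdatetime": datetime.now(timezone.utc).isoformat(),
    }
    # Partition by order number so updates for the same order land on the same shard.
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(order).encode("utf-8"),
        PartitionKey=order["ordernumber"],
    )
    return {"published": 1}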

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • When a customer is browsing products, the real-time availability of the item must be displayed
  • Customers interacting with Alexa to know the status of the order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores it in a DynamoDB table.
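A minimal sketch of such a consumer Lambda is shown below. The table name and environment variable are assumptions, and numeric fields may need conversion to Decimal before they can be written to DynamoDB.

import base64
import json
import os

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("ORDERS_TABLE", "orders-hot"))  # placeholder


def lambda_handler(event, context):
    # Kinesis event source mappings deliver base64-encoded record payloads.
    with table.batch_writer() as batch:
        for record in event["Records"]:
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            batch.put_item(Item=payload)
    return {"processed": len(event["Records"])}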

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora MySQL is a fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.

Prerequisites

Complete the following prerequisite steps:

  1. Create an AWS account if you don’t have one already.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

CREATE TABLE INVENTORY (
    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25)	 ,
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)
);

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting) sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type¸ choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes. To avoid generating a huge amount of data, be sure to delete the stack after testing.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.

Integrate AWS Glue Schema Registry with the AWS Glue Data Catalog to enable effective schema enforcement in streaming analytics use cases

Post Syndicated from Sam Mokhtari original https://aws.amazon.com/blogs/big-data/integrate-aws-glue-schema-registry-with-the-aws-glue-data-catalog-to-enable-effective-schema-enforcement-in-streaming-analytics-use-cases/

Metadata is an integral part of data management and governance. The AWS Glue Data Catalog can provide a uniform repository to store and share metadata. The main purpose of the Data Catalog is to provide a central metadata store where disparate systems can store, discover, and use that metadata to query and process the data.

Another important aspect of data governance is serving and managing the relationship between data stores and external clients, which are the producers and consumers of data. As the data evolves, especially in streaming use cases, we need a central framework that provides a contract between producers and consumers to enable schema evolution and improved governance. The AWS Glue Schema Registry provides a centralized framework to help manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Apache Flink and Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

In this post, we demonstrate how to integrate Schema Registry with the Data Catalog to enable efficient schema enforcement in streaming analytics use cases.

Stream analytics on AWS

There are many different scenarios where customers want to run stream analytics on AWS while managing the schema evolution effectively. To manage the end-to-end stream analytics life cycle, there are many different applications involved for data production, processing, analytics, routing, and consumption. It can be quite hard to manage changes across different applications for stream analytics use cases. Adding/removing a data field across different stream analytics applications can lead to data quality issues or downstream application failures if it is not managed appropriately.

For example, a large grocery store may want to send order information to its backend systems using Amazon Kinesis Data Streams. While sending the order information, the customer may want to perform some data transformations or run analytics on it. The orders may be routed to different targets depending on the type of order, and the stream may be integrated with many backend applications that expect order stream data in a specific format. But the order details schema can change for many different reasons, such as new business requirements, technical changes, or source system upgrades.

The changes are inevitable, but customers want a mechanism to manage these changes effectively while running their stream analytics workloads. To support stream analytics use cases on AWS and enforce schema and governance, customers can use the AWS Glue Schema Registry along with AWS streaming analytics services.

You can use Amazon Kinesis Data Firehose data transformation to ingest data from Kinesis Data Streams, run a simple data transformation on a batch of records via a Lambda function, and deliver the transformed records to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, Splunk, Datadog, NewRelic, Dynatrace, Sumologic, LogicMonitor, MongoDB, and an HTTP endpoint. The Lambda function transforms the current batch of records with no information or state from previous batches.

Lambda also has stream analytics capabilities for Amazon Kinesis Data Streams and Amazon DynamoDB Streams. This feature enables data aggregation and state management across multiple function invocations. This capability uses a tumbling window, which is a fixed-size, non-overlapping time interval of up to 15 minutes. When you apply a tumbling window to a stream, records in the stream are grouped by window and sent to the processing Lambda function. The function returns a state value that is passed to the next tumbling window.
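The following sketch illustrates the tumbling-window pattern with a hypothetical per-status order count; it assumes the standard tumbling-window event fields (state, window, isFinalInvokeForWindow) and is not part of the solution deployed later in this post.

import base64
import json


def lambda_handler(event, context):
    # The state returned by the previous invocation in this window (empty at window start).
    state = event.get("state") or {}

    # Count records per order status across the window.
    for record in event.get("Records", []):
        order = json.loads(base64.b64decode(record["kinesis"]["data"]))
        status = order.get("status", "unknown")
        state[status] = state.get(status, 0) + 1

    if event.get("isFinalInvokeForWindow"):
        # At window close, persist or emit the aggregate downstream.
        print(json.dumps({"window": event.get("window"), "counts": state}))

    # Returning the state passes it to the next invocation in the same window.
    return {"state": state}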

Kinesis Data Analytics provides SQL-based analytics against streaming data. The service also lets you use an Apache Flink application to process stream data. Data can be ingested from Kinesis Data Streams and Kinesis Data Firehose, and the supported destinations include Kinesis Data Firehose (which delivers to Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk), Lambda, and Kinesis Data Streams.

Finally, you can use the AWS Glue streaming extract, transform, and load (ETL) capability as a serverless method to consume data from Kinesis and Apache Kafka or Amazon MSK. The job aggregates, transforms, and enriches the data using Spark streaming, then continuously loads the results into Amazon S3-based data lakes, data warehouses, DynamoDB, JDBC, and more.

Managing stream metadata and schema evolution is becoming more important for stream analytics use cases. To enable these on AWS, the Data Catalog and Schema Registry allow you to centrally control and discover schemas. Before the release of schema referencing in the Data Catalog, you relied on managing schema evolution separately in the Data Catalog and Schema Registry, which usually leads to inconsistencies between these two. With the new release of the Data Catalog and Schema Registry integration, you can now reference schemas stored in the schema registry when creating or updating AWS Glue tables in the Data Catalog. This helps avoid inconsistency between the schema registry and Data Catalog, which results in end-to-end data quality enforcement.

In this post, we walk you through a streaming ETL example in AWS Glue to better showcase how this integration can help. This example includes reading streaming data from Kinesis Data Streams, schema discovery with Schema Registry, using the Data Catalog to store the metadata, and writing out the results to an Amazon S3 as a sink.

Solution overview

The following high-level architecture diagram shows the components to integrate Schema Registry and the Data Catalog to run streaming ETL jobs. In this architecture, Schema Registry helps centrally track and evolve Kinesis Data Streams schemas.

At a high level, we use the Amazon Kinesis Data Generator (KDG) to stream data to a Kinesis data stream, use AWS Glue to run streaming ETL, and use Amazon Athena to query the data.

In the following sections, we walk you through the steps to build this architecture.

Create a Kinesis data stream

To set up a Kinesis data stream, complete the following steps:

  1. On the Kinesis console, choose Data streams.
  2. Choose Create data stream.
  3. Give the stream a name, such as ventilator_gsr_stream.
  4. Complete stream creation.

Configure Kinesis Data Generator to generate sample data

You can use the KDG with the ventilator template available on the GitHub repo to generate sample data. The following diagram shows the template on the KDG console.

Add a new AWS Glue schema registry

To add a new schema registry, complete the following steps:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Schema registries.
  2. Choose Add registry.
  3. For Registry name, enter a name (for example, MyDemoSchemaReg).
  4. For Description, enter an optional description for the registry.
  5. Choose Add registry.

Add a schema to the schema registry

To add a new schema, complete the following steps:

  1. On the AWS Glue console, under Schema registries in the navigation pane, choose Schemas.
  2. Choose Add schema.
  3. Provide the schema name (ventilatorstream_schema_gsr) and attach the schema to the schema registry defined in the previous step.
  4. AWS Glue schemas currently support Avro or JSON formats; for this post, select JSON.
  5. Use the default Compatibility mode and provide the necessary tags as per your tagging strategy.

Compatibility modes allow you to control how schemas can or cannot evolve over time. These modes form the contract between applications producing and consuming data. When a new version of a schema is submitted to the registry, the compatibility rule applied to the schema name is used to determine if the new version can be accepted. For more information on different compatibility modes, refer to Schema Versioning and Compatibility.

  1. Enter the following sample JSON:
    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Ventilator",
      "type": "object",
      "properties": {
        "ventilatorid": {
          "type": "integer",
          "description": "Ventilator ID"
        },
        "eventtime": {
          "type": "string",
          "description": "Time of the event."
        },
        "serialnumber": {
          "description": "Serial number of the device.",
          "type": "string",
          "minimum": 0
        },
        "pressurecontrol": {
          "description": "Pressure control of the device.",
          "type": "integer",
          "minimum": 0
        },
        "o2stats": {
          "description": "O2 status.",
          "type": "integer",
          "minimum": 0
        },
        "minutevolume": {
          "description": "Volume.",
          "type": "integer",
          "minimum": 0
        },
        "manufacturer": {
          "description": "Volume.",
          "type": "string",
          "minimum": 0
        }
      }
    }

  2. Choose Create schema and version.
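If you prefer to register the schema programmatically instead of through the console, a sketch using the boto3 Glue client might look like the following. The compatibility mode shown is an assumption (use whichever mode your governance requires), and the schema definition is a condensed version of the JSON above.

import json

import boto3

glue = boto3.client("glue")

schema_definition = {
    "$id": "https://example.com/person.schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Ventilator",
    "type": "object",
    "properties": {
        "ventilatorid": {"type": "integer", "description": "Ventilator ID"},
        "eventtime": {"type": "string", "description": "Time of the event."},
        "serialnumber": {"type": "string", "description": "Serial number of the device."},
        "pressurecontrol": {"type": "integer", "description": "Pressure control of the device."},
        "o2stats": {"type": "integer", "description": "O2 status."},
        "minutevolume": {"type": "integer", "description": "Volume."},
        "manufacturer": {"type": "string", "description": "Manufacturer."},
    },
}

response = glue.create_schema(
    RegistryId={"RegistryName": "MyDemoSchemaReg"},
    SchemaName="ventilatorstream_schema_gsr",
    DataFormat="JSON",
    Compatibility="BACKWARD",  # assumed default; adjust as needed
    SchemaDefinition=json.dumps(schema_definition),
)
print(response["SchemaArn"], response["SchemaVersionId"])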

Create a new Data Catalog table

To add a new table in the Data Catalog, complete the following steps:

  1. On the AWS Glue Console, under Data Catalog in the navigation pane, choose Tables.
  2. Choose Add table.
  3. Select Add tables from existing schema.
  4. Enter the table name and choose the database.
  5. Select the source type as Kinesis and choose a data stream in your own account.
  6. Choose the respective Region and choose the stream ventilator_gsr_stream.
  7. Choose the MyDemoSchemaReg registry created earlier and the schema (ventilatorstream_schema_gsr) with its respective version.

You should be able to preview the schema.

  1. Choose Next and then choose Finish to create your table.
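As a rough illustration of creating the same table through the API, the sketch below uses the SchemaReference field of the Glue CreateTable operation. The Kinesis-specific table parameters shown are assumptions about what the console writes and may need adjusting for your account and Region.

import boto3

glue = boto3.client("glue")

glue.create_table(
    DatabaseName="kinesislab",  # placeholder database
    TableInput={
        "Name": "ventilator_gsr_new",
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {"classification": "json"},
        "StorageDescriptor": {
            "Location": "ventilator_gsr_stream",
            # Assumed Kinesis source parameters; verify against a console-created table.
            "Parameters": {
                "typeOfData": "kinesis",
                "streamName": "ventilator_gsr_stream",
            },
            # Reference the schema stored in the Schema Registry instead of listing columns.
            "SchemaReference": {
                "SchemaId": {
                    "RegistryName": "MyDemoSchemaReg",
                    "SchemaName": "ventilatorstream_schema_gsr",
                },
                "SchemaVersionNumber": 1,
            },
        },
    },
)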

Create the AWS Glue job

To create your AWS Glue job, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with a source and target.
  3. Under Source, select Amazon Kinesis and under Target, select Amazon S3.
  4. Choose Create.
  5. Choose Data source.
  6. Configure the job properties such as name, AWS Identity and Access Management (IAM) role, type, and AWS Glue version.

For the IAM role, specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, you need to make sure that the IAM role has permissions to read from Kinesis Data Streams and write to Amazon S3.

  1. For This job runs, select A new script authored by you.
  2. Under Advanced properties, keep Job bookmark disabled.
  3. For Log Filtering, select Standard filter and Spark UI.
  4. Under Monitoring options, enable Job metrics and Continuous logging with Standard filter.
  5. Enable the Spark UI and provide the S3 bucket path to store the Spark event logs.
  6. For Job parameters, enter the following key-values:
    • --output_path – The S3 path where the final aggregations are persisted
    • --aws_region – The Region where you run the job
  7. Leave Connections empty and choose Save job and edit script.
  8. Use the following code for the AWS Glue job (update the values for database, table_name, and checkpointLocation):
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'aws_region',
                           'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame=dynamic_frame, mappings=[
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("eventtime", "string", "eventtime", "timestamp"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("pressurecontrol", "long", "pressurecontrol", "long"),
            ("o2stats", "long", "o2stats", "long"),
            ("minutevolume", "long", "minutevolume", "long"),
            ("manufacturer", "string", "manufacturer", "string")],
            transformation_ctx="apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 sink, partitioned by ingest date and hour
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame=apply_mapping, connection_type="s3", connection_options={"path": s3path}, format="parquet", transformation_ctx="s3sink")


# Read from the Kinesis data stream via the Data Catalog table
sourceData = glueContext.create_data_frame.from_catalog(
    database="kinesislab",
    table_name="ventilator_gsr_new",
    transformation_ctx="datasource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame=sourceData, batch_function=processBatch, options={"windowSize": "100 seconds", "checkpointLocation": "s3://<bucket name>/ventilator_gsr/checkpoint/"})
job.commit()

Our AWS Glue job is ready to read the data from the Kinesis data stream and send it to Amazon S3 in Parquet format.

Query the data using Athena

The processed streaming data is written in Parquet format to Amazon S3. Run an AWS Glue crawler on the Amazon S3 location where the streaming data is written; the crawler updates the Data Catalog. You can then run queries using Athena to start driving relevant insights from the data.
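If you want to script that last step, a minimal sketch with boto3 might look like the following; the crawler, database, and table names are placeholders for the ones you create, and the Athena workgroup is assumed to have a result location configured.

import time

import boto3

glue = boto3.client("glue")
athena = boto3.client("athena")

CRAWLER_NAME = "ventilator-metrics-crawler"  # placeholder

glue.start_crawler(Name=CRAWLER_NAME)

# Wait for the crawler to finish before querying the new or updated table.
while glue.get_crawler(Name=CRAWLER_NAME)["Crawler"]["State"] != "READY":
    time.sleep(15)

# Query the processed Parquet data with Athena.
query = athena.start_query_execution(
    QueryString='SELECT manufacturer, avg(o2stats) AS avg_o2 '
                'FROM "kinesislab"."ventilator_metrics" GROUP BY manufacturer',
    WorkGroup="primary",
)
print(query["QueryExecutionId"])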

Clean up

It’s always a good practice to clean up all the resources created as part of this post to avoid any undue cost. To clean up your resources, delete the AWS Glue database, tables, crawlers, jobs, service role, and S3 buckets.

Additionally, be sure to clean up all other AWS resources that you created using AWS CloudFormation. You can delete these resources on the AWS CloudFormation console by deleting the stack used for the Kinesis Data Generator.

Conclusion

This post demonstrated the importance of centrally managing metadata and schema evolution in stream analytics use cases. It also described how the integration of the Data Catalog and Schema Registry can help you achieve this on AWS. We used a streaming ETL example in AWS Glue to better showcase how this integration can help to enforce end-to-end data quality.

To learn more and get started, you can check out AWS Glue Data Catalog and AWS Glue Schema Registry.


About the Authors

Dr. Sam Mokhtari is a Senior Solutions Architect at AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor, and has led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

Amar Surjit is a Sr. Solutions Architect based in the UK who has been working in IT for over 20 years designing and implementing global solutions for enterprise customers. He is passionate about streaming technologies and enjoys working with customers globally to design and build streaming architectures and drive value by analyzing their streaming data.

Synchronize your AWS Glue Studio Visual Jobs to different environments 

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/synchronize-your-aws-glue-studio-visual-jobs-to-different-environments/

AWS Glue has become a popular option for integrating data from disparate data sources due to its ability to integrate large volumes of data using distributed data processing frameworks. Many customers use AWS Glue to build data lakes and data warehouses. Data engineers who prefer to develop data processing pipelines visually can use AWS Glue Studio to create data integration jobs. This post introduces the AWS Glue Job Visual API, which lets you author Glue Studio Visual Jobs programmatically, and the Glue Job Sync utility, which uses the API to easily synchronize Glue jobs to different environments without losing the visual representation.

Glue Job Visual API

AWS Glue Studio has a graphical interface called Visual Editor that makes it easy to author extract, transform, and load (ETL) jobs in AWS Glue. The Glue jobs created in the Visual Editor contain its visual representation that composes data transformation. In this post, we call the jobs Glue Studio Visual Jobs.

For example, it’s common to develop and test AWS Glue jobs in a dev account, and then promote the jobs to a prod account. Previously, when you copied the AWS Glue Studio Visual jobs to a different environment, there was no mechanism to copy the visual representation together. This means that the visual representation of the job was lost and you could only copy the code produced with Glue Studio. It can be time consuming and tedious to either copy the code or recreate the job.

AWS Glue Job Visual API lets you programmatically create and update Glue Studio Visual Jobs by providing a JSON object that indicates visual representation, and also retrieve the visual representation from existing Glue Studio Visual Jobs. A Glue Studio Visual Job consists of data source nodes for reading the data, transform nodes for modifying the data, and data target nodes for writing the data.
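For example, a minimal sketch of retrieving a job’s visual representation with boto3 might look like this; the job name is a placeholder.

import json

import boto3

glue = boto3.client("glue")

# Retrieve an existing Glue Studio Visual Job.
job = glue.get_job(JobName="my-visual-job")["Job"]

# CodeGenConfigurationNodes holds the source, transform, and target nodes that
# make up the visual graph; it's present only for jobs that have a visual representation.
nodes = job.get("CodeGenConfigurationNodes", {})
print(json.dumps(nodes, indent=2, default=str))

# The same structure can be passed to create_job or update_job to recreate the
# job, visual layout included, in another account or Region.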

There are some typical use cases for Glue Visual Job API:

  • Automate creation of Glue Visual Jobs.
  • Migrate your ETL jobs from third-party or on-premises ETL tools to AWS Glue. Many AWS partners, such as Bitwise, Bladebridge, and others have built convertors from the third-party ETL tools to AWS Glue.
  • Synchronize AWS Glue Studio Visual jobs from one environment to another without losing visual representation.

In this post, we focus on a utility that uses Glue Job Visual APIs to achieve the mass synchronization of your Glue Studio Visual Jobs without losing the visual representation.

Glue Job Sync Utility

There are common requirements to synchronize the Glue Visual Jobs between different environments.

  • Promote Glue Visual Jobs from a dev account to a prod account.
  • Transfer ownership of Glue Visual Jobs between different AWS accounts.
  • Replicate Glue Visual Job configurations from one region to another for disaster recovery purpose.

The Glue Job Sync Utility is built on top of the Glue Job Visual API and lets you synchronize jobs to different accounts without losing the visual representation. It is a Python application that synchronizes your AWS Glue Studio Visual jobs to different environments using the new API. The utility requires that you provide source and target AWS environment profiles. Optionally, you can provide a list of jobs that you want to synchronize and specify how the utility should replace environment-specific objects using a mapping file. For example, Amazon Simple Storage Service (Amazon S3) locations and IAM roles in your development environment can be different from those in your production environment. The mapping config file is used to replace these environment-specific objects.
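
The substitution idea behind the mapping file can be sketched as follows. This is a simplified illustration, not the utility's actual code: the job definition returned by GetJob is serialized, every source value from mapping.json is replaced with its target value, and the result is used to create the job in the destination account.

import json

def apply_mapping(job_definition: dict, mapping: dict) -> dict:
    """Replace every occurrence of a source value (S3 path, role ARN, and so on)
    with its target value across the whole job definition."""
    serialized = json.dumps(job_definition)
    for source_value, target_value in mapping.items():
        serialized = serialized.replace(source_value, target_value)
    return json.loads(serialized)

# 'job' would be the definition retrieved from the source account with GetJob, for example:
# with open("mapping.json") as f:
#     remapped = apply_mapping(job, json.load(f))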

How to use Glue Job Sync Utility

In this example, we’re synchronizing two AWS Glue Studio Visual jobs, test1 and test2, from the development environment to the production environment in a different account.

  • Source environment (dev environment)
    • AWS Account ID: 123456789012
    • AWS Region: eu-west-3 (Paris)
    • AWS Glue Studio Visual jobs: test1, test2
    • AWS Identity and Access Management (IAM) Role ARN for Glue job execution role: arn:aws:iam::123456789012:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-123456789012-eu-west-3/
    • Amazon S3 bucket for data location: s3://dev-environment/
  • Destination environment (prod environment)
    • AWS Account ID: 234567890123
    • AWS Region: eu-west-3 (Paris)
    • IAM Role ARN for Glue job execution role: arn:aws:iam::234567890123:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-234567890123-eu-west-3/
    • Amazon S3 bucket for data location: s3://prod-environment/

Set up the utility in your local environment

You will need the following prerequisites for this utility:

  • Python 3.6 or later.
  • Latest version of boto3.
  • Create two AWS named profiles, dev and prod, with the corresponding credentials in your environment. Follow these instructions.

Download the Glue Job Sync Utility

Download the sync utility from the GitHub repository to your local machine.

Create AWS Glue Studio Visual Jobs

  1. Create two AWS Glue Studio Visual jobs, test1, and test2, in the source account.
    • If you don't have any AWS Glue Studio Visual jobs, then follow these instructions to create them.

  2. Open AWS Glue Studio in the destination account and verify that the test1 and test2 jobs aren’t present.

Run the Job Sync Utility

  1. Create a new file named mapping.json and enter the following JSON code. With the first mapping entry, the sync utility replaces all of the Amazon S3 references within the job (in this case s3://aws-glue-assets-123456789012-eu-west-3) with the mapped location (in this case s3://aws-glue-assets-234567890123-eu-west-3), and then creates the job in the destination environment. Similarly, the second and third entries trigger the corresponding substitutions for the IAM role and the data location. Note that these are example values, and you'll need to substitute the values that match your environment.

    {
        "s3://aws-glue-assets-123456789012-eu-west-3": "s3://aws-glue-assets-234567890123-eu-west-3",
        "arn:aws:iam::123456789012:role/GlueServiceRole": "arn:aws:iam::234567890123:role/GlueServiceRole",
        "s3://dev-environment": "s3://prod-environment"
    }

  2. Execute the utility by running the following command:
    $ python3 sync.py --src-profile dev --src-region eu-west-3 --dst-profile prod --dst-region eu-west-3 --src-job-names test1,test2 --config-path mapping.json

  3. Verify successful synchronization by opening AWS Glue Studio in the destination account.
  4. Open the Glue Studio Visual jobs, test1 and test2, and verify the visual representation of the DAG.

The screenshot above shows that the jobs test1 and test2 were copied to the destination account with their DAGs intact.

Conclusion

The AWS Glue Job Visual API and the AWS Glue Job Sync Utility simplify how you synchronize your jobs to different environments. Both are designed to integrate easily into your continuous integration pipelines while retaining the visual representation that improves the readability of your ETL pipelines.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for designing AWS features, implementing software artifacts, and helping customer architectures. In his spare time, he enjoys watching anime in Prime Video.

Aaron Meltzer is a Software Engineer on the AWS Glue Studio team. He leads the design and implementation of features to simplify the management of AWS Glue jobs. Outside of work, Aaron likes to read and learn new recipes.

Mohamed Kiswani is a Software Development Manager on the AWS Glue team.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team.

How to unit test and deploy AWS Glue jobs using AWS CodePipeline

Post Syndicated from Praveen Kumar Jeyarajan original https://aws.amazon.com/blogs/devops/how-to-unit-test-and-deploy-aws-glue-jobs-using-aws-codepipeline/

This post shows how to unit test Python-based AWS Glue ETL jobs using the pytest framework in AWS CodePipeline. Today, several options exist for unit testing Python scripts for Glue jobs in a local environment. Although you can set up a local development environment to build and unit test Python-based Glue jobs by following the documentation, replicating the same procedure in a DevOps pipeline is difficult and time consuming.

Unit test scripts are one of the initial quality gates used by developers to produce a high-quality build. These scripts are reused during regression testing to make sure that all existing functionality is intact and that new releases don't disrupt key application functionality, and most regression test suites are expected to be integrated with the DevOps pipeline for execution. Unit testing evaluates whether each unit of code written by a programmer functions as expected, and provides a mechanism to confirm that software quality hasn't been compromised. One of the difficulties in building Python-based Glue ETL jobs is incorporating that unit testing into the DevOps pipeline, especially when modernizing mainframe ETL processes to modern tech stacks on AWS.
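
To make this concrete, the following is a minimal, hypothetical example of the kind of pytest unit test the pipeline can run against a Glue job helper function. The function and column names are illustrative only; the actual tests for this post live in the tests folder of the sample repository and run inside the AWS Glue container image in CodeBuild.

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

# A pure transformation that a Glue script could expose for testing.
def add_full_name(df):
    return df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name))

@pytest.fixture(scope="module")
def spark():
    # Local Spark session; the Glue container image provides the same runtime in CodeBuild.
    return SparkSession.builder.master("local[1]").appName("glue-unit-test").getOrCreate()

def test_add_full_name(spark):
    df = spark.createDataFrame([("Jane", "Doe")], ["first_name", "last_name"])
    result = add_full_name(df).collect()[0]
    assert result.full_name == "Jane Doe"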

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all of the capabilities needed for data integration. This means that you can start analyzing your data and putting it to use in minutes rather than months. AWS Glue provides both visual and code-based interfaces to make data integration easier.

Prerequisites

GitHub Repository

Amazon ECR Image URI for Glue Library

Solution overview

A typical enterprise-scale DevOps pipeline is illustrated in the following diagram. This solution describes how to incorporate the unit testing of Python-based AWS Glue ETL processes into the AWS DevOps Pipeline.

Figure 1 Solution Overview

The GitHub repository aws-glue-jobs-unit-testing has a sample Python-based Glue job in the src folder. Its associated unit test cases built using the Pytest Framework are accessible in the tests folder. An AWS CloudFormation template written in YAML is included in the deploy folder. As a runtime environment, AWS CodeBuild utilizes custom container images. This feature is used to build a project utilizing Glue libraries from Public ECR repository, that can run the code package to demonstrate unit testing integration.

Solution walkthrough

Time to read: 7 min
Time to complete: 15-20 min
Learning level: 300
Services used: AWS CodePipeline, AWS CodeCommit, AWS CodeBuild, Amazon Elastic Container Registry (Amazon ECR) Public Repositories, AWS CloudFormation

The container image at the Public ECR repository for AWS Glue libraries includes all of the binaries required to run PySpark-based AWS Glue ETL tasks locally, as well as unit test them. The public container repository has three image tags, one for each supported AWS Glue version. To demonstrate the solution, we use the image tag glue_libs_3.0.0_image_01 in this post. To utilize this container image as a runtime image in CodeBuild, copy the Image URI corresponding to the image tag that you intend to use, as shown in the following image.

Figure 2 Select Glue Library from Public ECR

The aws-glue-jobs-unit-testing GitHub repository contains a CloudFormation template, pipeline.yml, which deploys a CodePipeline with CodeBuild projects to create, test, and publish the AWS Glue job. As illustrated in the following snippet, the image URI copied from Amazon ECR Public is used to create and test a CodeBuild project.

  TestBuild:
    Type: AWS::CodeBuild::Project
    Properties:
      Artifacts:
        Type: CODEPIPELINE
      BadgeEnabled: false
      Environment:
        ComputeType: BUILD_GENERAL1_LARGE
        Image: "public.ecr.aws/glue/aws-glue-libs:glue_libs_3.0.0_image_01"
        ImagePullCredentialsType: CODEBUILD
        PrivilegedMode: false
        Type: LINUX_CONTAINER
      Name: !Sub "${RepositoryName}-${BranchName}-build"
      ServiceRole: !GetAtt CodeBuildRole.Arn  

The pipeline performs the following operations:

  1. It uses the CodeCommit repository as the source and transfers the most recent code from the main branch to the CodeBuild project for further processing.
  2. The following stage is build and test, in which the most recent code from the previous phase is unit tested and the test report is published to CodeBuild report groups.
  3. If all of the test results are good, then the next CodeBuild project is launched to publish the code to an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Following the successful completion of the publish phase, the final step is to deploy the AWS Glue task using the CloudFormation template in the deploy folder.

Deploying the solution

Set up

Now we’ll deploy the solution using a CloudFormation template.

  • Using the GitHub web interface, download the code.zip file from the aws-glue-jobs-unit-testing repository. This zip file contains the GitHub repository's src, tests, and deploy folders. You may also create the zip file yourself using command-line tools, such as git and zip. To create the zip file on Linux or Mac, open the terminal and enter the following commands.
git clone https://github.com/aws-samples/aws-glue-jobs-unit-testing.git
cd aws-glue-jobs-unit-testing
git checkout master
zip -r code.zip src/ tests/ deploy/
  • Sign in to the AWS Management Console and choose the AWS Region of your choice.
  • Create an Amazon S3 bucket. For more information, see How Do I Create an S3 Bucket? in the AWS documentation.
  • Upload the downloaded zip package, code.zip, to the Amazon S3 bucket that you created.

In this example, I created an Amazon S3 bucket named aws-glue-artifacts-us-east-1 in the N. Virginia (us-east-1) Region, and used the console to upload the zip package from the GitHub repository to the Amazon S3 bucket.

Figure 3 Upload code.zip file to S3 bucket

Creating the stack

  1. In the CloudFormation console, choose Create stack.
  2. On the Specify template page, choose Upload a template file, and then choose the pipeline.yml template downloaded from the GitHub repository.

Figure 4 Upload pipeline.yml template to create a new CloudFormation stack

  3. Specify the following parameters:
  • Stack name: glue-unit-testing-pipeline (Choose a stack name of your choice)
  • ApplicationStackName: glue-codepipeline-app (This is the name of the CloudFormation stack that will be created by the pipeline)
  • BranchName: master (This is the name of the branch to be created in the CodeCommit repository to check-in the code from the Amazon S3 bucket zip file)
  • BucketName: aws-glue-artifacts-us-east-1 (This is the name of the Amazon S3 bucket that contains the zip file. This bucket will also be used by the pipeline for storing code artifacts)
  • CodeZipFile: code.zip (This is the key name of the sample code Amazon S3 object. The object should be a zip file)
  • RepositoryName: aws-glue-unit-testing (This is the name of the CodeCommit repository that will be created by the stack)
  • TestReportGroupName: glue-unittest-report (This is the name of the CodeBuild test report group that will be created to store the unit test reports)

Figure 5 Fill parameters for stack creation

  4. Choose Next, and then choose Next again.
  5. On the Review page, under Capabilities, select the following option:
  • I acknowledge that CloudFormation might create IAM resources with custom names.

Figure 6 Acknowledge IAM roles creation

  6. Choose Create stack to begin the stack creation process. Once the stack creation is complete, the resources that were created are displayed on the Resources tab. The stack creation takes approximately 5-7 minutes.

Figure 7 Successful completion of stack creation

The stack automatically creates a CodeCommit repository with the initial code checked in from the zip file uploaded to the Amazon S3 bucket. Furthermore, it creates a pipeline in CodePipeline that uses the CodeCommit repository as the source. In the above example, the CodeCommit repository is aws-glue-unit-test, and the pipeline is aws-glue-unit-test-pipeline.

Testing the solution

To test the deployed pipeline, open the CodePipeline console and select the pipeline created by the CloudFormation stack. Select the Release Change button on the pipeline page.

Figure 8 Choose Release Change on pipeline page

The pipeline begins its execution with the most recent code in the CodeCommit repository.

When the Test_and_Build phase is finished, select the Details link to examine the execution logs.

Figure 9 Successfully completed the Test_and_Build stage

Select the Reports tab, and choose the test report from Report history to view the unit test execution results.

Figure 10 Test report from pipeline execution

Finally, after the deployment stage is complete, you can see, run, and monitor the deployed AWS Glue job on the AWS Glue console page. For more information, refer to the Running and monitoring AWS Glue documentation.

Figure 11 Successful pipeline execution

Cleanup

To avoid additional infrastructure costs, make sure that you delete the stack after experimenting with the examples provided in the post. On the CloudFormation console, select the stack that you created, and then choose Delete. This will delete all of the resources that it created, including CodeCommit repositories, IAM roles/policies, and CodeBuild projects.

Summary

In this post, we demonstrated how to unit test and deploy Python-based AWS Glue jobs in a pipeline with unit tests written with the PyTest framework. The approach is not limited to CodePipeline, and it can be used to build up a local development environment, as demonstrated in the Big Data blog. The aws-glue-jobs-unit-testing GitHub repository contains the example’s CloudFormation template, as well as sample AWS Glue Python code and Pytest code used in this post. If you have any questions or comments regarding this example, please open an issue or submit a pull request.

Authors:

Praveen Kumar Jeyarajan

Praveen Kumar Jeyarajan is a Senior DevOps Consultant at AWS, supporting enterprise customers on their journey to the cloud. He has 11+ years of DevOps experience and is skilled in solving myriad technical challenges using the latest technologies. He holds a Masters degree in Software Engineering. Outside of work, he enjoys watching movies and playing tennis.

Vaidyanathan Ganesa Sankaran

Vaidyanathan Ganesa Sankaran is a Sr Modernization Architect at AWS, supporting global enterprise customers on their journey towards modernization. He specializes in artificial intelligence, legacy modernization, and cloud computing. He holds a Masters degree in Software Engineering and has 12+ years of modernization experience. Outside work, he loves conducting training sessions for college grads and professionals starting out who want to learn cloud and AI. His hobbies are playing tennis, philately, and traveling.

Author AWS Glue jobs with PyCharm using AWS Glue interactive sessions

Post Syndicated from Kunal Ghosh original https://aws.amazon.com/blogs/big-data/author-aws-glue-jobs-with-pycharm-using-aws-glue-interactive-sessions/

Data lakes, business intelligence, operational analytics, and data warehousing share a common core characteristic—the ability to extract, transform, and load (ETL) data for analytics. Since its launch in 2017, AWS Glue has provided a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.

AWS Glue interactive sessions allow programmers to build, test, and run data preparation and analytics applications. Interactive sessions provide access to run fully managed serverless Apache Spark using an on-demand model. AWS Glue interactive sessions also provide advanced users with the same Apache Spark engine as AWS Glue 2.0 or AWS Glue 3.0, with built-in cost controls and speed. Additionally, development teams immediately become productive using their existing development tool of choice.

In this post, we walk you through how to use AWS Glue interactive sessions with PyCharm to author AWS Glue jobs.

Solution overview

This post provides a step-by-step walkthrough that builds on the instructions in Getting started with AWS Glue interactive sessions. It guides you through the following steps:

  1. Create an AWS Identity and Access Management (IAM) policy with limited Amazon Simple Storage Service (Amazon S3) read privileges and associated role for AWS Glue.
  2. Configure access to a development environment. You can use a desktop computer or an OS running on the AWS Cloud using Amazon Elastic Compute Cloud (Amazon EC2).
  3. Integrate AWS Glue interactive sessions with an integrated development environment (IDE).

We use the script Validate_Glue_Interactive_Sessions.ipynb for validation, available as a Jupyter notebook.

Prerequisites

You need an AWS account before you proceed. If you don't have one, refer to How do I create and activate a new AWS account? This guide assumes that you already have Python and PyCharm installed. Python 3.7 or later is the foundational prerequisite.

Create an IAM policy

The first step is to create an IAM policy that limits read access to the S3 bucket s3://awsglue-datasets, which has the AWS Glue public datasets. You use IAM to define the policies and roles for access to AWS Glue.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*",
                    "s3-object-lambda:Get*",
                    "s3-object-lambda:List*"
                ],
                "Resource": ["arn:aws:s3:::awsglue-datasets/*"]
            }
        ]
    }

  4. Choose Next: Tags.
  5. Choose Next: Review.
  6. For Policy name, enter glue_interactive_policy_limit_s3.
  7. For Description, enter a description.
  8. Choose Create policy.

Create an IAM role for AWS Glue

To create a role for AWS Glue with limited Amazon S3 read privileges, complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type, select AWS service.
  4. For Use cases for other AWS services, choose Glue.
  5. Choose Next.
  6. On the Add permissions page, search and choose the AWS managed permission policies AWSGlueServiceRole and glue_interactive_policy_limit_s3.
  7. Choose Next.
  8. For Role name, enter glue_interactive_role.
  9. Choose Create role.
  10. Note the ARN of the role, arn:aws:iam::<replacewithaccountID>:role/glue_interactive_role.

Set up development environment access

This secondary level of access configuration needs to occur on the developer’s environment. The development environment can be a desktop computer running Windows or Mac/Linux, or similar operating systems running on the AWS Cloud using Amazon EC2. The following steps walk through each client access configuration. You can select the configuration path that is applicable to your environment.

Set up a desktop computer

To set up a desktop computer, we recommend completing the steps in Getting started with AWS Glue interactive sessions.

Set up an AWS Cloud-based computer with Amazon EC2

This configuration path follows the best practices for providing access to cloud-based resources using IAM roles. For more information, refer to Using an IAM role to grant permissions to applications running on Amazon EC2 instances.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type¸ select AWS service.
  4. For Common use cases, select EC2.
  5. Choose Next.
  6. Add the AWSGlueServiceRole policy to the newly created role.
  7. On the Add permissions menu, choose Create inline policy.
  8. Create an inline policy that allows the instance profile role to pass or assume glue_interactive_role and save the new role as ec2_glue_demo.

Your new policy is now listed under Permissions policies.

  1. On the Amazon EC2 console, choose (right-click) the instance you want to attach to the newly created role.
  2. Choose Security and choose Modify IAM role.
  3. For IAM role¸ choose the role ec2_glue_demo.
  4. Choose Save.
  5. On the IAM console, open and edit the trust relationship for glue_interactive_role.
  6. Add "AWS": ["arn:aws:iam:::user/glue_interactive_user","arn:aws:iam:::role/ec2_glue_demo"] to the principal JSON key.
  7. Complete the steps detailed in Getting started with AWS Glue interactive sessions.

You don’t need to provide an AWS access key ID or AWS secret access key as part of the remaining steps.

Integrate AWS Glue interactive sessions with an IDE

You’re now ready to set up and validate your PyCharm integration with AWS Glue interactive sessions.

  1. On the welcome page, choose New Project.
  2. For Location, enter the location of your project glue-interactive-demo.
  3. Expand Python Interpreter.
  4. Select Previously configured interpreter and choose the virtual environment you created earlier.
  5. Choose Create.

The following screenshot shows the New Project page on a Mac computer. A Windows computer setup will have a path beginning with C:\ followed by the PyCharm project location.

  1. Choose the project (right-click) and on the New menu, choose Jupyter Notebook.
  2. Name the notebook Validate_Glue_Interactive_Sessions.

The notebook has a drop-down called Managed Jupyter server: auto-start, which means the Jupyter server automatically starts when any notebook cell is run.

  1. Run the following code:
    print("This notebook will start the local Python kernel")

You can observe that the Jupyter server started running the cell.

  1. On the Python 3 (ipykernel) drop-down, choose Glue PySpark.
  2. Run the following code to start a Spark session:
    spark

  3. Wait to receive the message that a session ID has been created.
  4. Run the following code in each cell, which is the boilerplate syntax for AWS Glue:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    glueContext = GlueContext(SparkContext.getOrCreate())

  5. Read the publicly available Medicare Provider payment data in the AWS Glue data preparation sample document:
    medicare_dynamicframe = glueContext.create_dynamic_frame.from_options(
        's3',
        {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']},
        'csv',
        {'withHeader': True})
    print("Count:",medicare_dynamicframe.count())
    medicare_dynamicframe.printSchema()

  6. Change the data type of the provider ID to long to resolve all incoming data to long:
    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('Provider Id','cast:long')])
    medicare_res.printSchema()

  7. Display the providers:
    medicare_res.toDF().select('Provider Name').show(10,truncate=False)

Clean up

You can run %delete_session, which deletes the current session and stops the cluster so that you're no longer charged. For more details, see the AWS Glue interactive sessions magics. Also remember to delete the IAM policy and role when you're done.

Conclusion

In this post, we demonstrated how to configure PyCharm to integrate and work with AWS Glue interactive sessions. The post builds on the steps in Getting started with AWS Glue interactive sessions to enable AWS Glue interactive sessions to work with Jupyter notebooks. We also provided ways to validate and test the functionality of the configuration.


About the Authors

Kunal Ghosh is a Sr. Solutions Architect at AWS. His passion is building efficient and effective solutions on cloud, especially involving analytics, AI, data science, and machine learning. Besides family time, he likes reading and watching movies. He is a foodie.

Sebastian Muah is a Solutions Architect at AWS focused on analytics, AI/ML, and big data. He has over 25 years of experience in information technology and helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. He enjoys cycling and building things around his home.

Build your data pipeline in your AWS modern data platform using AWS Lake Formation, AWS Glue, and dbt Core

Post Syndicated from Benjamin Menuet original https://aws.amazon.com/blogs/big-data/build-your-data-pipeline-in-your-aws-modern-data-platform-using-aws-lake-formation-aws-glue-and-dbt-core/

dbt has established itself as one of the most popular tools in the modern data stack, and is aiming to bring analytics engineering to everyone. The dbt tool makes it easy to develop and implement complex data processing pipelines, with mostly SQL, and it provides developers with a simple interface to create, test, document, evolve, and deploy their workflows. For more information, see docs.getdbt.com.

dbt primarily targets cloud data warehouses such as Amazon Redshift or Snowflake. Now, you can use dbt against AWS data lakes as well, thanks to two services: AWS Lake Formation and AWS Glue interactive sessions.

In this post, you’ll learn how to deploy a data pipeline in your modern data platform using the dbt-glue adapter built by the AWS Professional Services team in collaboration with dbtlabs.

With this new open-source, battle-tested dbt AWS Glue adapter, developers can now use dbt for their data lakes, paying for just the compute they need, with no need to shuffle data around. They still have access to everything that makes dbt great, including the local developer experience, documentation, tests, incremental data processing, Git integration, CI/CD, and more.

Solution overview

The following diagram shows the architecture of the solution.

The steps in this workflow are as follows:

  1. The data team configures a local Python virtual environment and creates a data pipeline with dbt.
  2. The dbt-glue adapter uses Lake Formation to perform all structure manipulation, like the creation of databases, tables, or views.
  3. The dbt-glue adapter uses AWS Glue interactive sessions as the backend for processing your data.
  4. All data is stored in Amazon Simple Storage Service (Amazon S3) as Parquet open file format.
  5. The data team can now query all data stored in the data lake using Amazon Athena.

Walkthrough overview

For this post, you run a data pipeline that creates indicators based on NYC taxi data by following these steps:

  1. Deploy the provided AWS CloudFormation stack in Region us-east-1.
  2. Configure your Amazon CloudShell environment.
  3. Install dbt, the dbt CLI, and the dbt adaptor.
  4. Use CloudShell to clone the project and configure it to use your account’s configuration.
  5. Run dbt to implement the data pipeline.
  6. Query the data with Athena.

For our use case, we use the data from the New York City Taxi Records dataset. This dataset is available in the Registry of Open Data on AWS (RODA), which is a repository containing public datasets from AWS resources.

The CloudFormation template creates the nyctaxi database in your AWS Glue Data Catalog and a table (records) that points to the public dataset. You don’t need to host the data in your account.

Prerequisites

The CloudFormation template used by this project configures the AWS Identity and Access Management (IAM) role GlueInteractiveSessionRole with all the mandatory permissions.

For more details on permissions for AWS Glue interactive sessions, refer to Securing AWS Glue interactive sessions with IAM.

Deploy resources with AWS CloudFormation

The CloudFormation stack deploys all the required infrastructure:

  • An IAM role with all the mandatory permissions to run an AWS Glue interactive session and the dbt-glue adapter.
  • An AWS Glue database and table to store the metadata related to the NYC taxi records dataset
  • An S3 bucket to use as output and store the processed data
  • An Athena configuration (a workgroup and S3 bucket to store the output) to explore the dataset
  • An AWS Lambda function as an AWS CloudFormation custom resource that updates all the partitions in the AWS Glue table

To create these resources, choose Launch Stack and follow the instructions.

Configure the CloudShell environment

To start working with the shell, complete the following steps:

  1. Sign in to the AWS Management Console and launch CloudShell using either one of the following two methods:
    1. Choose the CloudShell icon on the console navigation bar.
    2. Enter cloudshell in the Find Services box and then choose the CloudShell option.
  2. Because dbt and the dbt-glue adapter are compatible with Python versions 3.7, 3.8, and 3.9, check the version of Python:
    $ python3 --version

  3. Configure a Python virtual environment to isolate the package version and code dependencies:
    $ sudo yum install git -y
    $ python3 -m venv dbt_venv
    $ source dbt_venv/bin/activate
    $ python3 -m pip install --upgrade pip

  4. Configure the aws-glue-session package:
    $ sudo yum install gcc krb5-devel.x86_64 python3-devel.x86_64 -y
    $ pip3 install --no-cache-dir --upgrade boto3
    $ pip3 install --no-cache-dir --upgrade aws-glue-sessions

Install dbt, the dbt CLI, and the dbt adaptor

The dbt CLI is a command-line interface for running dbt projects. It's free to use and available as an open-source project. Install dbt and the dbt CLI with the following code:

$ pip3 install --no-cache-dir dbt-core

For more information, refer to How to install dbt, What is dbt?, and Viewpoint.

Install the dbt adapter with the following code:

$ pip3 install --no-cache-dir dbt-glue

Clone the project

The dbt AWS Glue interactive session demo project contains an example of a data pipeline that produces metrics based on NYC taxi dataset. Clone the project with the following code:

$ git clone https://github.com/aws-samples/dbtgluenyctaxidemo

This project comes with the following configuration example:

$ dbtgluenyctaxidemo/profile/profiles.yml

The following table summarizes the parameter options for the adaptor.

Option Description Mandatory
project_name The dbt project name. This must be the same as the one configured in the dbt project. yes
type The driver to use. yes
query-comment A string to inject as a comment in each query that dbt runs. no
role_arn The ARN of the interactive session role created as part of the CloudFormation template. yes
region The AWS Region where you run the data pipeline. yes
workers The number of workers of a defined workerType that are allocated when a job runs. yes
worker_type The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X. yes
schema The schema used to organize data stored in Amazon S3. yes
database The database in Lake Formation. The database stores metadata tables in the Data Catalog. yes
session_provisioning_timeout_in_seconds The timeout in seconds for AWS Glue interactive session provisioning. yes
location The Amazon S3 location of your target data. yes
idle_timeout The AWS Glue session idle timeout in minutes. (The session stops after being idle for the specified amount of time.) no
glue_version The version of AWS Glue for this session to use. Currently, the only valid options are 2.0 and 3.0. The default value is 2.0. no
security_configuration The security configuration to use with this session. no
connections A comma-separated list of connections to use in the session. no

Run the dbt project

The objective of this sample project is to create the following four tables, which contain metrics based on the NYC taxi dataset:

  • silver_nyctaxi_avg_metrics – Basic metrics based on NYC Taxi Open Data for the year 2016
  • gold_nyctaxi_passengers_metrics – Metrics per passenger based on the silver metrics table
  • gold_nyctaxi_distance_metrics – Metrics per distance based on the silver metrics table
  • gold_nyctaxi_cost_metrics – Metrics per cost based on the silver metrics table
  1. To run the dbt project, you should be in the project folder:
    $ cd dbtgluenyctaxidemo

  2. The project requires you to set environment variables in order to run on the AWS account:
    $ export DBT_ROLE_ARN="arn:aws:iam::$(aws sts get-caller-identity --query "Account" --output text):role/GlueInteractiveSessionRole"
    $ export DBT_S3_LOCATION="s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/"

  3. Make sure the profile is set up correctly from the command line:
    $ dbt debug --profiles-dir profile

  4. Run the models with the following code:
    $ dbt run --profiles-dir profile

  5. Generate documentation for the project:
    $ dbt docs generate --profiles-dir profile

  6. View the documentation for the project:
    $ dbt docs serve --profiles-dir profile

Query data via Athena

This section demonstrates how to query the target table using Athena. To query the data, complete the following steps:

  1. On the Athena console, switch the workgroup to athena-dbt-glue-aws-blog.
  2. If the Workgroup athena-dbt-glue-aws-blog settings dialog box appears, choose Acknowledge.
  3. Use the following query to explore the metrics created by the dbt project:
    SELECT cm.avg_cost_per_minute
    , cm.avg_cost_per_distance
    , dm.avg_distance_per_duration
    , dm.year
    , dm.month
    , dm.type
    FROM "dbt_nyc_metrics"."gold_nyctaxi_distance_metrics" dm
    LEFT JOIN "dbt_nyc_metrics"."gold_nyctaxi_cost_metrics" cm
    ON dm.type = cm.type
    AND dm.year = cm.year
    AND dm.month = cm.month
    WHERE dm.type = 'yellow'
    AND dm.year = '2016'
    AND dm.month = '6'

The following screenshot shows the results of this query.

Clean Up

To clean up your environment, complete the following steps in CloudShell:

  1. Delete the database created by dbt:
    $ aws glue delete-database --name dbt_nyc_metrics

  2. Delete all generated data:
    $ aws s3 rm s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive
    $ aws s3 rm s3://aws-athena-dbt-glue-query-results-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive

  3. Delete the CloudFormation stack:
    $ aws cloudformation delete-stack --stack-name dbt-demo

Summary

This post demonstrates how AWS managed services are key enablers and accelerators to build a modern data platform at scale or take advantage of an existing one.

With the introduction of the dbt-glue adapter, data teams can access data stored in your modern data platform using SQL statements to extract value from that data.

To report a bug or request a feature, please open an issue on GitHub. If you have any questions or suggestions, leave your feedback in the comment section. If you need further assistance to optimize your modern data platform, contact your AWS account team or a trusted AWS Partner.


About the Authors

Benjamin Menuet is a Data Architect with AWS Professional Services. He helps customers develop big data and analytics solutions to accelerate their business outcomes. Outside of work, Benjamin is a trail runner and has finished some mythic races like the UTMB.

Armando Segnini is a Data Architect with AWS Professional Services. He spends his time building scalable big data and analytics solutions for AWS Enterprise and Strategic customers. Armando also loves to travel with his family all around the world and take pictures of the places he visits.

Moshir Mikael is a Senior Practice Manager with AWS Professional Services.  He led development of large enterprise data platforms in EMEA and currently leading the Professional Services teams in EMEA for analytics.

Anouar Zaaber is a Senior Engagement Manager in AWS Professional Services. He leads internal AWS teams, external partners, and customer teams to deliver AWS cloud services that enable customers to realize their business outcomes.

Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-auto-scaling-automatically-resize-serverless-computing-resources-for-lower-cost-with-optimized-apache-spark/

The volume of data created in the cloud is growing rapidly, so scalability is a key factor in distributed data processing. Many customers benefit from the scalability of the AWS Glue serverless Spark runtime. Today, we're pleased to announce the release of AWS Glue Auto Scaling, which helps you scale your AWS Glue Spark jobs automatically based on the requirements calculated dynamically during the job run, and accelerate job runs at lower cost without detailed capacity planning.

Before AWS Glue Auto Scaling, you had to predict workload patterns in advance. For example, in cases when you don’t have expertise in Apache Spark, when it’s the first time you’re processing the target data, or when the volume or variety of the data is significantly changing, it’s not so easy to predict the workload and plan the capacity for your AWS Glue jobs. Under-provisioning is error-prone and can lead to either missed SLA or unpredictable performance. On the other hand, over-provisioning can cause underutilization of resources and cost overruns. Therefore, it was a common best practice to experiment with your data, monitor the metrics, and adjust the number of AWS Glue workers before you deployed your Spark applications to production.

With AWS Glue Auto Scaling, you no longer need to plan AWS Glue Spark cluster capacity in advance. You can just set the maximum number of workers and run your jobs. AWS Glue monitors the Spark application execution, and allocates more worker nodes to the cluster in near-real time after Spark requests more executors based on your workload requirements. When there are idle executors that don’t have intermediate shuffle data, AWS Glue Auto Scaling removes the executors to save the cost.

AWS Glue Auto Scaling is available with the optimized Spark runtime on AWS Glue version 3.0, and you can start using it today. This post describes possible use cases and how it works.

Use cases and benefits for AWS Glue Auto Scaling

Traditionally, AWS Glue launches a serverless Spark cluster of a fixed size. The computing resources are held for the whole job run until it is completed. With the new AWS Glue Auto Scaling feature, after you enable it for your AWS Glue Spark jobs, AWS Glue dynamically allocates compute resources, considering the given maximum number of workers. It also supports dynamic scale-out and scale-in of the AWS Glue Spark cluster size over the course of the job. As more executors are requested by Spark, more AWS Glue workers are added to the cluster. When an executor has been idle without active computation tasks for a period of time and has no associated shuffle dependencies, the executor and the corresponding worker are removed.

AWS Glue Auto Scaling makes it easy to run your data processing in the following typical use cases:

  • Batch jobs to process unpredictable amounts of data
  • Jobs containing driver-heavy workloads (for example, processing many small files)
  • Jobs containing multiple stages with uneven compute demands or due to data skews (for example, reading from a data store, repartitioning it to have more parallelism, and then processing further analytic workloads)
  • Jobs that write large amounts of data into data warehouses such as Amazon Redshift, or that read from and write to databases

Configure AWS Glue Auto Scaling

AWS Glue Auto Scaling is available with the optimized Spark runtime on Glue version 3.0. To enable Auto Scaling on the AWS Glue Studio console, complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose your job.
  4. Choose the Job details tab.
  5. For Glue version, choose Glue 3.0 – Supports spark 3.1, Scala 2, Python.
  6. Select Automatically scale the number of workers.
  7. For Maximum number of workers, enter the maximum workers that can be vended to the job run.
  8. Choose Save.

To enable Auto Scaling in the AWS Glue API or AWS Command Line Interface (AWS CLI), set the following job parameters:

  • Key: --enable-auto-scaling
  • Value: true
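
As a sketch of doing the same through the API with boto3, you might update an existing Glue 3.0 job's default arguments as follows. The job name and worker count are examples, and this simplified update only carries over a subset of the job definition; a complete update would preserve the remaining fields as well.

import boto3

glue = boto3.client("glue")
job = glue.get_job(JobName="my-glue-job")["Job"]

# Add the Auto Scaling flag to the existing default arguments.
args = dict(job.get("DefaultArguments", {}))
args["--enable-auto-scaling"] = "true"

glue.update_job(
    JobName="my-glue-job",
    JobUpdate={
        "Role": job["Role"],
        "Command": job["Command"],
        "GlueVersion": "3.0",
        "WorkerType": job.get("WorkerType", "G.1X"),
        "NumberOfWorkers": 20,  # acts as the maximum number of workers with Auto Scaling
        "DefaultArguments": args,
    },
)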

Monitor AWS Glue Auto Scaling

In this section, we discuss three ways to monitor AWS Glue Auto Scaling: Amazon CloudWatch metrics, the AWS Glue Studio Monitoring page, and the Spark UI.

CloudWatch metrics

After you enable AWS Glue Auto Scaling, Spark dynamic allocation is enabled and the executor metrics are visible in CloudWatch. You can review the following metrics to understand the demand and optimized usage of executors in your Spark applications enabled with Auto Scaling:

  • glue.driver.ExecutorAllocationManager.executors.numberAllExecutors
  • glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors
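
If you prefer to pull these metrics programmatically, the following boto3 sketch queries one of them from CloudWatch. The job name, time window, and the JobRunId=ALL and Type=gauge dimension values are assumptions based on how AWS Glue publishes job metrics; adjust them for your job runs.

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

response = cloudwatch.get_metric_statistics(
    Namespace="Glue",
    MetricName="glue.driver.ExecutorAllocationManager.executors.numberAllExecutors",
    Dimensions=[
        {"Name": "JobName", "Value": "my-glue-job"},
        {"Name": "JobRunId", "Value": "ALL"},
        {"Name": "Type", "Value": "gauge"},
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=60,
    Statistics=["Maximum"],
)

# Print the number of allocated executors over time.
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])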

AWS Glue Studio Monitoring page

In the Monitoring page in AWS Glue Studio, you can monitor the DPU hours you spent for a specific job run. The following screenshot shows two job runs that processed the same dataset; one without Auto Scaling which spent 8.71 DPU hours, and another one with Auto Scaling enabled which spent only 1.48 DPU hours. The DPU hour values per job run are also available with GetJobRun API responses.

Spark UI

With the Spark UI, you can monitor that the AWS Glue Spark cluster dynamically scales out and scales in with AWS Glue Auto Scaling. The event timeline shows when each executor is added and removed gradually over the Spark application run.

In the following sections, we demonstrate AWS Glue Auto Scaling with two use cases: jobs with driver-heavy workloads, and jobs with multiple stages.

Example 1: Jobs containing driver-heavy workloads

A typical workload for AWS Glue Spark jobs is to process many small files to prepare the data for further analysis. For such workloads, AWS Glue has built-in optimizations, including file grouping, a Glue S3 Lister, partition pushdown predicates, partition indexes, and more. For more information, see Optimize memory management in AWS Glue. All those optimizations execute on the Spark driver and speed up the planning phase on Spark driver to compute and distribute the work for parallel processing with Spark executors. However, without AWS Glue Auto Scaling, Spark executors are idle during the planning phase. With Auto Scaling, Glue jobs only allocate executors when the driver work is complete, thereby saving executor cost.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from an Amazon Simple Storage Service (Amazon S3) bucket, performs the ApplyMapping transformation, runs a simple SELECT query repartitioning data to have 800 partitions, and writes back to another location in Amazon S3.

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

In contrast, the following screenshot shows the executor timeline of the same job with Auto Scaling enabled and the maximum workers set to 20. The driver and one executor started at the beginning, and other executors started only after the driver finished its computation for listing 367,920 partitions on the S3 bucket. These 19 workers were not charged during the long-running driver task.

Both jobs completed in 44 minutes. With AWS Glue Auto Scaling, the job completed in the same amount of time with lower cost.

Example 2: Jobs containing multiple stages

Another typical workload in AWS Glue is to read from the data store or large compressed files, repartition it to have more parallelism for downstream processing, and process further analytic queries. For example, when you want to read from a JDBC data store, you may not want to have many concurrent connections, so you can avoid impacting source database performance. For such workloads, you can have a small number of connections to read data from the JDBC data store, then repartition the data with higher parallelism for further analysis.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from the JDBC data source, runs a simple SELECT query adding one more column (mod_id) calculated from the column ID, performs the ApplyMapping node, then writes to an S3 bucket with partitioning by this new column mod_id. Note that the JDBC data source was already registered in the AWS Glue Data Catalog, and the table has two parameters, hashfield=id and hashpartitions=5, to read from JDBC through five concurrent connections.
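
A hedged sketch of how such a read might look in the job script is shown below. The database and table names are placeholders, hashfield and hashpartitions can alternatively be passed as additional options instead of table parameters, and the modulo value used to derive mod_id is purely illustrative.

from pyspark.context import SparkContext
from pyspark.sql.functions import col
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read from the cataloged JDBC table using five concurrent connections hashed on "id".
jdbc_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="jdbc_demo_db",
    table_name="jdbc_demo_table",
    additional_options={"hashfield": "id", "hashpartitions": "5"},
)

# Repartition for higher downstream parallelism, then derive the mod_id column.
df = jdbc_dyf.toDF().repartition(800).withColumn("mod_id", col("id") % 10)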

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in the Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

The following screenshot shows the same executor timeline in the Spark UI with Auto Scaling enabled and 20 maximum workers. The driver and two executors started at the beginning, and other executors started later. The first two executors read data from the JDBC source with a small number of concurrent connections. Later, the job increased parallelism and more executors were started. You can also observe that there were 16 executors, not 20, which further reduced cost.

Conclusion

This post discussed AWS Glue Auto Scaling, which automatically resizes the computing resources of your AWS Glue Spark job capacity and reduces cost. You can start using AWS Glue Auto Scaling for both your existing workloads and future new workloads, and take advantage of it today! For more information about AWS Glue Auto Scaling, see Using Auto Scaling for AWS Glue. Migrate your jobs to Glue version 3.0 and get the benefits of Auto Scaling.

Special thanks to everyone who contributed to the launch: Raghavendhar Thiruvoipadi Vidyasagar, Ping-Yao Chang, Shashank Bhardwaj, Sampath Shreekantha, Vaibhav Porwal, and Akash Gupta.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data platforms, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing the knowledge in AWS Big Data blog posts. In his spare time, he enjoys taking care of killifish, hermit crabs, and grubs with his children.

Bo Li is a Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about data.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.

Enhance analytics with Google Trends data using AWS Glue, Amazon Athena, and Amazon QuickSight

Post Syndicated from Drew Philip original https://aws.amazon.com/blogs/big-data/enhance-analytics-with-google-trends-data-using-aws-glue-amazon-athena-and-amazon-quicksight/

In today’s market, business success often lies in the ability to glean accurate insights and predictions from data. However, data scientists and analysts often find that the data they have at their disposal isn’t enough to help them make accurate predictions for their use cases. A variety of factors might alter an outcome and should be taken into account when making a prediction model. Google Trends is an available option, presenting a broad source of data that reflects global trends more comprehensively. This can help enrich a dataset to yield a better model.

You can use Google Trends data for a variety of analytical use cases. For example, you can use it to learn about how your products or brands are faring among targeted audiences. You can also use it to monitor competitors and see how well they’re performing against your brand.

In this post, we show how to get Google Trends data programmatically, integrate it into a data pipeline, and use it to analyze data, using Amazon Simple Storage Service (Amazon S3), AWS Glue, Amazon Athena, and Amazon QuickSight. We use an example dataset of movies and TV shows and demonstrate how to get the search queries from Google Trends to analyze the popularity of movies and TV shows.

Solution overview

The following diagram shows a high-level architecture of the solution using Amazon S3, AWS Glue, the Google Trends API, Athena, and QuickSight.

The solution consists of the following components:

  1. Amazon S3 – The storage layer that stores the list of topics for which Google Trends data has to be gathered. It also stores the results returned by Google Trends.
  2. AWS Glue – The serverless data integration service that calls Google Trends for the list of topics to get the search results, aggregates the data, and loads it to Amazon S3.
  3. Athena – The query engine that allows you to query the data stored in Amazon S3. You can use it for supporting one-time SQL queries on Google Trends data and for building dashboards using tools like QuickSight.
  4. QuickSight – The reporting tool used for building visualizations.

In the following sections, we walk through the steps to set up the environment, download the libraries, create and run the AWS Glue job, and explore the data.

Set up your environment

Complete the following steps to set up your environment:

  1. Create an S3 bucket where you upload the list of movies and TV shows. For this post, we use a Netflix Movies and TV Shows public dataset from Kaggle.
  2. Create an AWS Identity and Access Management (IAM) service role that allows AWS Glue to read and write data to the S3 buckets you just created.
  3. Create a new QuickSight account with the admin/author role and access granted to Athena and Amazon S3.

Download the external libraries and dependencies for the AWS Glue Job

The AWS Glue job needs the following two external Python libraries: pytrends and awswrangler. pytrends is a library that provides a simple interface for automating the downloading of reports from Google Trends. awswrangler is a library provided by AWS to integrate data between a Pandas DataFrame and AWS repositories like Amazon S3.

Download the following .whl files for the libraries and upload them to Amazon S3:

Create and configure an AWS Glue job

To set up your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL in the navigation pane, choose Jobs – New.
  2. For Create job, select Python Shell script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. On the Script tab, enter the following script, replacing the source and target buckets with your bucket names:
    # Import external library TrendReq needed to connect to Google Trends API and library awswrangler to read/write from pandas to Amazon S3.
    
    from pytrends.request import TrendReq
    pytrend = TrendReq(hl='en-US', tz=360, timeout=10) 
    import pandas as pd
    import awswrangler as wr
    
    # Function get_gtrend, accepts a list of terms as input, calls Google Trends API for each term to get the search trends 
    def get_gtrend(terms):
      trends =[]
      for term in terms:
    # Normalizing the data using popular movie Titanic as baseline to get trends over time.
        pytrend.build_payload(kw_list=["Titanic",term.lower()])
        df = pytrend.interest_over_time()
        df["google_trend"] = round((df[term.lower()] /df['Titanic']) *100)
        
    # Transforming and filtering trends results to align with Analytics use case
        df_trend = df.loc[df.index >= "2018-1-1", "google_trend"].resample(rule="M").max().to_frame()
        df_trend["movie"] = term
        trends.append(df_trend.reset_index())
    
    # Last step in function to concatenate the results for each term and return an aggregated dataset 
      concat_df = pd.concat(trends)
      return concat_df
    
    def main():
      
    # Change the bucket and prefix name to Amazon S3 location where movie titles file from Kaggle has been downloaded. 
      source_bucket = "source_bucket"
      source_prefix = "source_prefix"
    
    # Awswrangler method s3.read_csv is called to load the titles from S3 location into a DataFrame and convert it to a list.
      df = wr.s3.read_csv(f's3://{source_bucket}/{source_prefix}/')
      movies = df['title'].head(20).values.tolist()
    
    #  Call the get_trends function and pass the list of movies as an input. Pandas dataframe is returned with trend data for movies.
      df = get_gtrend(terms=movies)
    
    # Change the prefix name to location where you want to store results. 
      target_bucket = "target_bucket" 
      target_prefix = "target_prefix" 
    
    # Use awswrangler to save pandas dataframe to Amazon S3. 
      wr.s3.to_csv(df,f's3://{target_bucket}/{target_prefix}/trends.csv',index= False)
    
    
    # Invoke the main function
    main()

  6. On the Job details tab, for Name, enter the name of the AWS Glue job.
  7. For IAM Role, choose the role that you created earlier with permissions to run the job and access Amazon S3.
  8. For Type, enter Python Shell to run the Python code.
  9. For Python Version, specify the Python version as Python 3.6.
  10. For Data processing units, choose 1 DPU.
  11. For Number of retries, enter .
  12. Expand Advanced properties and under Libraries, enter the location of the S3 bucket where the pytrends and awswrangler files were downloaded.
  13. Choose Save to save the job.

Run the AWS Glue job

Navigate to the AWS Glue console and run the AWS Glue job you created. When the job is complete, a CSV file with the Google Trends values is created in the target S3 bucket with the prefix specified in the main() function. In the next step, we create an AWS Glue table referring to the target bucket and prefix to allow queries to be run against the Google Trends data.

Create an AWS Glue table on the Google Trends data

In this step, we create a table in the AWS Glue Data Catalog using Athena. The table is created on top of the Google Trends data saved in the target S3 bucket.

In the Athena query editor, select default as the database and enter the following DDL command to create a table named trends. Replace the target bucket and prefix with your own values.

CREATE EXTERNAL TABLE `trends`(
  `date` date, 
  `google_trend` double, 
  `title` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<< target_bucket >>/<<target_prefix >>/'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'skip.header.line.count'='1')

This table has three columns:

  • date – The time dimension for aggregating the data. In this example, the time period is monthly.
  • google_trend – The count of Google Trends values normalized on a scale of 0–100.
  • title – The name of the movie or TV show.

Query the data using Athena

Now you can run one-time queries to find the popularity of movies and TV shows.

In the first example, we find the top 10 most popular movies and TV shows for November 2021. In the Athena query editor, enter the following SQL command to query the trends table created in the previous step:

select title,google_trend
from trends 
where date = date_parse('2021-11-30','%Y-%m-%d')
order by google_trend desc
limit 10

In the following example, we find the 10 movies and TV shows whose popularity grew the most in 2021 through November 30. In the Athena query editor, enter the following SQL command to query the trends table:

select  title,max(google_trend)-min(google_trend) trend_diff
from trends
where date between date_parse('2021-01-31','%Y-%m-%d') and date_parse('2021-11-30','%Y-%m-%d')
group by title
order by 2 desc
limit 10
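
You can also run these queries programmatically. The following is a minimal sketch, assuming the same awswrangler library used earlier in this post and the default Athena settings, that runs the second query and returns the result as a pandas DataFrame:

import awswrangler as wr

# Run the Athena query against the default database and load the result into pandas.
df = wr.athena.read_sql_query(
    sql="""
        select title, max(google_trend) - min(google_trend) as trend_diff
        from trends
        where date between date_parse('2021-01-31','%Y-%m-%d') and date_parse('2021-11-30','%Y-%m-%d')
        group by title
        order by 2 desc
        limit 10
    """,
    database="default",
)
print(df)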

Build a dashboard to visualize the data using QuickSight

We can use QuickSight to build a dashboard on the data downloaded from Google Trends to identify top movies and TV shows. Complete the following steps:

  1. Sign in to your QuickSight account.
  2. On the QuickSight console, choose Datasets and choose New dataset.
  3. Choose Athena as your data source.
  4. For Data source name, enter a name.
  5. For Athena workgroup, choose [primary].
  6. Choose Create data source.
  7. For Database, choose default.
  8. For Tables, select the trends table.
  9. Choose Select.
  10. Select Directly query your data.
  11. Choose Visualize.

For the first visual, we create a bar chart of the top movies or TV shows by title sorted in ascending order of aggregated Google Trends values.

  1. Choose the horizontal bar chart visual type.
  2. For Y axis, choose title.
  3. For Value, choose google_trend (Average).

Next, we create a time series plot of Google Trends count by month for titles.

  1. Add a new visual and choose the AutoGraph visual type.
  2. For X axis, choose date.
  3. For Value, choose google_trend (Sum).
  4. For Color, choose title.

Clean up

To avoid incurring future charges, delete the resources you created for AWS Glue, Amazon S3, IAM, and QuickSight.

  1. AWS Glue Catalog table
    • On the AWS Glue console, choose Tables under Databases in the navigation pane.
    • Select the AWS Glue Data Catalog table that you created.
    • On the Actions drop-down menu, choose Delete.
    • Choose Delete to confirm.
  2. AWS Glue Job
    • Choose Jobs in the navigation pane.
    • Select the AWS Glue job you created.
    • On the Actions drop-down menu, choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets in the navigation pane.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles in the navigation pane.
    • Choose the role you attached to the AWS Glue job.
    • Choose Delete role.
    • Choose Yes.
  5. Amazon QuickSight
    • If you created a QuickSight user for trying out this blog and do not want to retain that access, please ask your QuickSight admin to delete your user.
    • If you created the QuickSight account itself just for trying this blog and no longer want to retain it, use the following steps to delete it.
    • Choose your user name on the application bar, and then choose Manage QuickSight.
    • Choose Account settings.
    • Choose Delete Account.

You can only have one QuickSight account active for each AWS account. Make sure that other users aren’t using QuickSight before you delete the account.

Conclusion

Integrating external data sources such as Google Trends via AWS Glue, Athena, and QuickSight can help you enrich your datasets to yield greater insights. You can use it in a data science context when the model is under-fit and requires more relevant data in order to make better predictions. In this post, we used movies as an example, but the solution extends to a wide breadth of industries, such as products in a retail context or commodities in a finance context. If only simple inventory histories or transaction dates are available, you may find little correlation to future demand or prices. But with an integrated data pipeline that uses external data, new relationships in the dataset can make the model more reliable.

In a business context, whether your team wants to test out a machine learning (ML) proof of concept more quickly or has limited access to pertinent data, Google Trends integration is a relatively quick way to enrich your data for the purposes of ML and data insights.

You can also extend this concept to other third-party datasets, such as social media sentiment, as your team’s expertise grows and your ML and analytics operations mature. Integrating external datasets such as Google Trends is just one part of the feature and data engineering process, but it’s a great place to start and, in our experience, most often leads to better models that businesses can innovate from.


About the Authors

Drew Philip is a Sr. Solutions Architect with AWS Private Equity. He has held senior technical leadership positions within key AWS partners such as Microsoft, Oracle, and Rackspace. Drew focuses on applied engineering that leverages AI-enabled digital innovation and development, application modernization, resiliency and operational excellence for workloads at scale in the public and private sector. He sits on the board of Calvin University’s computer science department and is a contributing member of the AWS Machine Learning Technical Focus Community.

Gautam Prothia is a Senior Solution Architect within AWS dedicated to Strategic Accounts. Gautam has more than 15 years of experience designing and implementing large-scale data management and analytical solutions. He has worked with many clients across industries to help them modernize their data platforms on the cloud.

Simon Zamarin is an AI/ML Solutions Architect whose main focus is helping customers extract value from their data assets. In his spare time, Simon enjoys spending time with family, reading sci-fi, and working on various DIY house projects.

Develop and test AWS Glue version 3.0 jobs locally using a Docker container

Post Syndicated from Subramanya Vajiraya original https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/

AWS Glue is a fully managed serverless service that allows you to process data coming through different data sources at scale. You can use AWS Glue jobs for various use cases such as data ingestion, preprocessing, enrichment, and data integration from different data sources. AWS Glue version 3.0, the latest version of AWS Glue Spark jobs, provides a performance-optimized Apache Spark 3.1 runtime experience for batch and stream processing.

You can author AWS Glue jobs in different ways. If you prefer coding, AWS Glue allows you to write Python/Scala source code with the AWS Glue ETL library. If you prefer interactive scripting, AWS Glue interactive sessions and AWS Glue Studio notebooks help you write scripts in notebooks by inspecting and visualizing the data. If you prefer a graphical interface rather than coding, AWS Glue Studio helps you author data integration jobs visually without writing code.

For a production-ready data platform, a development process and CI/CD pipeline for AWS Glue jobs is key. We understand the strong demand for developing and testing AWS Glue jobs where you prefer flexibility, whether on a local laptop, in a Docker container on Amazon Elastic Compute Cloud (Amazon EC2), or elsewhere. You can achieve that by using AWS Glue Docker images hosted on Docker Hub or the Amazon Elastic Container Registry (Amazon ECR) Public Gallery. The Docker images help you set up your development environment with additional utilities. You can use your preferred IDE, notebook, or REPL using the AWS Glue ETL library.

This post is a continuation of the blog post “Developing AWS Glue ETL jobs locally using a container”. While the earlier post introduced the pattern of development for AWS Glue ETL jobs on a Docker container using a Docker image, this post focuses on how to develop and test AWS Glue version 3.0 jobs using the same approach.

Solution overview

The following Docker images are available for AWS Glue on Docker Hub:

  • AWS Glue version 3.0 – amazon/aws-glue-libs:glue_libs_3.0.0_image_01
  • AWS Glue version 2.0 – amazon/aws-glue-libs:glue_libs_2.0.0_image_01

You can also obtain the images from the Amazon ECR Public Gallery:

  • AWS Glue version 3.0 – public.ecr.aws/glue/aws-glue-libs:glue_libs_3.0.0_image_01
  • AWS Glue version 2.0 – public.ecr.aws/glue/aws-glue-libs:glue_libs_2.0.0_image_01

Note: AWS Glue Docker images are x86_64 compatible and arm64 hosts are currently not supported.

In this post, we use amazon/aws-glue-libs:glue_libs_3.0.0_image_01 and run the container on a local machine (Mac, Windows, or Linux). This container image has been tested for AWS Glue version 3.0 Spark jobs. The image contains the following:

  • Amazon Linux
  • AWS Glue ETL Library (aws-glue-libs)
  • Apache Spark 3.1.1
  • Spark history server
  • JupyterLab
  • Livy
  • Other library dependencies (the same as those of the AWS Glue job system)

To set up your container, you pull the image from Docker Hub and then run the container. We demonstrate how to run your container with the following methods, depending on your requirements:

  • spark-submit
  • REPL shell (pyspark)
  • pytest
  • JupyterLab
  • Visual Studio Code

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. Also make sure that you have at least 7 GB of disk space for the image on the host running Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Configure AWS credentials

To enable AWS API calls from the container, set up your AWS credentials with the following steps:

  1. Create an AWS named profile.
  2. Open cmd on Windows or a terminal on Mac/Linux, and run the following command:
    PROFILE_NAME="profile_name"

In the following sections, we use this AWS named profile.
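
As an optional sanity check before mounting ~/.aws into the container, the following is a minimal boto3 sketch (run on the host, assuming boto3 is installed there) that confirms the named profile resolves to valid credentials:

import boto3

# The profile name must match the PROFILE_NAME value you exported above.
session = boto3.Session(profile_name="profile_name")
print(session.client("sts").get_caller_identity()["Arn"])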

Pull the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Run the following command to pull the image from Docker Hub:

docker pull amazon/aws-glue-libs:glue_libs_3.0.0_image_01

Run the container

Now you can run a container using this image. You can choose any of following methods based on your requirements.

spark-submit

You can run an AWS Glue job script by running the spark-submit command on the container.

Write your ETL script (sample.py in the example below) and save it under the /local_path_to_workspace/src/ directory using the following commands:

$ WORKSPACE_LOCATION=/local_path_to_workspace
$ SCRIPT_FILE_NAME=sample.py
$ mkdir -p ${WORKSPACE_LOCATION}/src
$ vim ${WORKSPACE_LOCATION}/src/${SCRIPT_FILE_NAME}

These variables are used in the docker run command below. The sample code (sample.py) used in the spark-submit command below is included in the appendix at the end of this post.

Run the following command to run the spark-submit command on the container to submit a new Spark application:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_spark_submit amazon/aws-glue-libs:glue_libs_3.0.0_image_01 spark-submit /home/glue_user/workspace/src/$SCRIPT_FILE_NAME
...22/01/26 09:08:55 INFO DAGScheduler: Job 0 finished: fromRDD at DynamicFrame.scala:305, took 3.639886 s
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string

...

REPL shell (pyspark)

You can run a REPL (read-eval-print loop) shell for interactive development. Run the following command to run the pyspark command on the container to start the REPL shell:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pyspark amazon/aws-glue-libs:glue_libs_3.0.0_image_01 pyspark
...
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /__ / .__/\_,_/_/ /_/\_\  version 3.1.1-amzn-0
 /_/

Using Python version 3.7.10 (default, Jun 3 2021 00:02:01)
Spark context Web UI available at http://56e99d000c99:4040
Spark context available as 'sc' (master = local[*], app id = local-1643011860812).
SparkSession available as 'spark'.
>>> 
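
From the prompt, you can work with the AWS Glue ETL library interactively. The following is a minimal sketch that reads the same public sample dataset used in the appendix of this post and prints its schema:

>>> from awsglue.context import GlueContext
>>> glue_context = GlueContext(sc)
>>> dyf = glue_context.create_dynamic_frame.from_options(
...     connection_type="s3",
...     connection_options={"paths": ["s3://awsglue-datasets/examples/us-legislators/all/persons.json"], "recurse": True},
...     format="json")
>>> dyf.printSchema()
>>> dyf.toDF().count()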

pytest

For unit testing, you can use pytest for AWS Glue Spark job scripts.

Run the following commands for preparation:

$ WORKSPACE_LOCATION=/local_path_to_workspace
$ SCRIPT_FILE_NAME=sample.py
$ UNIT_TEST_FILE_NAME=test_sample.py
$ mkdir -p ${WORKSPACE_LOCATION}/tests
$ vim ${WORKSPACE_LOCATION}/tests/${UNIT_TEST_FILE_NAME}

Run the following command to run pytest on the test suite:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pytest amazon/aws-glue-libs:glue_libs_3.0.0_image_01 -c "python3 -m pytest"
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/glue_user/spark/logs/spark-glue_user-org.apache.spark.deploy.history.HistoryServer-1-5168f209bd78.out
============================================================= test session starts =============================================================
platform linux -- Python 3.7.10, pytest-6.2.3, py-1.11.0, pluggy-0.13.1
rootdir: /home/glue_user/workspace
plugins: anyio-3.4.0
collected 1 item  

tests/test_sample.py . [100%]

============================================================== warnings summary ===============================================================
tests/test_sample.py::test_counts
 /home/glue_user/spark/python/pyspark/sql/context.py:79: DeprecationWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
 DeprecationWarning)

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================================================== 1 passed, 1 warning in 21.07s ========================================================

JupyterLab

You can start Jupyter for interactive development and ad hoc queries on notebooks. Complete the following steps:

  1. Run the following command to start JupyterLab:
    $ JUPYTER_WORKSPACE_LOCATION=/local_path_to_workspace/jupyter_workspace/
    $ docker run -it -v ~/.aws:/home/glue_user/.aws -v $JUPYTER_WORKSPACE_LOCATION:/home/glue_user/workspace/jupyter_workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 -p 8998:8998 -p 8888:8888 --name glue_jupyter_lab amazon/aws-glue-libs:glue_libs_3.0.0_image_01 /home/glue_user/jupyter/jupyter_start.sh
    ...
    [I 2022-01-24 08:19:21.368 ServerApp] Serving notebooks from local directory: /home/glue_user/workspace/jupyter_workspace
    [I 2022-01-24 08:19:21.368 ServerApp] Jupyter Server 1.13.1 is running at:
    [I 2022-01-24 08:19:21.368 ServerApp] http://faa541f8f99f:8888/lab
    [I 2022-01-24 08:19:21.368 ServerApp] or http://127.0.0.1:8888/lab
    [I 2022-01-24 08:19:21.368 ServerApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

  2. Open http://127.0.0.1:8888/lab in your web browser in your local machine to access the JupyterLab UI.
  3. Choose Glue Spark Local (PySpark) under Notebook.

Now you can start developing code in the interactive Jupyter notebook UI.

Visual Studio Code

To set up the container with Visual Studio Code, complete the following steps:

  1. Install Visual Studio Code.
  2. Install Python.
  3. Install Visual Studio Code Remote – Containers.
  4. Open the workspace folder in Visual Studio Code.
  5. Choose Settings.
  6. Choose Workspace.
  7. Choose Open Settings (JSON).
  8. Enter the following JSON and save it:
    {
        "python.defaultInterpreterPath": "/usr/bin/python3",
        "python.analysis.extraPaths": [
            "/home/glue_user/aws-glue-libs/PyGlue.zip:/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip:/home/glue_user/spark/python/",
        ]
    }

Now you’re ready to set up the container.

  1. Run the Docker container:
    $ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pyspark amazon/aws-glue-libs:glue_libs_3.0.0_image_01 pyspark

  2. Start Visual Studio Code.
  3. Choose Remote Explorer in the navigation pane, and choose the container amazon/aws-glue-libs:glue_libs_3.0.0_image_01.
  4. Right-click and choose Attach to Container.
  5. If the following dialog appears, choose Got it.
  6. Open /home/glue_user/workspace/.
  7. Create an AWS Glue PySpark script and choose Run.

You should see the successful run on the AWS Glue PySpark script.

Conclusion

In this post, we learned how to get started with AWS Glue Docker images. AWS Glue Docker images help you develop and test your AWS Glue job scripts anywhere you prefer. They are available on Docker Hub and the Amazon ECR Public Gallery. Check them out; we look forward to your feedback.

Appendix: AWS Glue job sample codes for testing

This appendix introduces three different scripts as AWS Glue job sample codes for testing purposes. You can use any of them in the tutorial.

The following sample.py code uses the AWS Glue ETL library with an Amazon Simple Storage Service (Amazon S3) API call. The code requires Amazon S3 permissions in AWS Identity and Access Management (IAM). You need to grant the IAM-managed policy arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess or an IAM custom policy that allows you to make ListBucket and GetObject API calls for the S3 path.

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions


class GluePythonSampleTest:
    def __init__(self):
        params = []
        if '--JOB_NAME' in sys.argv:
            params.append('JOB_NAME')
        args = getResolvedOptions(sys.argv, params)

        self.context = GlueContext(SparkContext.getOrCreate())
        self.job = Job(self.context)

        if 'JOB_NAME' in args:
            jobname = args['JOB_NAME']
        else:
            jobname = "test"
        self.job.init(jobname, args)

    def run(self):
        dyf = read_json(self.context, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
        dyf.printSchema()

        self.job.commit()


def read_json(glue_context, path):
    dynamicframe = glue_context.create_dynamic_frame.from_options(
        connection_type='s3',
        connection_options={
            'paths': [path],
            'recurse': True
        },
        format='json'
    )
    return dynamicframe


if __name__ == '__main__':
    GluePythonSampleTest().run()

The following test_sample.py code is a sample for a unit test of sample.py:

import pytest
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import sys
from src import sample


@pytest.fixture(scope="module", autouse=True)
def glue_context():
    sys.argv.append('--JOB_NAME')
    sys.argv.append('test_count')

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    context = GlueContext(SparkContext.getOrCreate())
    job = Job(context)
    job.init(args['JOB_NAME'], args)

    yield(context)

    job.commit()


def test_counts(glue_context):
    dyf = sample.read_json(glue_context, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
    assert dyf.toDF().count() == 1961

About the Authors

Subramanya Vajiraya is a Cloud Engineer (ETL) at AWS Sydney specialized in AWS Glue. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. Outside of work, he enjoys going on bike rides and taking long walks with his dog Ollie, a 1-year-old Corgi.

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Best practices to optimize data access performance from Amazon EMR and AWS Glue to Amazon S3

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-data-access-performance-from-amazon-emr-and-aws-glue-to-amazon-s3/

Customers are increasingly building data lakes to store data at massive scale in the cloud. It’s common to use distributed computing engines, cloud-native databases, and data warehouses when you want to process and analyze your data in data lakes. Amazon EMR and AWS Glue are two key services you can use for such use cases. Amazon EMR is a managed big data framework that supports several different applications, including Apache Spark, Apache Hive, Presto, Trino, and Apache HBase. AWS Glue Spark jobs run on top of Apache Spark, and distribute data processing workloads in parallel to perform extract, transform, and load (ETL) jobs to enrich, denormalize, mask, and tokenize data on a massive scale.

For data lake storage, customers typically use Amazon Simple Storage Service (Amazon S3) because it’s secure, scalable, durable, and highly available. Amazon S3 is designed for 11 9’s of durability and stores over 200 trillion objects for millions of applications around the world, making it the ideal storage destination for your data lake. Amazon S3 averages over 100 million operations per second, so your applications can easily achieve high request rates when using Amazon S3 as your data lake.

This post describes best practices to achieve the performance scaling you need when analyzing data in Amazon S3 using Amazon EMR and AWS Glue. We specifically focus on optimizing for Apache Spark on Amazon EMR and AWS Glue Spark jobs.

Optimizing Amazon S3 performance for large Amazon EMR and AWS Glue jobs

Amazon S3 is a very large distributed system, and you can scale to thousands of transactions per second in request performance when your applications read and write data to Amazon S3. Amazon S3 performance isn’t defined per bucket, but per prefix in a bucket. Your applications can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. Additionally, there are no limits to the number of prefixes in a bucket, so you can horizontally scale your read or write performance using parallelization. For example, if you create 10 prefixes in an S3 bucket to parallelize reads, you could scale your read performance to 55,000 read requests per second. You can similarly scale writes by writing data across multiple prefixes.
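
As an illustration of spreading writes across prefixes, the following is a minimal PySpark sketch (the bucket name and partition column are assumptions) that partitions output by a column so that each value lands under its own prefix:

# df is an existing Spark DataFrame; "dt" and the bucket name are assumed placeholders.
# Each distinct value of "dt" is written under its own S3 prefix, so reads and writes
# of different partitions can scale independently.
df.write.partitionBy("dt").mode("overwrite").parquet("s3://your-bucket/datalake/sales/")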

You can scale performance by utilizing automatic scaling in Amazon S3 and scan millions of objects for queries run over petabytes of data. Amazon S3 automatically scales in response to sustained new request rates, dynamically optimizing performance. While Amazon S3 is internally optimizing for a new request rate, you receive HTTP 503 request responses temporarily until the optimization completes:

AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown)

Such situations require the application to retry momentarily, but after Amazon S3 internally optimizes performance for the new request rate, all requests are generally served without retries. One such situation is when multiple workers in distributed compute engines such as Amazon EMR and AWS Glue momentarily generate a high number of requests to access data under the same prefix.

When using Amazon EMR and AWS Glue to process data in Amazon S3, you can employ certain best practices to manage request traffic and avoid HTTP Slow Down errors. Let’s look at some of these strategies.

Best practices to manage HTTP Slow Down responses

You can use the following approaches to take advantage of the horizontal scaling capability in Amazon S3 and improve the success rate of your requests when accessing Amazon S3 data using Amazon EMR and AWS Glue:

  • Modify the retry strategy for Amazon S3 requests
  • Adjust the number of Amazon S3 objects processed
  • Adjust the number of concurrent Amazon S3 requests

We recommend choosing and applying the options that fit best for your use case to optimize data processing on Amazon S3. In the following sections, we describe best practices of each approach.

Modify the retry strategy for Amazon S3 requests

This is the easiest way to avoid HTTP 503 Slow Down responses and improve the success rate of your requests. To access Amazon S3 data, both Amazon EMR and AWS Glue use the EMR File System (EMRFS), which retries Amazon S3 requests with jitters when it receives 503 Slow Down responses. To improve the success rate of your Amazon S3 requests, you can adjust your retry strategy by configuring certain properties. In Amazon EMR, you can configure parameters in your emrfs-site configuration. In AWS Glue, you can configure the parameters in job parameters. You can adjust your retry strategy in the following ways:

  • Increase the EMRFS default retry limit – By default, EMRFS uses an exponential backoff strategy to retry requests to Amazon S3. The default EMRFS retry limit is 15. However, you can increase this limit when you create a new cluster, on a running cluster, or at application runtime. To increase the retry limit, you can change the value of the fs.s3.maxRetries parameter. Note that you may experience longer job duration if you set a higher value for this parameter. We recommend experimenting with different values, such as 20 as a starting point, confirming the duration overhead of the jobs for each value, and then adjusting this parameter based on your requirements (see the sketch after this list).
  • For Amazon EMR, use the AIMD retry strategy – With Amazon EMR versions 6.4.0 and later, EMRFS supports an alternative retry strategy based on an additive-increase/multiplicative-decrease (AIMD) model. This strategy can be useful in shaping the request rate from large clusters. Instead of treating each request in isolation, this mode keeps track of the rate of recent successful and throttled requests. Requests are limited to a rate determined from the rate of recent successful requests. This decreases the number of throttled requests, and therefore the number of attempts needed per request. To enable the AIMD retry strategy, you can set the fs.s3.aimd.enabled property to true. You can further refine the AIMD retry strategy using the advanced AIMD retry settings.
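
The following is a minimal boto3 sketch, assuming an AWS Glue job (the job name is a placeholder), that raises the EMRFS retry limit through the --conf job parameter described in Appendix B of this post:

import boto3

glue = boto3.client("glue")

# "my-glue-job" is an assumed placeholder; 50 is an example retry limit.
glue.start_job_run(
    JobName="my-glue-job",
    Arguments={"--conf": "spark.hadoop.fs.s3.maxRetries=50"},
)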

Adjust the number of Amazon S3 objects processed

Another approach is to adjust the number of Amazon S3 objects processed so you have fewer requests made concurrently. When you lower the number of objects to be processed in a job, you use fewer Amazon S3 requests, thereby lowering the request rate or transactions per second (TPS) required for each job. Note the following considerations:

  • Preprocess the data by aggregating multiple smaller files into fewer, larger chunks – For example, use s3-dist-cp or an AWS Glue compaction blueprint to merge a large number of small files (generally less than 64 MB) into a smaller number of optimally sized files (such as 128–512 MB). This approach reduces the number of requests required, while simultaneously improving the aggregate throughput to read and process data in Amazon S3. You may need to experiment to arrive at the optimal size for your workload, because creating extremely large files can reduce the parallelism of the job.
  • Use partition pruning to scan data under specific partitions – In Apache Hive and Hive Metastore-compatible applications such as Apache Spark or Presto, one table can have multiple partition folders. Partition pruning is a technique to scan only the required data in a specific partition folder of a table. It’s useful when you want to read only a specific portion of the table. To take advantage of predicate pushdown, you can use partition columns in the WHERE clause in Spark SQL or the filter expression in a DataFrame. In AWS Glue, you can also use a partition pushdown predicate when creating DynamicFrames, as shown in the sketch after this list.
  • For AWS Glue, enable job bookmarks – You can use AWS Glue job bookmarks to process continuously ingested data repeatedly. It only picks unprocessed data from the previous job run, thereby reducing the number of objects read or retrieved from Amazon S3.
  • For AWS Glue, enable bounded executions – AWS Glue bounded execution is a technique to only pick unprocessed data, with an upper bound on the dataset size or the number of files to be processed. This is another way to reduce the number of requests made to Amazon S3.
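
The following is a minimal sketch of a partition pushdown predicate in an AWS Glue job script; the database, table, and partition column names are assumptions:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Only the partitions matching the predicate are listed and read from Amazon S3.
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="my_database",   # assumed placeholder
    table_name="my_table",    # assumed placeholder
    push_down_predicate="year='2022' and month='01'",
)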

Adjust the number of concurrent Amazon S3 requests

To adjust the number of Amazon S3 requests so there are fewer concurrent reads per prefix, you can configure Spark parameters. By default, Spark populates 10,000 tasks to list prefixes when creating Spark DataFrames. You may experience Slow Down responses, especially when you read from a table with highly nested prefix structures. In this case, it’s a good idea to configure Spark to limit the maximum listing parallelism by decreasing the parameter spark.sql.sources.parallelPartitionDiscovery.parallelism (the default is 10,000).

To have fewer concurrent write requests per prefix, you can use the following techniques:

  • Reduce the number of Spark RDD partitions before writes – You can do this by using df.repartition(n) or df.coalesce(n) on DataFrames. For Spark SQL, you can also use query hints like REPARTITION or COALESCE. You can see the number of tasks (equal to the number of RDD partitions) on the Spark UI. See the sketch after this list for an example.
  • For AWS Glue, group the input data – If the datasets are made up of small files, we recommend grouping the input data because it reduces the number of RDD partitions, and reduces the number of Amazon S3 requests to write the files.
  • Use the EMRFS S3-optimized committer – The EMRFS S3-optimized committer is used by default in Amazon EMR 5.19.0 and later, and AWS Glue 3.0. In AWS Glue 2.0, you can configure it in the job parameter --enable-s3-parquet-optimized-committer. The committer uses Amazon S3 multipart uploads instead of renaming files, and it usually reduces the number of HEAD/LIST requests significantly.
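
The following is a minimal PySpark sketch (the output path and partition count are assumptions) that coalesces a DataFrame before writing so fewer tasks write to each prefix concurrently:

# df is an existing Spark DataFrame; 10 and the output path are assumed placeholders.
# Coalescing reduces the number of output tasks, and therefore the number of
# concurrent PUT requests per prefix.
df.coalesce(10).write.mode("append").parquet("s3://your-bucket/output/")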

The following are other techniques to adjust the Amazon S3 request rate in Amazon EMR and AWS Glue. These options have the net effect of reducing the parallelism of the Spark job, thereby reducing the probability of Amazon S3 Slow Down responses, although they can lead to longer job durations. We recommend testing and adjusting these values for your use case.

  • Reduce the number of concurrent jobs – Start with the most read/write heavy jobs. If you configured cross-account access for Amazon S3, keep in mind that other accounts might also be submitting jobs to the prefix.
  • Reduce the number of concurrent Spark tasks – You have several options:
    • For Amazon EMR, set the number of Spark executors (for example, the spark-submit option --num-executors and the Spark parameter spark.executor.instances).
    • For AWS Glue, set the number of workers in the NumberOfWorkers parameter.
    • For AWS Glue, change the WorkerType parameter to a smaller one (for example, G.2X to G.1X).
    • Configure Spark parameters:
      • Decrease the value of spark.default.parallelism.
      • Decrease the value of spark.sql.shuffle.partitions.
      • Increase the value of spark.task.cpus (the default is 1) to allocate more CPU cores per Spark task.

Conclusion

In this post, we described the best practices to optimize data access from Amazon EMR and AWS Glue to Amazon S3. With these best practices, you can easily run Amazon EMR and AWS Glue jobs by taking advantage of Amazon S3 horizontal scaling, and process data in a highly distributed way at a massive scale.

For further guidance, please reach out to AWS Premium Support.

Appendix A: Configure CloudWatch request metrics

To monitor Amazon S3 requests, you can enable request metrics in Amazon CloudWatch for the bucket. Then, define a filter for the prefix. For a list of useful metrics to monitor, see Monitoring metrics with Amazon CloudWatch. After you enable metrics, use the data in the metrics to determine which of the aforementioned options is best for your use case.

Appendix B: Configure Spark parameters

To configure Spark parameters in Amazon EMR, there are several options:

  • spark-submit command – You can pass Spark parameters via the --conf option.
  • Job script – You can set Spark parameters in the SparkConf object in the job script codes.
  • Amazon EMR configurations – You can configure Spark parameters via API using Amazon EMR configurations. For more information, see Configure Spark.

To configure Spark parameters in AWS Glue, you can configure AWS Glue job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50.

To set multiple configs, configure your job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about releasing AWS Glue connector custom blueprints and other software artifacts to help customers build their data lakes. In his spare time, he enjoys watching hermit crabs with his children.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

Introducing Protocol buffers (protobuf) schema support in Amazon Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/introducing-protocol-buffers-protobuf-schema-support-in-amazon-glue-schema-registry/

AWS Glue Schema Registry now supports Protocol buffers (protobuf) schemas in addition to JSON and Avro schemas. This allows application teams to use protobuf schemas to govern the evolution of streaming data and centrally control data quality from data streams to data lake. AWS Glue Schema Registry provides an open-source library that includes Apache-licensed serializers and deserializers for protobuf that integrate with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and Kafka Streams. Similar to Avro and JSON schemas, Protocol buffers schemas also support compatibility modes, schema sourcing via metadata, auto-registration of schemas, and AWS Identity and Access Management (IAM) compatibility.

In this post, we focus on Protocol buffers schema support in AWS Glue Schema Registry and how to use Protocol buffers schemas in stream processing Java applications that integrate with Apache Kafka, Amazon Managed Streaming for Apache Kafka, and Amazon Kinesis Data Streams.

Introduction to Protocol buffers

Protocol buffers is a language- and platform-neutral, extensible mechanism for serializing and deserializing structured data for use in communications protocols and data storage. A protobuf message format is defined in the .proto file. Protobuf is recommended over other data formats when you need language interoperability, faster serialization and deserialization, type safety, schema adherence between data producer and consumer applications, and reduced coding effort. With protobuf, you can use code generated from the schema by the protobuf compiler (protoc) to easily write and read your data to and from data streams using a variety of languages. You can also use build tool plugins such as Maven and Gradle to generate code from protobuf schemas as part of your CI/CD pipelines. We use the following schema for code examples in this post, which defines an employee with a gRPC service definition to find an employee by ID:

Employee.proto

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}

AWS Glue Schema Registry supports both proto2 and proto3 syntax. The preceding protobuf schema using version 2 contains three message types: Employee, Team, and Project using scalar, composite, and enumeration data types. Each field in the message definitions has a unique number, which is used to identify fields in the message binary format, and should not be changed once your message type is in use. In a proto2 message, a field can be required, optional, or repeated; in proto3, the options are repeated and optional. The package declaration makes sure generated code is namespaced to avoid any collisions. In addition to scalar, composite, and enumeration types, AWS Glue Schema Registry also supports protobuf schemas with common types such as Money, PhoneNumber, Timestamp, Duration, and nullable types such as BoolValue and Int32Value. It also supports protobuf schemas with gRPC service definitions with compatibility rules, such as EmployeeSearch, in the preceding schema. To learn more about Protocol buffers, refer to its documentation.

Supported Protocol buffers specification and features

AWS Glue Schema Registry supports all the features of Protocol buffers for versions 2 and 3 except for groups, extensions, and importing definitions. The AWS Glue Schema Registry APIs and its open-source library support the latest protobuf runtime version. The protobuf schema operations in AWS Glue Schema Registry are supported via the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS Glue Schema Registry API, AWS SDK, and AWS CloudFormation.

How AWS Glue Schema Registry works

The following diagram illustrates a high-level view of how AWS Glue Schema Registry works. AWS Glue Schema Registry allows you to register and evolve JSON, Apache Avro, and Protocol buffers schemas with compatibility modes. You can register multiple versions of each schema as the business needs or stream processing application’s requirements evolve. The AWS Glue Schema Registry open-source library provides JSON, Avro, and protobuf serializers and deserializers that you configure in producer and consumer stream processing applications, as shown in the following diagram. The open-source library also supports optional compression and caching configuration to save on data transfers.

To accommodate various business use cases, AWS Glue Schema Registry supports multiple compatibility modes. For example, if a consumer application is updated to a new schema version but is still able to consume and process messages based on the previous version of the same schema, then the schema is backward-compatible. However, if the producer application moves to a new schema version and the consumer application is not yet updated but can still consume and process both the old and new messages, then the schema is forward-compatible. For more information, refer to How the Schema Registry Works.

Create a Protocol buffers schema in AWS Glue Schema Registry

In this section, we create a protobuf schema in AWS Glue Schema Registry via the console and AWS CLI.

Create a schema via the console

Make sure you have the required AWS Glue Schema Registry IAM permissions.

  1. On the AWS Glue console, choose Schema registries in the navigation pane.
  2. Choose Add registry.
  3. For Registry name, enter employee-schema-registry.
  4. Choose Add registry.
  5. After the registry is created, choose Add schema to register a new schema.
  6. For Schema name, enter Employee.proto.

The schema name must be either Employee.proto or Employee if the protobuf schema doesn’t have the options option java_multiple_files = true; and option java_outer_classname = "<Outer class name>"; and if you decide to use protobuf schema-generated code (POJOs) in your stream processing applications. We cover this with an example in a subsequent section of this post. For more information on protobuf options, refer to Options.

  1. For Registry, choose the registry employee-schema-registry.
  2. For Data format, choose Protocol buffers.
  3. For Compatibility mode, choose Backward.

You can choose other compatibility modes as per your use case.

  1. For First schema version, enter the preceding protobuf schema, then choose Create schema and version.

After the schema is registered successfully, its status will be Available, as shown in the following screenshot.

Create a schema via the AWS CLI

Make sure you have IAM credentials with AWS Glue Schema Registry permissions.

  1. Run the following AWS CLI command to create a schema registry employee-schema-registry (for this post, we use the Region us-east-2):
    aws glue create-registry \
    --registry-name employee-schema-registry \
    --region us-east-2

The AWS CLI command returns the newly created schema registry ARN in response.

  1. Copy the RegistryArn value from the response to use in the following AWS CLI command.
  2. In the following command, use the preceding protobuf schema and schema name Employee.proto:
    aws glue create-schema --schema-name Employee.proto \
    --registry-id RegistryArn=<Schema Registry ARN that you copied from response of create registry CLI command> \
    --compatibility BACKWARD \
    --data-format PROTOBUF \
    --schema-definition file:///<project-directory>/Employee.proto \
    --region us-east-2

You can also use AWS CloudFormation to create schemas in AWS Glue Schema Registry.
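
If you prefer the AWS SDK, the following is a minimal boto3 sketch equivalent to the preceding AWS CLI command, assuming the same registry name, schema name, and Region used in this post:

import boto3

glue = boto3.client("glue", region_name="us-east-2")

# Read the protobuf schema definition from the local Employee.proto file.
with open("Employee.proto") as f:
    schema_definition = f.read()

response = glue.create_schema(
    RegistryId={"RegistryName": "employee-schema-registry"},
    SchemaName="Employee.proto",
    DataFormat="PROTOBUF",
    Compatibility="BACKWARD",
    SchemaDefinition=schema_definition,
)
print(response["SchemaArn"])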

Using a Protocol buffers schema with Amazon MSK and Kinesis Data Streams

Like Apache Avro’s SpecificRecord and GenericRecord, protobuf also supports working with POJOs to ensure type safety and DynamicMessage to create generic data producer and consumer applications. The following examples showcase the use of a protobuf schema registered in AWS Glue Schema Registry with Kafka and Kinesis Data Streams producer and consumer applications.

Use a protobuf schema with Amazon MSK

Create an Amazon MSK or Apache Kafka cluster with a topic called protobuf-demo-topic. If creating an Amazon MSK cluster, you can use the console. For instructions, refer to Getting Started Using Amazon MSK.

Use protobuf schema-generated POJOs

To use protobuf schema-generated POJOs, complete the following steps:

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it in the PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We use the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Add the following dependencies to your application’s pom.xml file:
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.19.4</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/software.amazon.glue/schema-registry-serde -->
    <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.9</version>
    </dependency>	

  4. Create a schema registry employee-schema-registry in AWS Glue Schema Registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure you have the schema file in the ${basedir}/src/main/resources/proto directory or change it as per your application directory structure in the application’s pom.xml <protoSourceRoot> tag value:
    mvn clean compile

Next, we configure the Kafka producer publishing protobuf messages to the Kafka topic on Amazon MSK.

  1. Configure the Kafka producer properties:
private Properties getProducerConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
    props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
    props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
    return props;
}

The VALUE_SERIALIZER_CLASS_CONFIG configuration specifies the AWS Glue Schema Registry serializer, which serializes the protobuf message.

  1. Use the schema-generated code (POJOs) to create a protobuf message:
    public EmployeeOuterClass.Employee createEmployeeRecord(int employeeId){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                        .setId(employeeId)
                        .setName("Dummy")
                        .setAddress("Melbourne, Australia")
                        .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                        .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                        .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                        .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                        .setRole(EmployeeOuterClass.Role.ARCHITECT)
                        .setProject(EmployeeOuterClass.Project.newBuilder()
                                .setName("Protobuf Schema Demo")
                                .setState("GA").build())
                        .setTotalAwardValue(Money.newBuilder()
                                            .setCurrencyCode("USD")
                                            .setUnits(5)
                                            .setNanos(50000).build())
                        .setTeam(EmployeeOuterClass.Team.newBuilder()
                                .setName("Solutions Architects")
                                .setLocation("Australia").build()).build();
        return employee;
    }

  2. Publish the protobuf messages to the protobuf-demo-topic topic on Amazon MSK:
    public void startProducer() throws InterruptedException {
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, EmployeeOuterClass.Employee> producer = new KafkaProducer<String, EmployeeOuterClass.Employee>(getProducerConfig());
        logger.info("Starting to send records...");
        int employeeId = 0;
        while(employeeId < 100)
        {
            EmployeeOuterClass.Employee person = createEmployeeRecord(employeeId);
            String key = "key-" + employeeId;
            ProducerRecord<String,  EmployeeOuterClass.Employee> record = new ProducerRecord<String,  EmployeeOuterClass.Employee>(topic, key, person);
            producer.send(record, new ProducerCallback());
            employeeId++;
        }
    }
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e){
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            }
            else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Start the Kafka producer:
    public static void main(String args[]) throws InterruptedException {
        ProducerProtobuf producer = new ProducerProtobuf();
        producer.startProducer();
    }

  4. In the Kafka consumer application’s pom.xml, add the same plugin and dependencies as the Kafka producer’s pom.xml.

Next, we configure the Kafka consumer consuming protobuf messages from the Kafka topic on Amazon MSK.

  1. Configure the Kafka consumer properties:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        return props;
    }

The VALUE_DESERIALIZER_CLASS_CONFIG config specifies the AWS Glue Schema Registry deserializer that deserializes the protobuf messages.

  1. Consume the protobuf message (as a POJO) from the protobuf-demo-topic topic on Amazon MSK:
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, EmployeeOuterClass.Employee> consumer = new KafkaConsumer<String, EmployeeOuterClass.Employee>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, EmployeeOuterClass.Employee> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, EmployeeOuterClass.Employee> record : records) {
                final EmployeeOuterClass.Employee employee = record.value();
                logger.info("Employee Id: " + employee.getId() + " | Name: " + employee.getName() + " | Address: " + employee.getAddress() +
                        " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                        " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                        " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                        " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                        " | Project Name: " + employee.getProject().getName() + "| Award currency code: " + employee.getTotalAwardValue().getCurrencyCode() +
                        " | Award units : " + employee.getTotalAwardValue().getUnits() + " | Award nanos " + employee.getTotalAwardValue().getNanos());
            }
        }
    }

  2. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use protobuf’s DynamicMessage

You can use DynamicMessage to create generic producer and consumer applications without generating the code from the protobuf schema. To use DynamicMessage, you first need to create a protobuf schema file descriptor.

  1. Generate a file descriptor from the protobuf schema using the following command:
    protoc --include_imports --proto_path=proto --descriptor_set_out=proto/Employeeproto.desc proto/Employee.proto

The option --descriptor_set_out specifies the descriptor file name that this command generates. The protobuf schema Employee.proto is in the proto directory.

  1. Make sure you have created a schema registry and registered the preceding protobuf schema with it.

Now we configure the Kafka producer publishing DynamicMessage to the Kafka topic on Amazon MSK.

  1. Create the Kafka producer configuration. The PROTOBUF_MESSAGE_TYPE configuration is DYNAMIC_MESSAGE instead of POJO.
    private Properties getProducerConfig() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "-1");
       props.put(ProducerConfig.CLIENT_ID_CONFIG,"protobuf-dynamicmessage-record-producer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,GlueSchemaRegistryKafkaSerializer.class.getName());
       props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
       props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
       props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
       props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
       props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
       return props;
        }

  2. Create protobuf dynamic messages and publish them to the Kafka topic on Amazon MSK:
    public void startProducer() throws Exception {
        Descriptor desc = getDescriptor();
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, DynamicMessage> producer = new KafkaProducer<String, DynamicMessage>(getProducerConfig());
        logger.info("Starting to send records...");
        int i = 0;
        while (i < 100) {
            DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                    .setField(desc.findFieldByName("id"), 1234)
                    .setField(desc.findFieldByName("name"), "Dummy Name")
                    .setField(desc.findFieldByName("address"), "Melbourne, Australia")
                    .setField(desc.findFieldByName("employee_age"), Int32Value.newBuilder().setValue(32).build())
                    .setField(desc.findFieldByName("start_date"), Timestamp.newBuilder().setSeconds(235234532434L).build())
                    .setField(desc.findFieldByName("total_time_span_in_company"), Duration.newBuilder().setSeconds(3453245345L).build())
                    .setField(desc.findFieldByName("is_certified"), BoolValue.newBuilder().setValue(true).build())
    		.setField(desc.findFieldByName("total_award_value"), Money.newBuilder().setCurrencyCode("USD")
    						.setUnits(1).setNanos(50000).build())
                    .setField(desc.findFieldByName("team"), createTeam(desc.findFieldByName("team").getMessageType()))
                    .setField(desc.findFieldByName("project"), createProject(desc.findFieldByName("project").getMessageType()))
                    .setField(desc.findFieldByName("role"), desc.findFieldByName("role").getEnumType().findValueByName("ARCHITECT"))
                    .build();
            String key = "key-" + i;
            ProducerRecord<String, DynamicMessage> record = new ProducerRecord<String, DynamicMessage>(topic, key, dynMessage);
            producer.send(record, new ProtobufProducer.ProducerCallback());
            Thread.sleep(1000);
            i++;
        }
    }
    private static DynamicMessage createTeam(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Solutions Architects")
                .setField(desc.findFieldByName("location"), "Australia")
                .build();
        return dynMessage;
    }
    
    private static DynamicMessage createProject(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Protobuf Schema Demo")
                .setField(desc.findFieldByName("state"), "GA")
                .build();
        return dynMessage;
    }
    
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            } else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Create a descriptor using the Employeeproto.desc file that we generated from the Employee.proto schema file in the previous steps:
    private Descriptor getDescriptor() throws Exception {
        InputStream inStream = ProtobufProducer.class.getClassLoader().getResourceAsStream("proto/Employeeproto.desc");
        DescriptorProtos.FileDescriptorSet fileDescSet = DescriptorProtos.FileDescriptorSet.parseFrom(inStream);
        Map<String, DescriptorProtos.FileDescriptorProto> fileDescProtosMap = new HashMap<String, DescriptorProtos.FileDescriptorProto>();
        List<DescriptorProtos.FileDescriptorProto> fileDescProtos = fileDescSet.getFileList();
        for (DescriptorProtos.FileDescriptorProto fileDescProto : fileDescProtos) {
            fileDescProtosMap.put(fileDescProto.getName(), fileDescProto);
        }
        DescriptorProtos.FileDescriptorProto fileDescProto = fileDescProtosMap.get("Employee.proto");
        FileDescriptor[] dependencies = getProtoDependencies(fileDescProtosMap, fileDescProto);
        FileDescriptor fileDesc = FileDescriptor.buildFrom(fileDescProto, dependencies);
        Descriptor desc = fileDesc.findMessageTypeByName("Employee");
        return desc;
    }
    
    public static FileDescriptor[] getProtoDependencies(Map<String, FileDescriptorProto> fileDescProtos, 
    				  FileDescriptorProto fileDescProto) throws Exception {
    
        if (fileDescProto.getDependencyCount() == 0)
            return new FileDescriptor[0];
    
        ProtocolStringList dependencyList = fileDescProto.getDependencyList();
        String[] dependencyArray = dependencyList.toArray(new String[0]);
        int noOfDependencies = dependencyList.size();
    
        FileDescriptor[] dependencies = new FileDescriptor[noOfDependencies];
        for (int i = 0; i < noOfDependencies; i++) {
            FileDescriptorProto dependencyFileDescProto = fileDescProtos.get(dependencyArray[i]);
            FileDescriptor dependencyFileDesc = FileDescriptor.buildFrom(dependencyFileDescProto, 
    					     getProtoDependencies(fileDescProtos, dependencyFileDescProto));
            dependencies[i] = dependencyFileDesc;
        }
        return dependencies;
    }

  4. Start the Kafka producer:
    public static void main(String args[]) throws Exception {
        ProtobufProducer producer = new ProtobufProducer();
        producer.startProducer();
    }

Now we configure the Kafka consumer to consume dynamic messages from the Kafka topic on Amazon MSK.

  1. Enter the following Kafka consumer configuration:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-record-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
        return props;
    }

  2. Consume protobuf dynamic messages from the Kafka topic protobuf-demo-topic. Because we’re using DYNAMIC_MESSAGE, the retrieved objects are of type DynamicMessage.
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<String, DynamicMessage>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, DynamicMessage> record : records) {
                for (Descriptors.FieldDescriptor field : record.value().getAllFields().keySet()) {
                    logger.info(field.getName() + ": " + record.value().getField(field));
                }
            }
        }
    }

  3. Start the Kafka consumer:
    public static void main(String args[]) {
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use a protobuf schema with Kinesis Data Streams

You can use the protobuf schema-generated POJOs with the Kinesis Producer Library (KPL) and Kinesis Client Library (KCL).

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it to your PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We’re using the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Because the latest versions of the KPL and KCL include the AWS Glue Schema Registry open-source library (schema-registry-serde) and the protobuf runtime (protobuf-java), you only need to add the following dependencies to your application’s pom.xml:
    <!-- https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-producer -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client -->
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.4.0</version>
    </dependency>

  4. Create a schema registry employee-schema-registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure the schema file is in the ${basedir}/src/main/resources/proto directory, or update the <protoSourceRoot> value in your application’s pom.xml to match your directory structure.
    mvn clean compile

The following Kinesis producer code with the KPL uses the Schema Registry open-source library to publish protobuf messages to Kinesis Data Streams.

  1. Start the Kinesis Data Streams producer:
    private static final String PROTO_SCHEMA_FILE = "proto/Employee.proto";
    private static final String SCHEMA_NAME = "Employee.proto";
    private static String REGION_NAME = "us-east-2";
    private static String REGISTRY_NAME = "employee-schema-registry";
    private static String STREAM_NAME = "employee_data_stream";
    private static int NUM_OF_RECORDS = 100;
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws Exception {
         ProtobufKPLProducer producer = new ProtobufKPLProducer();
         producer.startProducer();
     }
    }

  2. Configure the Kinesis producer:
public void startProducer() throws Exception {
    logger.info("Starting KPL client with Glue Schema Registry Integration...");
    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(REGION_NAME);
    schemaRegistryConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB);
    schemaRegistryConfig.setSchemaAutoRegistrationEnabled(false);
    schemaRegistryConfig.setCompatibilitySetting(Compatibility.BACKWARD);
    schemaRegistryConfig.setEndPoint(REGISTRY_ENDPOINT);
    schemaRegistryConfig.setProtobufMessageType(ProtobufMessageType.POJO);
    schemaRegistryConfig.setRegistryName(REGISTRY_NAME);
	
    //Setting Glue Schema Registry configuration in Kinesis Producer Configuration along with other configs
    KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                                        .setRecordMaxBufferedTime(3000)
                                        .setMaxConnections(1)
                                        .setRequestTimeout(60000)
                                        .setRegion(REGION_NAME)
                                        .setRecordTtl(60000)
                                        .setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

    FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
        @Override public void onFailure(Throwable t) {
              t.printStackTrace();
        };
        @Override public void onSuccess(UserRecordResult result) {
            logger.info("record sent successfully. Sequence Number: " + result.getSequenceNumber() + " | Shard Id : " + result.getShardId());
        };
    };
    
	//Creating schema definition object from the Employee.proto schema file.
    Schema gsrSchema = getSchemaDefinition();
    final KinesisProducer producer = new KinesisProducer(config);
    int employeeCount = 1;
    while(true) {
        //Creating and serializing schema generated POJO object (protobuf message)

        EmployeeOuterClass.Employee employee = createEmployeeRecord(employeeCount);
        byte[] serializedBytes = employee.toByteArray();
        ByteBuffer data = ByteBuffer.wrap(serializedBytes);
        Instant timestamp = Instant.now();

        //Publishing protobuf message to the Kinesis Data Stream
        ListenableFuture<UserRecordResult> f =
                    producer.addUserRecord(STREAM_NAME,
                                        Long.toString(timestamp.toEpochMilli()),
                                        new BigInteger(128, new Random()).toString(10),
                                        data,
                                        gsrSchema);
        Futures.addCallback(f, myCallback, MoreExecutors.directExecutor());
        employeeCount++;
        if(employeeCount > NUM_OF_RECORDS)
            break;
    }
    List<Future<UserRecordResult>> putFutures = new LinkedList<>();
    for (Future<UserRecordResult> future : putFutures) {
        UserRecordResult userRecordResult = future.get();
        logger.info(userRecordResult.getShardId() + userRecordResult.getSequenceNumber());
    }
}
  3. Create a protobuf message using schema-generated code (POJOs):
    public EmployeeOuterClass.Employee createEmployeeRecord(int count){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                .setId(count)
                .setName("Dummy")
                .setAddress("Melbourne, Australia")
                .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                .setRole(EmployeeOuterClass.Role.ARCHITECT)
                .setProject(EmployeeOuterClass.Project.newBuilder()
                            .setName("Protobuf Schema Demo")
                            .setState("GA").build())
                .setTotalAwardValue(Money.newBuilder()
                            .setCurrencyCode("USD")
                            .setUnits(5)
                            .setNanos(50000).build())
                .setTeam(EmployeeOuterClass.Team.newBuilder()
                            .setName("Solutions Architects")
                            .setLocation("Australia").build()).build();
        return employee;
    }

  4. Create the schema definition from Employee.proto:
    private Schema getSchemaDefinition() throws IOException {
        InputStream inputStream = ProtobufKPLProducer.class.getClassLoader().getResourceAsStream(PROTO_SCHEMA_FILE);
        StringBuilder resultStringBuilder = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            while ((line = br.readLine()) != null) {
                resultStringBuilder.append(line).append("\n");
            }
        }
        String schemaDefinition = resultStringBuilder.toString();
        logger.info("Schema Definition " + schemaDefinition);
        Schema gsrSchema =
                new Schema(schemaDefinition, DataFormat.PROTOBUF.toString(), SCHEMA_NAME);
        return gsrSchema;
    }

The following Kinesis consumer code with the KCL uses the Schema Registry open-source library to consume protobuf messages from Kinesis Data Streams.

  1. Initialize the application:
    public void run(){
        logger.info("Starting KCL client with Glue Schema Registry Integration...");
        Region region = Region.of(ObjectUtils.firstNonNull(REGION_NAME, "us-east-2"));
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    
        EmployeeRecordProcessorFactory employeeRecordProcessorFactory = new EmployeeRecordProcessorFactory();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(STREAM_NAME,
                        APPLICATION_NAME,
                        kinesisClient,
                        dynamoClient,
                        cloudWatchClient,
                        APPLICATION_NAME,
                        employeeRecordProcessorFactory);
    
        //Creating Glue Schema Registry configuration and Glue Schema Registry Deserializer object.
        GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration(region.toString());
        gsrConfig.setEndPoint(REGISTRY_ENDPOINT);
        gsrConfig.setProtobufMessageType(ProtobufMessageType.POJO);
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
                new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), gsrConfig);
        /*
         Setting Glue Schema Registry deserializer in the Retrieval Config for
         Kinesis Client Library to use it while deserializing the protobuf messages.
         */
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(STREAM_NAME, kinesisClient));
        retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
    
        Scheduler scheduler = new Scheduler(
                		configsBuilder.checkpointConfig(),
                		configsBuilder.coordinatorConfig(),
               		configsBuilder.leaseManagementConfig(),
                		configsBuilder.lifecycleConfig(),
                		configsBuilder.metricsConfig(),
                		configsBuilder.processorConfig(),
                		retrievalConfig);
    
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    
        logger.info("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
            Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
            logger.info("Waiting up to 20 seconds for shutdown to complete.");
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info("Interrupted while waiting for graceful shutdown. Continuing.");
        }
        logger.info("Completed, shutting down now.");
    }

  2. Consume protobuf messages from Kinesis Data Streams:
    public static class EmployeeRecordProcessorFactory implements ShardRecordProcessorFactory {
        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new EmployeeRecordProcessor();
        }
    }
    public static class EmployeeRecordProcessor implements ShardRecordProcessor {
        private static final Logger logger = Logger.getLogger(EmployeeRecordProcessor.class.getSimpleName());
        public void initialize(InitializationInput initializationInput) {}
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            try {
                logger.info("Processing " + processRecordsInput.records().size() + " record(s)");
                for (KinesisClientRecord r : processRecordsInput.records()) {
    			
                    //Deserializing protobuf message into schema generated POJO
                    EmployeeOuterClass.Employee employee = EmployeeOuterClass.Employee.parseFrom(r.data().array());
                    
                   logger.info("Processed record: " + employee);
                    logger.info("Employee Id: " + employee.getId() + " | Name: "  + employee.getName() + " | Address: " + employee.getAddress() +
                            " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                            " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                            " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                            " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                            " | Project Name: " + employee.getProject().getName() + " | Award currency code: " +    
                           employee.getTotalAwardValue().getCurrencyCode() + " | Award units : " + employee.getTotalAwardValue().getUnits() + 
    		      " | Award nanos " + employee.getTotalAwardValue().getNanos());
                }
            } catch (Exception e) {
                logger.info("Failed while processing records. Aborting" + e);
                Runtime.getRuntime().halt(1);
            }
        }
        public void leaseLost(LeaseLostInput leaseLostInput) {. . .}
        public void shardEnded(ShardEndedInput shardEndedInput) {. . .}
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {. . .}
    }

  3. Start the Kinesis Data Streams consumer:
    private static final Logger logger = Logger.getLogger(ProtobufKCLConsumer.class.getSimpleName());
    private static String REGION_NAME = "us-east-2";
    private static String STREAM_NAME = "employee_data_stream";
    private static final String APPLICATION_NAME =  "protobuf-demo-kinesis-kpl-consumer";
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws ParseException {
        new ProtobufKCLConsumer().run();
    }
    

Enhance your protobuf schema

We covered examples of data producer and consumer applications integrating with Amazon MSK, Apache Kafka, and Kinesis Data Streams, and using a Protocol buffers schema registered with AWS Glue Schema Registry. You can further enhance these examples with schema evolution, using the compatibility modes supported by AWS Glue Schema Registry. For example, the following protobuf schema is a backward-compatible updated version of Employee.proto. We added another RPC method, CreateEmployee, to the EmployeeSearch service, and added an optional field, title, to the Employee message type. If you upgrade the consumer application with this version of the protobuf schema, it can still consume both old and new protobuf messages; a short consumer-side sketch follows the updated schema.

Employee.proto (version-2)

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
    rpc CreateEmployee(EmployeeSearchParams) returns (google.protobuf.Empty);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
    optional string title = 12;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}
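
As a minimal illustration of this backward compatibility, assume that you regenerate EmployeeOuterClass from the version-2 schema above and keep the POJO-based consumer shown earlier. The hypothetical helper below uses the generated hasTitle() accessor to handle records produced with either schema version:

    // Hypothetical handler for a deserialized Employee record; hasTitle() is generated
    // because title is declared as an optional field in version 2 of the schema.
    private static void logTitleIfPresent(EmployeeOuterClass.Employee employee) {
        if (employee.hasTitle()) {
            System.out.println("Title: " + employee.getTitle());   // present only in new records
        } else {
            System.out.println("Title not set (record produced with schema version 1)");
        }
    }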

Conclusion

In this post, we introduced Protocol buffers schema support in AWS Glue Schema Registry. AWS Glue Schema Registry now supports Apache Avro, JSON, and Protocol buffers schemas with different compatible modes. The examples in this post demonstrated how to use Protocol buffers schemas registered with AWS Glue Schema Registry in stream processing applications integrated with Apache Kafka, Amazon MSK, and Kinesis Data Streams. We used the schema-generated POJOs for type safety and protobuf’s DynamicMessage to create generic producer and consumer applications. The examples in this post contain the basic components of the stream processing pattern; you can adapt these examples to your use case needs.

To learn more, refer to the following resources:


About the Author

Vikas Bajaj is a Principal Solutions Architect at AWS. Vikas works with digital native customers and advises them on technology architecture and solutions to meet strategic business objectives.

Run AWS Glue crawlers using Amazon S3 event notifications

Post Syndicated from Pradeep Patel original https://aws.amazon.com/blogs/big-data/run-aws-glue-crawlers-using-amazon-s3-event-notifications/

The AWS Well-Architected Data Analytics Lens provides a set of guiding principles for analytics applications on AWS. One of the best practices it describes is to build a central Data Catalog to store, share, and track metadata changes. AWS Glue provides a Data Catalog to fulfill this requirement. AWS Glue also provides crawlers that automatically discover datasets stored in multiple source systems, including Amazon Redshift, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), MongoDB, Amazon DocumentDB (with MongoDB compatibility), and various other data stores using JDBC. A crawler extracts schemas of tables from these sources and stores the information in the AWS Glue Data Catalog. You can run a crawler on-demand or on a schedule.

When you schedule a crawler to discover data in Amazon S3, you can choose to crawl all folders or crawl new folders only. In the first mode, every time the crawler runs, it scans data in every folder under the root path it was configured to crawl. This can be slow for large tables because on every run, the crawler must list all objects and then compare metadata to identify new objects. In the second mode, commonly referred to as incremental crawls, every time the crawler runs, it processes only S3 folders that were added since the last crawl. Incremental crawls can reduce runtime and cost when used with datasets that append new objects with consistent schema on a regular basis.

AWS Glue also supports incremental crawls using Amazon S3 Event Notifications. You can configure Amazon S3 Event Notifications to be sent to an Amazon Simple Queue Service (Amazon SQS) queue, which the crawler uses to identify the newly added or deleted objects. With each run of the crawler, the SQS queue is inspected for new events; if none are found, the crawler stops. If events are found in the queue, the crawler inspects their respective folders and processes the new objects. This new mode reduces cost and crawler runtime to update large and frequently changing tables.
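
The console walkthrough later in this post creates such a crawler, but if you manage resources in code, the following minimal sketch with the AWS SDK for Java v2 shows one way to enable this mode. The bucket path, queue ARN, account ID, and Region are placeholder assumptions; confirm the builder methods against the SDK version you use.

    import software.amazon.awssdk.services.glue.GlueClient;
    import software.amazon.awssdk.services.glue.model.CrawlerTargets;
    import software.amazon.awssdk.services.glue.model.CreateCrawlerRequest;
    import software.amazon.awssdk.services.glue.model.RecrawlBehavior;
    import software.amazon.awssdk.services.glue.model.RecrawlPolicy;
    import software.amazon.awssdk.services.glue.model.S3Target;

    public class CreateEventModeCrawler {
        public static void main(String[] args) {
            try (GlueClient glue = GlueClient.create()) {
                glue.createCrawler(CreateCrawlerRequest.builder()
                        .name("s3_event_notifications_crawler")
                        .role("s3_event_notifications_crawler_iam_role")
                        .databaseName("s3_event_notifications_database")
                        .targets(CrawlerTargets.builder()
                                .s3Targets(S3Target.builder()
                                        .path("s3://s3-event-notifications-bucket-<random-number>/nyc_data/")
                                        // SQS queue the crawler polls for S3 event notifications.
                                        .eventQueueArn("arn:aws:sqs:us-east-1:111122223333:s3_event_notifications_queue")
                                        // Optionally add .dlqEventQueueArn(...) for events the crawler can't process.
                                        .build())
                                .build())
                        // Crawl only the folders referenced by S3 events instead of the whole path.
                        .recrawlPolicy(RecrawlPolicy.builder()
                                .recrawlBehavior(RecrawlBehavior.CRAWL_EVENT_MODE)
                                .build())
                        .build());
            }
        }
    }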

In this post, we present two design patterns to create a crawler pipeline using this new feature. A crawler pipeline refers to components required to implement incremental crawling using Amazon S3 Event Notifications.

Crawler pipeline design patterns

We define design patterns for the crawler pipeline based on a simple question: do I have any applications other than the crawler that consume S3 event notifications?

If the answer is no, you can send event notifications directly to an SQS queue that has no other consumers. The crawler consumes events from the queue.

If you have multiple applications that want to consume the event notifications, send the notifications directly to an Amazon Simple Notification Service (Amazon SNS) topic, and then broadcast them to an SQS queue. If you have an application or microservice that consumes notifications, you can subscribe it to the SNS topic. This way, you can populate metadata in the Data Catalog while still supporting use cases around the files ingested into the S3 bucket.

The following are some considerations for these options:

  • S3 event notifications can only be sent to standard Amazon SNS topics; Amazon SNS FIFO topics aren’t supported. Refer to Amazon S3 Event Notifications for more details.
  • Similarly, S3 event notifications sent to Amazon SNS can only be forwarded to standard SQS queues and not Amazon SQS FIFO queues. For more information, see FIFO topics example use case.
  • The AWS Identity and Access Management (IAM) role used by the crawler needs to include an IAM policy for Amazon SQS. We provide an example policy later in this post.

Let’s take a deeper look into each design pattern to understand the architecture and its pros and cons.

Option 1: Publish events to an SQS queue

The following diagram represents a design pattern where S3 event notifications are published directly to a standard SQS queue. First, you need to configure an SQS queue as a target for S3 event notifications on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the queue that includes permissions for Amazon S3 to send messages to Amazon SQS and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when the SQS queue is used only for incremental crawling and no other application or service depends on it. The crawler removes events from the queue after they are processed, so they’re not available for other applications. A programmatic sketch of this notification configuration follows the diagram.

Figure 1: Crawler pipeline using Amazon SQS queue
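
As a rough sketch of this pattern with the AWS SDK for Java v2, the following code attaches an S3 event notification that targets the SQS queue directly. The bucket name, prefix, account ID, Region, and queue ARN are placeholders, and the queue access policy described above must already be in place.

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.Event;
    import software.amazon.awssdk.services.s3.model.FilterRule;
    import software.amazon.awssdk.services.s3.model.FilterRuleName;
    import software.amazon.awssdk.services.s3.model.NotificationConfiguration;
    import software.amazon.awssdk.services.s3.model.NotificationConfigurationFilter;
    import software.amazon.awssdk.services.s3.model.PutBucketNotificationConfigurationRequest;
    import software.amazon.awssdk.services.s3.model.QueueConfiguration;
    import software.amazon.awssdk.services.s3.model.S3KeyFilter;

    public class ConfigureS3ToSqsNotification {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.create()) {
                s3.putBucketNotificationConfiguration(PutBucketNotificationConfigurationRequest.builder()
                        .bucket("s3-event-notifications-bucket-<random-number>")
                        .notificationConfiguration(NotificationConfiguration.builder()
                                .queueConfigurations(QueueConfiguration.builder()
                                        // Send object-created events for the table prefix to the crawler's queue.
                                        .queueArn("arn:aws:sqs:us-east-1:111122223333:s3_event_notifications_queue")
                                        .events(Event.S3_OBJECT_CREATED)
                                        .filter(NotificationConfigurationFilter.builder()
                                                .key(S3KeyFilter.builder()
                                                        .filterRules(FilterRule.builder()
                                                                .name(FilterRuleName.PREFIX)
                                                                .value("nyc_data/")
                                                                .build())
                                                        .build())
                                                .build())
                                        .build())
                                .build())
                        .build());
            }
        }
    }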

Option 2: Publish events to an SNS topic and forward to an SQS queue

The following diagram represents a design pattern where S3 event notifications are sent to an SNS topic, which forwards them to an SQS queue for the crawler to consume. First, you need to configure an SNS topic as a target for S3 event notifications on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the topic that includes permissions for Amazon S3 to send messages to Amazon SNS. Then, create an SQS queue and subscribe it to the SNS topic to receive S3 events. Finally, attach an IAM policy to the queue that includes permissions for Amazon SNS to publish messages to Amazon SQS and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when other applications depend on the S3 event notifications. For more information about fanout capabilities in Amazon SNS, see Fanout S3 Event Notifications to Multiple Endpoints. A programmatic sketch of this wiring follows the diagram.

Figure 2: Crawler pipeline using Amazon SNS topic and Amazon SQS queue
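
The following sketch, again with the AWS SDK for Java v2 and placeholder bucket name and ARNs, wires up this pattern; the topic and queue access policies described above still need to be attached separately.

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.Event;
    import software.amazon.awssdk.services.s3.model.NotificationConfiguration;
    import software.amazon.awssdk.services.s3.model.PutBucketNotificationConfigurationRequest;
    import software.amazon.awssdk.services.s3.model.TopicConfiguration;
    import software.amazon.awssdk.services.sns.SnsClient;
    import software.amazon.awssdk.services.sns.model.SubscribeRequest;

    public class ConfigureS3ToSnsToSqsNotification {
        public static void main(String[] args) {
            String topicArn = "arn:aws:sns:us-east-1:111122223333:s3_event_notifications_topic";
            String queueArn = "arn:aws:sqs:us-east-1:111122223333:s3_event_notifications_queue";

            // 1. Point S3 event notifications for the bucket at the SNS topic.
            try (S3Client s3 = S3Client.create()) {
                s3.putBucketNotificationConfiguration(PutBucketNotificationConfigurationRequest.builder()
                        .bucket("s3-event-notifications-bucket-<random-number>")
                        .notificationConfiguration(NotificationConfiguration.builder()
                                .topicConfigurations(TopicConfiguration.builder()
                                        .topicArn(topicArn)
                                        .events(Event.S3_OBJECT_CREATED)
                                        .build())
                                .build())
                        .build());
            }

            // 2. Subscribe the crawler's SQS queue to the topic so notifications fan out to it.
            try (SnsClient sns = SnsClient.create()) {
                sns.subscribe(SubscribeRequest.builder()
                        .topicArn(topicArn)
                        .protocol("sqs")
                        .endpoint(queueArn)
                        .build());
            }
        }
    }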

Solution overview

It’s common to have multiple applications consuming S3 event notifications, so in this post we demonstrate how to implement the second design pattern using Amazon SNS and Amazon SQS.

We create the following AWS resources:

  • S3 bucket – The location where table data is stored. Event notifications are enabled.
  • SNS topic and access policy – Amazon S3 sends event notifications to the SNS topic. The topic must have a policy that gives permissions to Amazon S3.
  • SQS queue and access policy – The SNS topic publishes messages to the SQS queue. The queue must have a policy that gives the SNS topic permission to write messages.
  • Three IAM policies – The policies are as follows:
    • SQS queue policy – Lets the crawler consume messages from the SQS queue.
    • S3 policy – Lets the crawler read files from the S3 bucket.
    • AWS Glue crawler policy – Lets the crawler make changes to the AWS Glue Data Catalog.
  • IAM role – The IAM role used by the crawler. This role uses the three preceding policies.
  • AWS Glue crawler – Crawls the table’s objects and updates the AWS Glue Data Catalog.
  • AWS Glue database – The database in the Data Catalog.
  • AWS Glue table – The crawler creates a table in the Data Catalog.

In the following sections, we walk you through the steps to create your resources and test the solution.

Create an S3 bucket and set up a folder

To create your Amazon S3 resources, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter s3-event-notifications-bucket-<random-number>.
  4. Select Block all public access.
  5. Choose Create bucket.
  6. In the buckets list, select the bucket and choose Create a folder.
  7. For Folder name, enter nyc_data.
  8. Choose Create folder.

Create an IAM policy with permissions on Amazon S3

To create your IAM policy with Amazon S3 permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_s3.json.
  4. Update the S3 bucket name.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_s3.
  8. Choose Create policy.

Create an IAM policy with permissions on Amazon SQS

To create your IAM policy with Amazon SQS permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_sqs.json.
  4. Update the AWS account number.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_sqs.
  8. Choose Create policy.

Create an IAM role for the crawler

To create the IAM role for the AWS Glue crawler, complete the following steps:

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. For Choose a use case, choose Glue.
  4. Choose Next: Permissions.
  5. Attach the two policies you just created: s3_event_notifications_iam_policy_s3 and s3_event_notifications_iam_policy_sqs.
  6. Attach the AWS managed policy AWSGlueServiceRole.
  7. Choose Next: Tags.
  8. Choose Next: Review.
  9. For Role name, enter s3_event_notifications_crawler_iam_role.
  10. Review to confirm that all three policies are attached.
  11. Choose Create role.

Create an SNS topic

To create your SNS topic, complete the following steps:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard (FIFO isn’t supported).
  4. For Name, enter s3_event_notifications_topic.
  5. Choose Create topic.
  6. On the Access policy tab, choose Advanced.
  7. Enter the policy contents from s3_event_notifications_sns_topic_access_policy.json.
  8. Update the account number and S3 bucket.
  9. Choose Create topic.

Create an SQS queue

To create your SQS queue, complete the following steps.

  1. On the Amazon SQS console, choose Create a queue.
  2. For Type, choose Standard.
  3. For Name, enter s3_event_notifications_queue.
  4. Keep the remaining settings at their default.
  5. On the Access policy tab, choose Advanced.
  6. Enter the policy contents from s3_event_notifications_sqs_queue_policy.json.
  7. Update the account number.
  8. Choose Create queue.
  9. On the SNS subscription tab, choose Subscribe to SNS topic.
  10. Choose the topic you created, s3_event_notifications_topic.
  11. Choose Save.

Create event notifications on the S3 bucket

To create event notifications for your S3 bucket, complete the following steps:

  1. Navigate to the Properties tab of the S3 bucket you created.
  2. In the Event notifications section, choose Create event notification.
  3. For Event name, enter crawler_event.
  4. For Prefix, enter nyc_data/.
  5. For Event Types, choose All Object Create Event.
  6. For Destination, choose SNS topic and the topic s3_event_notifications_topic.

Create a crawler

To create your AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter s3_event_notifications_crawler.
  4. Choose Next.
  5. For Crawler source type, choose data stores.
  6. For Repeat crawls of S3 data stores, choose Crawl changes identified by Amazon S3 events.
  7. Choose Next.
  8. For Include path, enter an S3 path.
  9. For Include SQS ARN, add your Amazon SQS ARN.

Including a dead-letter queue is optional; we skip it for this post. Dead-letter queues help you isolate problematic event notifications that a crawler can’t process successfully. To understand the general benefits of dead-letter queues and how they receive messages from the main SQS queue, refer to Amazon SQS dead-letter queues.

  1. Choose Next.
  2. When asked to add another data store, choose No.
  3. For IAM role, select “Choose an existing role” and enter the IAM role created above.
  4. Choose Next.
  5. For Frequency, choose Run on demand.
  6. Choose Next.
  7. Under Database, choose Add database.
  8. For Database name, enter s3_event_notifications_database.
  9. Choose Create.
  10. Choose Next.
  11. Choose Finish to create your crawler.

Test the solution

The following steps show how adding new objects triggers an event notification that propagates to Amazon SQS, which the crawler uses on subsequent runs. For sample data, we use NYC taxi records from January and February, 2020.

  1. Download the following datasets:
    1. green_tripdata_2020-01.csv
    2. green_tripdata_2020-02.csv
  2. On the Amazon S3 console, navigate to the bucket you created earlier.
  3. Create a folder called nyc_data.
  4. Create a subfolder called dt=202001.

This sends a notification to the SNS topic, and a message is sent to the SQS queue.

  1. In the folder dt=202001, upload the file green_tripdata_2020-01.csv.
  2. To validate that this step generated an S3 event notification, navigate to the queue on the Amazon SQS console.
  3. Choose Send and receive messages.
  4. Under Receive messages, Messages available should show as 1.
  5. Return to the Crawlers page on the AWS Glue console and select the crawler s3_event_notifications_crawler.
  6. Choose Run crawler. After a few seconds, the crawler status changes to Starting and then to Running. The crawler should complete in 1–2 minutes and display a success message.
  7. Confirm that a new table, nyc_data, is in your database.
  8. Choose the table to verify its schema.

The dt column is marked as a partition key.

  1. Choose View partitions to see partition details.
  2. To validate that the crawler consumed this event, navigate to the queue on the Amazon SQS console and choose Send and receive messages.
  3. Under Receive messages, Messages available should now show as 0; see the sketch after this list for a programmatic way to check the count.
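
If you prefer to script this check instead of using the console, the following sketch with the AWS SDK for Java v2 (the queue URL is a placeholder) reads the approximate number of messages waiting in the queue:

    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
    import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

    public class CheckCrawlerQueueDepth {
        public static void main(String[] args) {
            try (SqsClient sqs = SqsClient.create()) {
                String queueUrl = "https://sqs.us-east-1.amazonaws.com/111122223333/s3_event_notifications_queue";
                String available = sqs.getQueueAttributes(GetQueueAttributesRequest.builder()
                                .queueUrl(queueUrl)
                                .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
                                .build())
                        .attributes()
                        .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES);
                // Expect 1 right after an upload, and 0 after the crawler has consumed the event.
                System.out.println("Messages available: " + available);
            }
        }
    }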

Now upload another file and see how the S3 event triggers a crawler to run.

  1. On the Amazon S3 console, in your nyc_data folder, create the subfolder dt=202002.
  2. Upload the file green_tripdata_2020-02.csv to this subfolder.
  3. Run the crawler again and wait for the success message.
  4. Return to the AWS Glue table and choose View partitions to see a new partition added.

Additional notes

Keep in mind the following when using this solution:

Clean up

When you’re finished evaluating this feature, you should delete the SNS topic, SQS queue, AWS Glue crawler, and the S3 bucket and its objects to avoid further charges.

Conclusion

In this post, we discussed a new way for AWS Glue crawlers to use S3 Event Notifications to reduce the time and cost needed to incrementally process table data updates in the AWS Glue Data Catalog. We discussed two design patterns to implement this approach. The first pattern publishes events directly to an SQS queue, which is useful when only the crawler needs these events. The second pattern publishes events to an SNS topic, which then forwards them to an SQS queue for the crawler to process; this is useful when other applications also depend on these events. We also walked through how to implement the second design pattern to incrementally crawl your data. Incremental crawling using S3 event notifications reduces the runtime and cost of your crawlers for large and frequently changing tables.

Let us know your feedback in the comments section. Happy crawling!


About the Authors

Pradeep Patel is a Sr. Software Engineer at AWS Glue. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a Bigdata enthusiast and holds 13 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finds areas for home automation.

Ravi Itha is a Sr. Data Architect at AWS. He works with customers to design and implement data lakes, analytics, and microservices on AWS. He is an open-source committer and has published more than a dozen solutions using AWS CDK, AWS Glue, AWS Lambda, AWS Step Functions, Amazon ECS, Amazon MQ, Amazon SQS, Amazon Kinesis Data Streams, and Amazon Kinesis Data Analytics for Apache Flink. His solutions can be found at his GitHub handle. Outside of work, he is passionate about books, cooking, movies, and yoga.