Tag Archives: AWS Glue

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web—they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This, coupled with multiple fulfillment options like buy online and pick up in store, ship from store, or ship from distribution centers, increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.
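
The following is a minimal sketch of the kind of order-simulation Lambda function the template schedules. It writes one synthetic order event to the data stream with boto3; the stream name, field names, and status value are assumptions for illustration, and the actual function deployed by the CloudFormation stack may differ.

import json
import uuid
import datetime
import boto3

kinesis = boto3.client("kinesis")

def lambda_handler(event, context):
    # Build a synthetic order event; field names here are illustrative only.
    order = {
        "ordernumber": str(uuid.uuid4()),
        "status": "Created",
        "orderdatetime": datetime.datetime.utcnow().strftime("%m/%d/%Y %H:%M:%S"),
        "itemid": "ITEM-001",
        "quantity": 2,
    }
    # Partition by order number so all events for one order land on the same shard.
    kinesis.put_record(
        StreamName="orders-data-stream",  # hypothetical stream name
        Data=json.dumps(order),
        PartitionKey=order["ordernumber"],
    )
    return {"statusCode": 200}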

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • When a customer is browsing products, the real-time availability of the item must be displayed
  • Customers interacting with Alexa to know the status of the order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores them in a DynamoDB table.
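
As a sketch of how such a consumer might look, the following Lambda handler decodes each Kinesis record and writes it to DynamoDB. The table name is an assumption, and the function created by the stack may differ in its details.

import base64
import json
from decimal import Decimal

import boto3

# Hypothetical table name for the orders hot data.
table = boto3.resource("dynamodb").Table("orders-hot-data")

def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded inside the event.
        payload = json.loads(
            base64.b64decode(record["kinesis"]["data"]),
            parse_float=Decimal,  # DynamoDB requires Decimal instead of float
        )
        table.put_item(Item=payload)
    return {"processed": len(event["Records"])}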

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora MySQL is a fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.
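
Although the job in this solution runs on a schedule, you can also start a DataBrew job run on demand. The following is a brief sketch using boto3; the job name is a placeholder, not the name created by the stack.

import boto3

databrew = boto3.client("databrew")

# Start an on-demand run of the (hypothetical) DataBrew job that applies the recipe.
run = databrew.start_job_run(Name="OrderDataTransformJob")
print("Started DataBrew job run:", run["RunId"])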

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.
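
As an example of the kind of view you might create for QuickSight, the following sketch runs a CREATE VIEW statement through the Athena API with boto3. The database and table names match the ones used later in this post, but the view name, the channel column, and the query results location are assumptions.

import boto3

athena = boto3.client("athena")

athena.start_query_execution(
    QueryString="""
        CREATE OR REPLACE VIEW orders_by_channel AS
        SELECT channel, COUNT(DISTINCT ordernumber) AS order_count
        FROM orders_clean
        GROUP BY channel
    """,
    QueryExecutionContext={"Database": "orderdatalake"},
    ResultConfiguration={"OutputLocation": "s3://operational-datalake/athena-results/"},
)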

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.

Prerequisites

Complete the following prerequisite steps:

  1. Create an AWS account if you don’t have one already.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

CREATE TABLE INVENTORY (
    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25),
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)
);
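
If you prefer to run statements against the Aurora cluster outside the query editor, the RDS Data API offers a scripted path. The following is a hedged sketch; the cluster ARN and database name are placeholders, and the secret ARN is the InventorySecretManager value from the stack outputs.

import boto3

rds_data = boto3.client("rds-data")

response = rds_data.execute_statement(
    resourceArn="arn:aws:rds:<region>:<account-id>:cluster:<cluster-name>",  # placeholder
    secretArn="<InventorySecretManager ARN from the stack outputs>",
    database="<inventory database name from the stack parameters>",
    sql="SELECT COUNT(*) AS row_count FROM INVENTORY",
)
print(response["records"])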

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting), sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type, choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes. To avoid generating a huge amount of data, be sure to delete the stack after you finish testing the solution.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.

Integrate AWS Glue Schema Registry with the AWS Glue Data Catalog to enable effective schema enforcement in streaming analytics use cases

Post Syndicated from Sam Mokhtari original https://aws.amazon.com/blogs/big-data/integrate-aws-glue-schema-registry-with-the-aws-glue-data-catalog-to-enable-effective-schema-enforcement-in-streaming-analytics-use-cases/

Metadata is an integral part of data management and governance. The AWS Glue Data Catalog can provide a uniform repository to store and share metadata. The main purpose of the Data Catalog is to provide a central metadata store where disparate systems can store, discover, and use that metadata to query and process the data.

Another important aspect of data governance is serving and managing the relationship between data stores and external clients, which are the producers and consumers of data. As the data evolves, especially in streaming use cases, we need a central framework that provides a contract between producers and consumers to enable schema evolution and improved governance. The AWS Glue Schema Registry provides a centralized framework to help manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Apache Flink and Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

In this post, we demonstrate how to integrate Schema Registry with the Data Catalog to enable efficient schema enforcement in streaming analytics use cases.

Stream analytics on AWS

There are many different scenarios where customers want to run stream analytics on AWS while managing the schema evolution effectively. To manage the end-to-end stream analytics life cycle, there are many different applications involved for data production, processing, analytics, routing, and consumption. It can be quite hard to manage changes across different applications for stream analytics use cases. Adding/removing a data field across different stream analytics applications can lead to data quality issues or downstream application failures if it is not managed appropriately.

For example, a large grocery store may want to send order information to its backend systems using Amazon Kinesis Data Streams. While sending the order information, the customer may want to apply data transformations or run analytics on it. The orders may be routed to different targets depending on the order type, and they may be integrated with many backend applications that expect order stream data in a specific format. But the order details schema can change for many different reasons, such as new business requirements, technical changes, or source system upgrades.

The changes are inevitable, but customers want a mechanism to manage them effectively while running their stream analytics workloads. To support stream analytics use cases on AWS and enforce schema and governance, customers can use the AWS Glue Schema Registry along with AWS streaming analytics services.

You can use Amazon Kinesis Data Firehose data transformation to ingest data from Kinesis Data Streams, run a simple data transformation on a batch of records via a Lambda function, and deliver the transformed records to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, Splunk, Datadog, NewRelic, Dynatrace, Sumologic, LogicMonitor, MongoDB, and an HTTP endpoint. The Lambda function transforms the current batch of records with no information or state from previous batches.

Lambda also provides stream analytics capabilities for Amazon Kinesis Data Streams and Amazon DynamoDB Streams. This feature enables data aggregation and state management across multiple function invocations. This capability uses a tumbling window, which is a fixed-size, non-overlapping time interval of up to 15 minutes. When you apply a tumbling window to a stream, records in the stream are grouped by window and sent to the processing Lambda function. The function returns a state value that is passed to the next tumbling window.
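
The following sketch illustrates the tumbling-window pattern for a Lambda function attached to a Kinesis event source. The state handling reflects the documented tumbling-window payload (the state, window, and isFinalInvokeForWindow fields); what you aggregate and where you persist it are assumptions.

import base64
import json

def lambda_handler(event, context):
    # State carried forward from the previous invocation within the same window.
    state = event.get("state") or {"record_count": 0}

    for record in event.get("Records", []):
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))  # fields available if needed
        state["record_count"] += 1  # aggregate whatever is meaningful for your use case

    if event.get("isFinalInvokeForWindow"):
        # The window has closed; emit or persist the aggregate here.
        print("Window", event.get("window"), "record count:", state["record_count"])

    # Returning the state passes it to the next invocation in the window.
    return {"state": state}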

Kinesis Data Analytics provides SQL-based stream analytics against streaming data. This service also enables you to use an Apache Flink application to process stream data. Data can be ingested from Kinesis Data Streams and Kinesis Data Firehose while supporting Kinesis Data Firehose (Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk), Lambda, and Kinesis Data Streams as destinations.

Finally, you can use the AWS Glue streaming extract, transform, and load (ETL) capability as a serverless method to consume data from Kinesis and Apache Kafka or Amazon MSK. The job aggregates, transforms, and enriches the data using Spark streaming, then continuously loads the results into Amazon S3-based data lakes, data warehouses, DynamoDB, JDBC, and more.

Managing stream metadata and schema evolution is becoming more important for stream analytics use cases. To enable these on AWS, the Data Catalog and Schema Registry allow you to centrally control and discover schemas. Before the release of schema referencing in the Data Catalog, you relied on managing schema evolution separately in the Data Catalog and Schema Registry, which usually leads to inconsistencies between these two. With the new release of the Data Catalog and Schema Registry integration, you can now reference schemas stored in the schema registry when creating or updating AWS Glue tables in the Data Catalog. This helps avoid inconsistency between the schema registry and Data Catalog, which results in end-to-end data quality enforcement.

In this post, we walk you through a streaming ETL example in AWS Glue to better showcase how this integration can help. This example includes reading streaming data from Kinesis Data Streams, schema discovery with Schema Registry, using the Data Catalog to store the metadata, and writing out the results to an Amazon S3 as a sink.

Solution overview

The following high-level architecture diagram shows the components to integrate Schema Registry and the Data Catalog to run streaming ETL jobs. In this architecture, Schema Registry helps centrally track and evolve Kinesis Data Streams schemas.

At a high level, we use the Amazon Kinesis Data Generator (KDG) to stream data to a Kinesis data stream, use AWS Glue to run streaming ETL, and use Amazon Athena to query the data.

In the following sections, we walk you through the steps to build this architecture.

Create a Kinesis data stream

To set up a Kinesis data stream, complete the following steps:

  1. On the Kinesis console, choose Data streams.
  2. Choose Create data stream.
  3. Give the stream a name, such as ventilator_gsr_stream.
  4. Complete stream creation.
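
If you would rather script this step, the following is a minimal boto3 sketch; the shard count and Region are assumptions.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # use your Region

kinesis.create_stream(StreamName="ventilator_gsr_stream", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="ventilator_gsr_stream")
print("Stream is active")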

Configure Kinesis Data Generator to generate sample data

You can use the KDG with the ventilator template available on the GitHub repo to generate sample data. The following diagram shows the template on the KDG console.

Add a new AWS Glue schema registry

To add a new schema registry, complete the following steps:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Schema registries.
  2. Choose Add registry.
  3. For Registry name, enter a name (for example, MyDemoSchemaReg).
  4. For Description, enter an optional description for the registry.
  5. Choose Add registry.

Add a schema to the schema registry

To add a new schema, complete the following steps:

  1. On the AWS Glue console, under Schema registries in the navigation pane, choose Schemas.
  2. Choose Add schema.
  3. Provide the schema name (ventilatorstream_schema_gsr) and attach the schema to the schema registry defined in the previous step.
  4. AWS Glue schemas currently support Avro or JSON formats; for this post, select JSON.
  5. Use the default Compatibility mode and provide the necessary tags as per your tagging strategy.

Compatibility modes allow you to control how schemas can or cannot evolve over time. These modes form the contract between applications producing and consuming data. When a new version of a schema is submitted to the registry, the compatibility rule applied to the schema name is used to determine if the new version can be accepted. For more information on different compatibility modes, refer to Schema Versioning and Compatibility.

  6. Enter the following sample JSON:
    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Ventilator",
      "type": "object",
      "properties": {
        "ventilatorid": {
          "type": "integer",
          "description": "Ventilator ID"
        },
        "eventtime": {
          "type": "string",
          "description": "Time of the event."
        },
        "serialnumber": {
          "description": "Serial number of the device.",
          "type": "string",
          "minimum": 0
        },
        "pressurecontrol": {
          "description": "Pressure control of the device.",
          "type": "integer",
          "minimum": 0
        },
        "o2stats": {
          "description": "O2 status.",
          "type": "integer",
          "minimum": 0
        },
        "minutevolume": {
          "description": "Volume.",
          "type": "integer",
          "minimum": 0
        },
        "manufacturer": {
          "description": "Volume.",
          "type": "string",
          "minimum": 0
        }
      }
    }

  7. Choose Create schema and version.
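
The registry and schema can also be created programmatically. The following boto3 sketch mirrors the console steps above; the compatibility mode shown is an assumption standing in for the console default, and the schema definition is read from a local file containing the JSON document above.

import boto3

glue = boto3.client("glue")

glue.create_registry(RegistryName="MyDemoSchemaReg", Description="Demo schema registry")

# The file is assumed to contain the ventilator JSON schema shown earlier.
with open("ventilator_schema.json") as f:
    schema_definition = f.read()

glue.create_schema(
    RegistryId={"RegistryName": "MyDemoSchemaReg"},
    SchemaName="ventilatorstream_schema_gsr",
    DataFormat="JSON",
    Compatibility="BACKWARD",  # assumption: stands in for the console default
    SchemaDefinition=schema_definition,
)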

Create a new Data Catalog table

To add a new table in the Data Catalog, complete the following steps:

  1. On the AWS Glue Console, under Data Catalog in the navigation pane, choose Tables.
  2. Choose Add table.
  3. Select Add tables from existing schema.
  4. Enter the table name and choose the database.
  5. Select the source type as Kinesis and choose a data stream in your own account.
  6. Choose the respective Region and choose the stream ventilator_gsr_stream.
  7. Choose the MyDemoSchemaReg registry created earlier and the schema (ventilatorstream_schema_gsr) with its respective version.

You should be able to preview the schema.

  8. Choose Next and then choose Finish to create your table.

Create the AWS Glue job

To create your AWS Glue job, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with a source and target.
  3. Under Source, select Amazon Kinesis and under Target, select Amazon S3.
  4. Choose Create.
  5. Choose Data source.
  6. Configure the job properties such as name, AWS Identity and Access Management (IAM) role, type, and AWS version.

For the IAM role, specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, you need to make sure that the IAM role has permissions to read from Kinesis Data Streams and write to Amazon S3.

  7. For This job runs, select A new script authored by you.
  8. Under Advanced properties, keep Job bookmark disabled.
  9. For Log Filtering, select Standard filter and Spark UI.
  10. Under Monitoring options, enable Job metrics and Continuous logging with Standard filter.
  11. Enable the Spark UI and provide the S3 bucket path to store the Spark event logs.
  12. For Job parameters, enter the following key-values:
    • --output_path – The S3 path where the final aggregations are persisted
    • --aws_region – The Region where you run the job
  13. Leave Connections empty and choose Save job and edit script.
  14. Use the following code for the AWS Glue job (update the values for database, table_name, and checkpointLocation):
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'aws_region', 'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=[
                ("ventilatorid", "long", "ventilatorid", "long"),
                ("eventtime", "string", "eventtime", "timestamp"),
                ("serialnumber", "string", "serialnumber", "string"),
                ("pressurecontrol", "long", "pressurecontrol", "long"),
                ("o2stats", "long", "o2stats", "long"),
                ("minutevolume", "long", "minutevolume", "long"),
                ("manufacturer", "string", "manufacturer", "string")],
            transformation_ctx="apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(
            frame=apply_mapping,
            connection_type="s3",
            connection_options={"path": s3path},
            format="parquet",
            transformation_ctx="s3sink")


# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog(
    database="kinesislab",
    table_name="ventilator_gsr_new",
    transformation_ctx="datasource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(
    frame=sourceData,
    batch_function=processBatch,
    options={"windowSize": "100 seconds", "checkpointLocation": "s3://<bucket name>/ventilator_gsr/checkpoint/"})
job.commit()

Our AWS Glue job is ready to read the data from the Kinesis data stream and send it to Amazon S3 in Parquet format.

Query the data using Athena

The processed streaming data is written in Parquet format to Amazon S3. Run an AWS Glue crawler on the Amazon S3 location where the streaming data is written; the crawler updates the Data Catalog. You can then run queries using Athena to start deriving relevant insights from the data.
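
The following is a hedged sketch of running that crawler on demand with boto3 and waiting for it to return to the READY state before querying in Athena; the crawler name is a placeholder.

import time

import boto3

glue = boto3.client("glue")
crawler_name = "ventilator-parquet-crawler"  # hypothetical crawler name

glue.start_crawler(Name=crawler_name)

# Poll until the crawler finishes its run and returns to READY.
while glue.get_crawler(Name=crawler_name)["Crawler"]["State"] != "READY":
    time.sleep(30)
print("Crawler finished; the Data Catalog is up to date")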

Clean up

It’s always a good practice to clean up all the resources created as part of this post to avoid any undue cost. To clean up your resources, delete the AWS Glue database, tables, crawlers, jobs, service role, and S3 buckets.

Additionally, be sure to clean up all other AWS resources that you created using AWS CloudFormation. You can delete these resources on the AWS CloudFormation console by deleting the stack used for the Kinesis Data Generator.

Conclusion

This post demonstrated the importance of centrally managing metadata and schema evolution in stream analytics use cases. It also described how the integration of the Data Catalog and Schema Registry can help you achieve this on AWS. We used a streaming ETL example in AWS Glue to better showcase how this integration can help to enforce end-to-end data quality.

To learn more and get started, you can check out AWS Glue Data Catalog and AWS Glue Schema Registry.


About the Authors

Dr. Sam Mokhtari is a Senior Solutions Architect at AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor, and has led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

Amar Surjit is a Sr. Solutions Architect based in the UK who has been working in IT for over 20 years designing and implementing global solutions for enterprise customers. He is passionate about streaming technologies and enjoys working with customers globally to design and build streaming architectures and drive value by analyzing their streaming data.

Synchronize your AWS Glue Studio Visual Jobs to different environments 

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/synchronize-your-aws-glue-studio-visual-jobs-to-different-environments/

AWS Glue has become a popular option for integrating data from disparate data sources due to its ability to integrate large volumes of data using distributed data processing frameworks. Many customers use AWS Glue to build data lakes and data warehouses. Data engineers who prefer to develop data processing pipelines visually can use AWS Glue Studio to create data integration jobs. This post introduces the Glue Job Visual API, which lets you author Glue Studio Visual Jobs programmatically, and the Glue Job Sync utility, which uses the API to synchronize Glue jobs to different environments without losing the visual representation.

Glue Job Visual API

AWS Glue Studio has a graphical interface called the Visual Editor that makes it easy to author extract, transform, and load (ETL) jobs in AWS Glue. Glue jobs created in the Visual Editor contain a visual representation that describes the data transformation. In this post, we call these jobs Glue Studio Visual Jobs.

For example, it’s common to develop and test AWS Glue jobs in a dev account, and then promote the jobs to a prod account. Previously, when you copied AWS Glue Studio Visual jobs to a different environment, there was no mechanism to copy the visual representation along with them. This meant that the visual representation of the job was lost and you could only copy the code produced with Glue Studio. It can be time-consuming and tedious to either copy the code or recreate the job.

The AWS Glue Job Visual API lets you programmatically create and update Glue Studio Visual Jobs by providing a JSON object that describes the visual representation, and also retrieve the visual representation from existing Glue Studio Visual Jobs. A Glue Studio Visual Job consists of data source nodes for reading the data, transform nodes for modifying the data, and data target nodes for writing the data.
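
To make the shape of the API concrete, the following boto3 sketch reads an existing Glue Studio Visual Job and recreates it under a new name, carrying the visual representation along. The job names and the set of fields copied over are assumptions for illustration; a production utility (like the sync utility described below) copies more of the job configuration.

import boto3

glue = boto3.client("glue")

# Retrieve the existing visual job, including its CodeGenConfigurationNodes (the DAG
# of source, transform, and target nodes that Glue Studio renders).
source_job = glue.get_job(JobName="test1")["Job"]

glue.create_job(
    Name="test1-copy",  # hypothetical target job name
    Role=source_job["Role"],
    Command=source_job["Command"],
    GlueVersion=source_job.get("GlueVersion", "3.0"),
    DefaultArguments=source_job.get("DefaultArguments", {}),
    # Passing the nodes along is what preserves the visual representation.
    CodeGenConfigurationNodes=source_job.get("CodeGenConfigurationNodes", {}),
)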

There are some typical use cases for the Glue Job Visual API:

  • Automate creation of Glue Visual Jobs.
  • Migrate your ETL jobs from third-party or on-premises ETL tools to AWS Glue. Many AWS partners, such as Bitwise, Bladebridge, and others, have built converters from third-party ETL tools to AWS Glue.
  • Synchronize AWS Glue Studio Visual jobs from one environment to another without losing visual representation.

In this post, we focus on a utility that uses the Glue Job Visual API to achieve mass synchronization of your Glue Studio Visual Jobs without losing the visual representation.

Glue Job Sync Utility

There are common requirements to synchronize Glue Visual Jobs between different environments:

  • Promote Glue Visual Jobs from a dev account to a prod account.
  • Transfer ownership of Glue Visual Jobs between different AWS accounts.
  • Replicate Glue Visual Job configurations from one Region to another for disaster recovery purposes.

The Glue Job Sync Utility is built on top of the Glue Job Visual API and lets you synchronize jobs to different accounts without losing the visual representation. It is a Python application that synchronizes your AWS Glue Studio Visual jobs to different environments using the new Glue Job Visual API. The utility requires that you provide source and target AWS environment profiles. Optionally, you can provide a list of jobs that you want to synchronize, and specify how the utility should replace your environment-specific objects using a mapping file. For example, the Amazon Simple Storage Service (Amazon S3) locations and roles in your development environment can be different from those in your production environment. The mapping config file is used to replace the environment-specific objects.

How to use Glue Job Sync Utility

In this example, we’re synchronizing two AWS Glue Studio Visual jobs, test1 and test2, from the development environment to the production environment in a different account.

  • Source environment (dev environment)
    • AWS Account ID: 123456789012
    • AWS Region: eu-west-3 (Paris)
    • AWS Glue Studio Visual jobs: test1, test2
    • AWS Identity and Access Management (IAM) Role ARN for Glue job execution role: arn:aws:iam::123456789012:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-123456789012-eu-west-3/
    • Amazon S3 bucket for data location: s3://dev-environment/
  • Destination environment (prod environment)
    • AWS Account ID: 234567890123
    • AWS Region: eu-west-3 (Paris)
    • IAM Role ARN for Glue job execution role: arn:aws:iam::234567890123:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-234567890123-eu-west-3/
    • Amazon S3 bucket for data location: s3://prod-environment/

Set up the utility in your local environment

You will need the following prerequisites for this utility:

  • Python 3.6 or later.
  • Latest version of boto3.
  • Create two AWS named profiles, dev and prod, with the corresponding credentials in your environment. Follow this instruction.

Download the Glue Job Sync Utility

Download the sync utility from the GitHub repository to your local machine.

Create AWS Glue Studio Visual Jobs

  1. Create two AWS Glue Studio Visual jobs, test1, and test2, in the source account.
    • If you don’t have any AWS Glue Studio Visual jobs, then follow this instruction to create the Glue Studio Visual jobs.

  2. Open AWS Glue Studio in the destination account and verify that the test1 and test2 jobs aren’t present.

Run the Job Sync Utility

  1. Create a new file named mapping.json, and enter the following JSON code. With the configuration in line 1, the sync utility will replace all of the Amazon S3 references within the job (in this case s3://aws-glue-assets-123456789012-eu-west-3) with the mapped location (in this case s3://aws-glue-assets-234567890123-eu-west-3). Then, the utility will create the job in the destination environment. Along the same lines, line 2 and line 3 will trigger the appropriate substitutions in the job. Note that these are example values and you’ll need to substitute the right values that match your environment.

    {
        "s3://aws-glue-assets-123456789012-eu-west-3": "s3://aws-glue-assets-234567890123-eu-west-3",
        "arn:aws:iam::123456789012:role/GlueServiceRole": "arn:aws:iam::234567890123:role/GlueServiceRole",
        "s3://dev-environment": "s3://prod-environment"
    }

  2. Execute the utility by running the following command:
    $ python3 sync.py --src-profile dev --src-region eu-west-3 --dst-profile prod --dst-region eu-west-3 --src-job-names test1,test2 --config-path mapping.json

  3. Verify successful synchronization by opening AWS Glue Studio in the destination account:
  4. Open the Glue Studio Visual jobs, test1, and test2, and verify the visual representation of the DAG.

The screenshot above shows that you were able to copy the jobs test1 and test2 to the destination account while keeping the DAG intact.

Conclusion

AWS Glue Job Visual API and the AWS Glue Sync Utility simplify how you synchronize your jobs to different environments. These are designed to easily integrate into your Continuous Integration pipelines while retaining the visual representation that improves the readability of the ETL pipeline.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for designing AWS features, implementing software artifacts, and helping customer architectures. In his spare time, he enjoys watching anime in Prime Video.

Aaron Meltzer is a Software Engineer on the AWS Glue Studio team. He leads the design and implementation of features to simplify the management of AWS Glue jobs. Outside of work, Aaron likes to read and learn new recipes.

Mohamed Kiswani is the Software Development Manager on the AWS Glue Team

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team.

How to unit test and deploy AWS Glue jobs using AWS CodePipeline

Post Syndicated from Praveen Kumar Jeyarajan original https://aws.amazon.com/blogs/devops/how-to-unit-test-and-deploy-aws-glue-jobs-using-aws-codepipeline/

This post is intended to assist users in understanding and replicating a method to unit test Python-based ETL Glue Jobs, using the PyTest Framework in AWS CodePipeline. In the current practice, several options exist for unit testing Python scripts for Glue jobs in a local environment. Although a local development environment may be set up to build and unit test Python-based Glue jobs, by following the documentation, replicating the same procedure in a DevOps pipeline is difficult and time consuming.

Unit test scripts are one of the initial quality gates used by developers to provide a high-quality build. These scripts must be reused during regression testing to make sure that all of the existing functionality is intact, and that new releases don’t disrupt key application functionality. Most regression test suites are expected to be integrated with the DevOps pipeline for execution. Unit testing is a fundamental task that evaluates whether each unit of code written by a programmer functions as expected. Unit testing of code provides a mechanism to determine that software quality hasn’t been compromised. One of the difficulties in building Python-based Glue ETL tasks is incorporating their unit tests into a DevOps pipeline, especially when modernizing mainframe ETL processes to modern tech stacks on AWS.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all of the capabilities needed for data integration. This means that you can start analyzing your data and putting it to use in minutes rather than months. AWS Glue provides both visual and code-based interfaces to make data integration easier.

Prerequisites

GitHub Repository

Amazon ECR Image URI for Glue Library

Solution overview

A typical enterprise-scale DevOps pipeline is illustrated in the following diagram. This solution describes how to incorporate the unit testing of Python-based AWS Glue ETL processes into the AWS DevOps Pipeline.

Figure 1 Solution Overview

The GitHub repository aws-glue-jobs-unit-testing has a sample Python-based Glue job in the src folder. Its associated unit test cases, built using the Pytest Framework, are accessible in the tests folder. An AWS CloudFormation template written in YAML is included in the deploy folder. AWS CodeBuild uses custom container images as its runtime environment. This feature is used to build a project using Glue libraries from the Public ECR repository, which can run the code package to demonstrate unit testing integration.
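
To give a feel for what those test cases look like, the following is a minimal Pytest sketch in the spirit of the tests folder. The transformation under test is inlined as a placeholder rather than imported from src, and it assumes the test runs inside the glue_libs container image so that pyspark and awsglue are importable.

import pytest
from pyspark.context import SparkContext
from awsglue.context import GlueContext

@pytest.fixture(scope="module")
def glue_context():
    # Reuse one Spark/Glue context for the whole test module.
    return GlueContext(SparkContext.getOrCreate())

def test_drops_records_without_an_order_id(glue_context):
    spark = glue_context.spark_session
    df = spark.createDataFrame(
        [("1001", "Created"), (None, "Created")],
        ["order_id", "status"],
    )
    # Placeholder for the transformation exposed by the Glue job script in src.
    result = df.filter(df.order_id.isNotNull())
    assert result.count() == 1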

Solution walkthrough

Time to read: 7 min
Time to complete: 15-20 min
Learning level: 300
Services used: AWS CodePipeline, AWS CodeCommit, AWS CodeBuild, Amazon Elastic Container Registry (Amazon ECR) Public Repositories, AWS CloudFormation

The container image at the Public ECR repository for AWS Glue libraries includes all of the binaries required to run PySpark-based AWS Glue ETL tasks locally, as well as unit test them. The public container repository has three image tags, one for each supported AWS Glue version. To demonstrate the solution, we use the image tag glue_libs_3.0.0_image_01 in this post. To utilize this container image as a runtime image in CodeBuild, copy the Image URI corresponding to the image tag that you intend to use, as shown in the following image.

Figure 2 Select Glue Library from Public ECR

The aws-glue-jobs-unit-testing GitHub repository contains a CloudFormation template, pipeline.yml, which deploys a CodePipeline with CodeBuild projects to create, test, and publish the AWS Glue job. As illustrated in the following snippet, use the copied image URI from Amazon ECR Public to create and test a CodeBuild project.

  TestBuild:
    Type: AWS::CodeBuild::Project
    Properties:
      Artifacts:
        Type: CODEPIPELINE
      BadgeEnabled: false
      Environment:
        ComputeType: BUILD_GENERAL1_LARGE
        Image: "public.ecr.aws/glue/aws-glue-libs:glue_libs_3.0.0_image_01"
        ImagePullCredentialsType: CODEBUILD
        PrivilegedMode: false
        Type: LINUX_CONTAINER
      Name: !Sub "${RepositoryName}-${BranchName}-build"
      ServiceRole: !GetAtt CodeBuildRole.Arn  

The pipeline performs the following operations:

  1. It uses the CodeCommit repository as the source and transfers the most recent code from the main branch to the CodeBuild project for further processing.
  2. The following stage is build and test, in which the most recent code from the previous phase is unit tested and the test report is published to CodeBuild report groups.
  3. If all of the test results are good, then the next CodeBuild project is launched to publish the code to an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Following the successful completion of the publish phase, the final step is to deploy the AWS Glue task using the CloudFormation template in the deploy folder.

Deploying the solution

Set up

Now we’ll deploy the solution using a CloudFormation template.

  • Using the GitHub web interface, download the code.zip file from the aws-glue-jobs-unit-testing repository. This zip file contains the GitHub repository’s src, tests, and deploy folders. You may also create the zip file yourself using command-line tools, such as git and zip. To create the zip file on Linux or Mac, open the terminal and enter the following commands.
git clone https://github.com/aws-samples/aws-glue-jobs-unit-testing.git
cd aws-glue-jobs-unit-testing
git checkout master
zip -r code.zip src/ tests/ deploy/
  • Sign in to the AWS Management Console and choose the AWS Region of your choice.
  • Create an Amazon S3 bucket. For more information, see How Do I Create an S3 Bucket? in the AWS documentation.
  • Upload the downloaded zip package, code.zip, to the Amazon S3 bucket that you created.

In this example, I created an Amazon S3 bucket named aws-glue-artifacts-us-east-1 in the N. Virginia (us-east-1) Region, and used the console to upload the zip package from the GitHub repository to the Amazon S3 bucket.

Figure 3 Upload code.zip file to S3 bucket

Creating the stack

  1. In the CloudFormation console, choose Create stack.
  2. On the Specify template page, choose Upload a template file, and then choose the pipeline.yml template that you downloaded from the GitHub repository.

Figure 4 Upload pipeline.yml template to create a new CloudFormation stack

  3. Specify the following parameters:
  • Stack name: glue-unit-testing-pipeline (Choose a stack name of your choice)
  • ApplicationStackName: glue-codepipeline-app (This is the name of the CloudFormation stack that will be created by the pipeline)
  • BranchName: master (This is the name of the branch to be created in the CodeCommit repository to check-in the code from the Amazon S3 bucket zip file)
  • BucketName: aws-glue-artifacts-us-east-1 (This is the name of the Amazon S3 bucket that contains the zip file. This bucket will also be used by the pipeline for storing code artifacts)
  • CodeZipFile: code.zip (This is the key name of the sample code Amazon S3 object. The object should be a zip file)
  • RepositoryName: aws-glue-unit-testing (This is the name of the CodeCommit repository that will be created by the stack)
  • TestReportGroupName: glue-unittest-report (This is the name of the CodeBuild test report group that will be created to store the unit test reports)

Figure 5 Fill parameters for stack creation

  4. Choose Next, and then choose Next again.
  5. On the Review page, under Capabilities, select the following option:
  • I acknowledge that CloudFormation might create IAM resources with custom names.

Figure 6 Acknowledge IAM roles creation

  6. Choose Create stack to begin the stack creation process. Once the stack creation is complete, the resources that were created are displayed on the Resources tab. The stack creation takes approximately 5-7 minutes.

Figure 7 Successful completion of stack creation

The stack automatically creates a CodeCommit repository with the initial code checked in from the zip file uploaded to the Amazon S3 bucket. Furthermore, it creates a pipeline in CodePipeline that uses the CodeCommit repository as the source. In the above example, the CodeCommit repository is aws-glue-unit-testing, and the pipeline is aws-glue-unit-testing-pipeline.

Testing the solution

To test the deployed pipeline, open the CodePipeline console and select the pipeline created by the CloudFormation stack. Choose Release Change on the pipeline page.

Figure 8 Choose Release Change on pipeline page

The pipeline begins its execution with the most recent code in the CodeCommit repository.

When the Test_and_Build phase is finished, select the Details link to examine the execution logs.

Figure 9 Successfully completed the Test_and_Build stage

Choose the Reports tab, and choose the test report under Report history to view the unit test results.

Figure 10 Test report from pipeline execution

Finally, after the deployment stage is complete, you can view, run, and monitor the deployed AWS Glue job on the AWS Glue console. For more information, refer to the Running and monitoring AWS Glue documentation.

Figure 11 Successful pipeline execution

Cleanup

To avoid additional infrastructure costs, make sure that you delete the stack after experimenting with the examples provided in this post. On the CloudFormation console, select the stack that you created, and then choose Delete. This deletes all of the resources that the stack created, including CodeCommit repositories, IAM roles and policies, and CodeBuild projects.
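
If you prefer the AWS CLI, the same cleanup can be done with delete-stack calls such as the following (these assume the stack names used in this walkthrough; delete the application stack deployed by the pipeline first, then the pipeline stack):

aws cloudformation delete-stack --stack-name glue-codepipeline-app
aws cloudformation delete-stack --stack-name glue-unit-testing-pipeline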

Summary

In this post, we demonstrated how to unit test and deploy Python-based AWS Glue jobs in a pipeline, with unit tests written using the pytest framework. The approach isn't limited to CodePipeline; it can also be used to build a local development environment, as demonstrated in the Big Data blog. The aws-glue-jobs-unit-testing GitHub repository contains the example's CloudFormation template, as well as the sample AWS Glue Python code and pytest code used in this post. If you have any questions or comments regarding this example, please open an issue or submit a pull request.

Authors:

Praveen Kumar Jeyarajan

Praveen Kumar Jeyarajan is a Senior DevOps Consultant at AWS, supporting enterprise customers and their journey to the cloud. He has 11+ years of DevOps experience and is skilled in solving myriad technical challenges using the latest technologies. He holds a master's degree in Software Engineering. Outside of work, he enjoys watching movies and playing tennis.

Vaidyanathan Ganesa Sankaran

Vaidyanathan Ganesa Sankaran is a Sr Modernization Architect at AWS, supporting global enterprise customers on their journey towards modernization. He is specialized in artificial intelligence, legacy modernization, and cloud computing. He holds a master's degree in Software Engineering and has 12+ years of modernization experience. Outside work, he loves conducting training sessions for college graduates and early-career professionals who want to learn cloud and AI. His hobbies are playing tennis, philately, and traveling.

Author AWS Glue jobs with PyCharm using AWS Glue interactive sessions

Post Syndicated from Kunal Ghosh original https://aws.amazon.com/blogs/big-data/author-aws-glue-jobs-with-pycharm-using-aws-glue-interactive-sessions/

Data lakes, business intelligence, operational analytics, and data warehousing share a common core characteristic—the ability to extract, transform, and load (ETL) data for analytics. Since its launch in 2017, AWS Glue has provided a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.

AWS Glue interactive sessions allow programmers to build, test, and run data preparation and analytics applications. Interactive sessions provide on-demand access to a fully managed serverless Apache Spark runtime. AWS Glue interactive sessions also give advanced users the same Apache Spark engine as AWS Glue 2.0 or AWS Glue 3.0, with built-in cost controls and speed. Additionally, development teams become productive immediately using their existing development tool of choice.

In this post, we walk you through how to use AWS Glue interactive sessions with PyCharm to author AWS Glue jobs.

Solution overview

This post provides a step-by-step walkthrough that builds on the instructions in Getting started with AWS Glue interactive sessions. It guides you through the following steps:

  1. Create an AWS Identity and Access Management (IAM) policy with limited Amazon Simple Storage Service (Amazon S3) read privileges and associated role for AWS Glue.
  2. Configure access to a development environment. You can use a desktop computer or an operating system running on the AWS Cloud using Amazon Elastic Compute Cloud (Amazon EC2).
  3. Integrate AWS Glue interactive sessions with an integrated development environment (IDE).

We use the script Validate_Glue_Interactive_Sessions.ipynb for validation, available as a Jupyter notebook.

Prerequisites

You need an AWS account before you proceed. If you don't have one, refer to How do I create and activate a new AWS account? This guide assumes that you already have Python and PyCharm installed. Python 3.7 or later is required.

Create an IAM policy

The first step is to create an IAM policy that limits read access to the S3 bucket s3://awsglue-datasets, which has the AWS Glue public datasets. You use IAM to define the policies and roles for access to AWS Glue.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*",
                    "s3-object-lambda:Get*",
                    "s3-object-lambda:List*"
                ],
                "Resource": ["arn:aws:s3:::awsglue-datasets/*"]
            }
        ]
    }

  4. Choose Next: Tags.
  5. Choose Next: Review.
  6. For Policy name, enter glue_interactive_policy_limit_s3.
  7. For Description, enter a description.
  8. Choose Create policy.

Create an IAM role for AWS Glue

To create a role for AWS Glue with limited Amazon S3 read privileges, complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type, select AWS service.
  4. For Use cases for other AWS services, choose Glue.
  5. Choose Next.
  6. On the Add permissions page, search for and select the AWSGlueServiceRole AWS managed policy and the glue_interactive_policy_limit_s3 policy that you created earlier.
  7. Choose Next.
  8. For Role name, enter glue_interactive_role.
  9. Choose Create role.
  10. Note the ARN of the role, arn:aws:iam::<replacewithaccountID>:role/glue_interactive_role.
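
If you later need to retrieve the role ARN from the command line, a call such as the following (using the role name from this walkthrough) returns it:

aws iam get-role --role-name glue_interactive_role --query "Role.Arn" --output text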

Set up development environment access

This secondary level of access configuration needs to occur on the developer’s environment. The development environment can be a desktop computer running Windows or Mac/Linux, or similar operating systems running on the AWS Cloud using Amazon EC2. The following steps walk through each client access configuration. You can select the configuration path that is applicable to your environment.

Set up a desktop computer

To set up a desktop computer, we recommend completing the steps in Getting started with AWS Glue interactive sessions.

Set up an AWS Cloud-based computer with Amazon EC2

This configuration path follows the best practices for providing access to cloud-based resources using IAM roles. For more information, refer to Using an IAM role to grant permissions to applications running on Amazon EC2 instances.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type¸ select AWS service.
  4. For Common use cases, select EC2.
  5. Choose Next.
  6. Add the AWSGlueServiceRole policy to the newly created role.
  7. On the Add permissions menu, choose Create inline policy.
  8. Create an inline policy that allows the instance profile role to pass or assume glue_interactive_role and save the new role as ec2_glue_demo.
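
A minimal sketch of such an inline policy might look like the following (the account ID is a placeholder, and the exact action set is an assumption; tighten it to your security requirements):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole",
                "sts:AssumeRole"
            ],
            "Resource": "arn:aws:iam::<replacewithaccountID>:role/glue_interactive_role"
        }
    ]
}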

Your new policy is now listed under Permissions policies.

  1. On the Amazon EC2 console, choose (right-click) the instance you want to attach to the newly created role.
  2. Choose Security and choose Modify IAM role.
  3. For IAM role¸ choose the role ec2_glue_demo.
  4. Choose Save.
  5. On the IAM console, open and edit the trust relationship for glue_interactive_role.
  6. Add "AWS": ["arn:aws:iam::<replacewithaccountID>:user/glue_interactive_user", "arn:aws:iam::<replacewithaccountID>:role/ec2_glue_demo"] to the Principal JSON key (see the sketch after this list).
  7. Complete the steps detailed in Getting started with AWS Glue interactive sessions.
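
For reference, after you edit the trust relationship in step 6, the trust policy for glue_interactive_role might look something like this sketch (account IDs are placeholders; the glue_interactive_user entry applies only if you also use such an IAM user):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com",
                "AWS": [
                    "arn:aws:iam::<replacewithaccountID>:user/glue_interactive_user",
                    "arn:aws:iam::<replacewithaccountID>:role/ec2_glue_demo"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}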

You don’t need to provide an AWS access key ID or AWS secret access key as part of the remaining steps.

Integrate AWS Glue interactive sessions with an IDE

You’re now ready to set up and validate your PyCharm integration with AWS Glue interactive sessions.

  1. On the welcome page, choose New Project.
  2. For Location, enter the location of your project glue-interactive-demo.
  3. Expand Python Interpreter.
  4. Select Previously configured interpreter and choose the virtual environment you created earlier.
  5. Choose Create.

The following screenshot shows the New Project page on a Mac computer. A Windows computer setup will have a relative path beginning with C:\ followed by the PyCharm project location.

  1. Choose the project (right-click) and on the New menu, choose Jupyter Notebook.
  2. Name the notebook Validate_Glue_Interactive_Sessions.

The notebook has a drop-down called Managed Jupyter server: auto-start, which means the Jupyter server automatically starts when any notebook cell is run.

  1. Run the following code:
    print("This notebook will start the local Python kernel")

You can observe that the Jupyter server started running the cell.

  1. On the Python 3 (ipykernel) drop-down, choose Glue PySpark.
  2. Run the following code to start a Spark session:
    spark

  3. Wait to receive the message that a session ID has been created.
  4. Run the following code in each cell, which is the boilerplate syntax for AWS Glue:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    glueContext = GlueContext(SparkContext.getOrCreate())

  5. Read the publicly available Medicare Provider payment data in the AWS Glue data preparation sample document:
    medicare_dynamicframe = glueContext.create_dynamic_frame.from_options(
        's3',
        {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']},
        'csv',
        {'withHeader': True})
    print("Count:",medicare_dynamicframe.count())
    medicare_dynamicframe.printSchema()

  6. Change the data type of the Provider Id column to long so that all incoming values resolve to long:
    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('Provider Id','cast:long')])
    medicare_res.printSchema()

  7. Display the providers:
    medicare_res.toDF().select('Provider Name').show(10,truncate=False)

Clean up

You can run %delete_session, which deletes the current session and stops the cluster, so you're no longer charged. For more session commands, see the AWS Glue interactive sessions magics. Also remember to delete the IAM policy and role when you're done.

Conclusion

In this post, we demonstrated how to configure PyCharm to integrate and work with AWS Glue interactive sessions. The post builds on the steps in Getting started with AWS Glue interactive sessions to enable AWS Glue interactive sessions to work with Jupyter notebooks. We also provided ways to validate and test the functionality of the configuration.


About the Authors

Kunal Ghosh is a Sr. Solutions Architect at AWS. His passion is building efficient and effective solutions in the cloud, especially involving analytics, AI, data science, and machine learning. Besides family time, he likes reading and watching movies. He is a foodie.

Sebastian Muah is a Solutions Architect at AWS focused on analytics, AI/ML, and big data. He has over 25 years of experience in information technology and helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. He enjoys cycling and building things around his home.

Build your data pipeline in your AWS modern data platform using AWS Lake Formation, AWS Glue, and dbt Core

Post Syndicated from Benjamin Menuet original https://aws.amazon.com/blogs/big-data/build-your-data-pipeline-in-your-aws-modern-data-platform-using-aws-lake-formation-aws-glue-and-dbt-core/

dbt has established itself as one of the most popular tools in the modern data stack, and is aiming to bring analytics engineering to everyone. The dbt tool makes it easy to develop and implement complex data processing pipelines, with mostly SQL, and it provides developers with a simple interface to create, test, document, evolve, and deploy their workflows. For more information, see docs.getdbt.com.

dbt primarily targets cloud data warehouses such as Amazon Redshift or Snowflake. Now, you can also use dbt against AWS data lakes, thanks to AWS Lake Formation and AWS Glue interactive sessions.

In this post, you’ll learn how to deploy a data pipeline in your modern data platform using the dbt-glue adapter built by the AWS Professional Services team in collaboration with dbtlabs.

With this new open-source, battle-tested dbt AWS Glue adapter, developers can now use dbt for their data lakes, paying for just the compute they need, with no need to shuffle data around. They still have access to everything that makes dbt great, including the local developer experience, documentation, tests, incremental data processing, Git integration, CI/CD, and more.

Solution overview

The following diagram shows the architecture of the solution.

The steps in this workflow are as follows:

  1. The data team configures a local Python virtual environment and creates a data pipeline with dbt.
  2. The dbt-glue adapter uses Lake Formation to perform all structure manipulation, such as creating databases, tables, or views.
  3. The dbt-glue adapter uses AWS Glue interactive sessions as the backend for processing your data.
  4. All data is stored in Amazon Simple Storage Service (Amazon S3) in the open Parquet file format.
  5. The data team can now query all data stored in the data lake using Amazon Athena.

Walkthrough overview

For this post, you run a data pipeline that creates indicators based on NYC taxi data by following these steps:

  1. Deploy the provided AWS CloudFormation stack in Region us-east-1.
  2. Configure your Amazon CloudShell environment.
  3. Install dbt, the dbt CLI, and the dbt-glue adapter.
  4. Use CloudShell to clone the project and configure it to use your account’s configuration.
  5. Run dbt to implement the data pipeline.
  6. Query the data with Athena.

For our use case, we use the data from the New York City Taxi Records dataset. This dataset is available in the Registry of Open Data on AWS (RODA), which is a repository containing public datasets from AWS resources.

The CloudFormation template creates the nyctaxi database in your AWS Glue Data Catalog and a table (records) that points to the public dataset. You don’t need to host the data in your account.

Prerequisites

The CloudFormation template used by this project configures the AWS Identity and Access Management (IAM) role GlueInteractiveSessionRole with all the mandatory permissions.

For more details on permissions for AWS Glue interactive sessions, refer to Securing AWS Glue interactive sessions with IAM.

Deploy resources with AWS CloudFormation

The CloudFormation stack deploys all the required infrastructure:

  • An IAM role with all the mandatory permissions to run an AWS Glue interactive session and the dbt-glue adapter.
  • An AWS Glue database and table to store the metadata related to the NYC taxi records dataset
  • An S3 bucket to use as output and store the processed data
  • An Athena configuration (a workgroup and S3 bucket to store the output) to explore the dataset
  • An AWS Lambda function as an AWS CloudFormation custom resource that updates all the partitions in the AWS Glue table

To create these resources, choose Launch Stack and follow the instructions:

Configure the CloudShell environment

To start working with the shell, complete the following steps:

  1. Sign in to the AWS Management Console and launch CloudShell using either one of the following two methods:
    1. Choose the CloudShell icon on the console navigation bar.
    2. Enter cloudshell in the Find Services box and then choose the CloudShell option.
  2. Because dbt and the dbt-glue adapter are compatible with Python versions 3.7, 3.8, and 3.9, check the version of Python:
    $ python3 --version

  3. Configure a Python virtual environment to isolate the package version and code dependencies:
    $ sudo yum install git -y
    $ python3 -m venv dbt_venv
    $ source dbt_venv/bin/activate
    $ python3 -m pip install --upgrade pip

  4. Configure the aws-glue-session package:
    $ sudo yum install gcc krb5-devel.x86_64 python3-devel.x86_64 -y
    $ pip3 install --no-cache-dir --upgrade boto3
    $ pip3 install --no-cache-dir --upgrade aws-glue-sessions

Install dbt, the dbt CLI, and the dbt adapter

The dbt CLI is a command-line interface for running dbt projects. It's free to use and available as an open-source project. Install dbt and the dbt CLI with the following code:

$ pip3 install --no-cache-dir dbt-core

For more information, refer to How to install dbt, What is dbt?, and Viewpoint.

Install the dbt adapter with the following code:

$ pip3 install --no-cache-dir dbt-glue

Clone the project

The dbt AWS Glue interactive session demo project contains an example of a data pipeline that produces metrics based on NYC taxi dataset. Clone the project with the following code:

$ git clone https://github.com/aws-samples/dbtgluenyctaxidemo

This project comes with the following configuration example:

dbtgluenyctaxidemo/profile/profiles.yml

The following list summarizes the parameter options for the adapter:

  • project_name (mandatory) – The dbt project name. This must be the same as the one configured in the dbt project.
  • type (mandatory) – The driver to use.
  • query-comment (optional) – A string to inject as a comment in each query that dbt runs.
  • role_arn (mandatory) – The ARN of the interactive session role created as part of the CloudFormation template.
  • region (mandatory) – The AWS Region where you run the data pipeline.
  • workers (mandatory) – The number of workers of a defined workerType that are allocated when a job runs.
  • worker_type (mandatory) – The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X.
  • schema (mandatory) – The schema used to organize data stored in Amazon S3.
  • database (mandatory) – The database in Lake Formation. The database stores metadata tables in the Data Catalog.
  • session_provisioning_timeout_in_seconds (mandatory) – The timeout in seconds for AWS Glue interactive session provisioning.
  • location (mandatory) – The Amazon S3 location of your target data.
  • idle_timeout (optional) – The AWS Glue session idle timeout in minutes. (The session stops after being idle for the specified amount of time.)
  • glue_version (optional) – The version of AWS Glue for this session to use. Currently, the only valid options are 2.0 and 3.0. The default value is 2.0.
  • security_configuration (optional) – The security configuration to use with this session.
  • connections (optional) – A comma-separated list of connections to use in the session.
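
To see how these options fit together, the following is a rough sketch of a profiles.yml for the dbt-glue adapter. It is not the exact file shipped in the demo repository; the profile name, target name, and values are placeholders, and it reuses the environment variables exported later in this walkthrough:

dbtgluenyctaxidemo:                      # assumed top-level key; must match the profile set in dbt_project.yml
  target: dev
  outputs:
    dev:
      type: glue                         # use the dbt-glue driver
      role_arn: "{{ env_var('DBT_ROLE_ARN') }}"
      region: us-east-1
      workers: 2
      worker_type: G.1X
      schema: dbt_nyc_metrics
      database: dbt_nyc_metrics
      session_provisioning_timeout_in_seconds: 120
      location: "{{ env_var('DBT_S3_LOCATION') }}"
      glue_version: "3.0"
      idle_timeout: 10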

Run the dbt project

The objective of this sample project is to create the following four tables, which contain metrics based on the NYC taxi dataset:

  • silver_nyctaxi_avg_metrics – Basic metrics based on NYC Taxi Open Data for the year 2016
  • gold_nyctaxi_passengers_metrics – Metrics per passenger based on the silver metrics table
  • gold_nyctaxi_distance_metrics – Metrics per distance based on the silver metrics table
  • gold_nyctaxi_cost_metrics – Metrics per cost based on the silver metrics table
  1. To run the dbt project, go to the project folder:
    $ cd dbtgluenyctaxidemo

  2. The project requires you to set environment variables in order to run on the AWS account:
    $ export DBT_ROLE_ARN="arn:aws:iam::$(aws sts get-caller-identity --query "Account" --output text):role/GlueInteractiveSessionRole"
    $ export DBT_S3_LOCATION="s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/"

  3. Make sure the profile is set up correctly from the command line:
    $ dbt debug --profiles-dir profile

  4. Run the models with the following code:
    $ dbt run --profiles-dir profile

  5. Generate documentation for the project:
    $ dbt docs generate --profiles-dir profile

  6. View the documentation for the project:
    $ dbt docs serve --profiles-dir profile

Query data via Athena

This section demonstrates how to query the target table using Athena. To query the data, complete the following steps:

  1. On the Athena console, switch the workgroup to athena-dbt-glue-aws-blog.
  2. If the Workgroup athena-dbt-glue-aws-blog settings dialog box appears, choose Acknowledge.
  3. Use the following query to explore the metrics created by the dbt project:
    SELECT cm.avg_cost_per_minute
    , cm.avg_cost_per_distance
    , dm.avg_distance_per_duration
    , dm.year
    , dm.month
    , dm.type
    FROM "dbt_nyc_metrics"."gold_nyctaxi_distance_metrics" dm
    LEFT JOIN "dbt_nyc_metrics"."gold_nyctaxi_cost_metrics" cm
    ON dm.type = cm.type
    AND dm.year = cm.year
    AND dm.month = cm.month
    WHERE dm.type = 'yellow'
    AND dm.year = '2016'
    AND dm.month = '6'

The following screenshot shows the results of this query.

Clean Up

To clean up your environment, complete the following steps in CloudShell:

  1. Delete the database created by dbt:
    $ aws glue delete-database --name dbt_nyc_metrics

  2. Delete all generated data:
    $ aws s3 rm s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive
    $ aws s3 rm s3://aws-athena-dbt-glue-query-results-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive

  3. Delete the CloudFormation stack:
    $ aws cloudformation delete-stack --stack-name dbt-demo

Summary

This post demonstrates how AWS managed services are key enablers and accelerators to build a modern data platform at scale or take advantage of an existing one.

With the introduction of dbt and the dbt-glue adapter, data teams can access data stored in their modern data platform using SQL statements to extract value from that data.

To report a bug or request a feature, please open an issue on GitHub. If you have any questions or suggestions, leave your feedback in the comment section. If you need further assistance to optimize your modern data platform, contact your AWS account team or a trusted AWS Partner.


About the Authors

Benjamin Menuet is a Data Architect with AWS Professional Services. He helps customers develop big data and analytics solutions to accelerate their business outcomes. Outside of work, Benjamin is a trail runner and has finished some mythic races like the UTMB.

Armando Segnini is a Data Architect with AWS Professional Services. He spends his time building scalable big data and analytics solutions for AWS Enterprise and Strategic customers. Armando also loves to travel with his family all around the world and take pictures of the places he visits.

Moshir Mikael is a Senior Practice Manager with AWS Professional Services. He led the development of large enterprise data platforms in EMEA and currently leads the Professional Services analytics teams in EMEA.

Anouar Zaaber is a Senior Engagement Manager in AWS Professional Services. He leads internal AWS teams, external partners, and customer teams to deliver AWS cloud services that enable customers to realize their business outcomes.

Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-auto-scaling-automatically-resize-serverless-computing-resources-for-lower-cost-with-optimized-apache-spark/

Data created in the cloud is growing rapidly, so scalability is a key factor in distributed data processing. Many customers benefit from the scalability of the AWS Glue serverless Spark runtime. Today, we're pleased to announce the release of AWS Glue Auto Scaling, which helps you scale your AWS Glue Spark jobs automatically based on requirements calculated dynamically during the job run, and accelerate job runs at lower cost without detailed capacity planning.

Before AWS Glue Auto Scaling, you had to predict workload patterns in advance. For example, in cases when you don’t have expertise in Apache Spark, when it’s the first time you’re processing the target data, or when the volume or variety of the data is significantly changing, it’s not so easy to predict the workload and plan the capacity for your AWS Glue jobs. Under-provisioning is error-prone and can lead to either missed SLA or unpredictable performance. On the other hand, over-provisioning can cause underutilization of resources and cost overruns. Therefore, it was a common best practice to experiment with your data, monitor the metrics, and adjust the number of AWS Glue workers before you deployed your Spark applications to production.

With AWS Glue Auto Scaling, you no longer need to plan AWS Glue Spark cluster capacity in advance. You can just set the maximum number of workers and run your jobs. AWS Glue monitors the Spark application execution, and allocates more worker nodes to the cluster in near-real time after Spark requests more executors based on your workload requirements. When there are idle executors that don’t have intermediate shuffle data, AWS Glue Auto Scaling removes the executors to save the cost.

AWS Glue Auto Scaling is available with the optimized Spark runtime on AWS Glue version 3.0, and you can start using it today. This post describes possible use cases and how it works.

Use cases and benefits for AWS Glue Auto Scaling

Traditionally, AWS Glue launches a serverless Spark cluster of a fixed size. The computing resources are held for the whole job run until it's complete. With the new AWS Glue Auto Scaling feature, after you enable it for your AWS Glue Spark jobs, AWS Glue dynamically allocates compute resources up to the given maximum number of workers. It also supports dynamic scale-out and scale-in of the AWS Glue Spark cluster size over the course of the job. As more executors are requested by Spark, more AWS Glue workers are added to the cluster. When an executor has been idle without active computation tasks and associated shuffle dependencies for a period of time, the executor and corresponding worker are removed.

AWS Glue Auto Scaling makes it easy to run your data processing in the following typical use cases:

  • Batch jobs to process unpredictable amounts of data
  • Jobs containing driver-heavy workloads (for example, processing many small files)
  • Jobs containing multiple stages with uneven compute demands or due to data skews (for example, reading from a data store, repartitioning it to have more parallelism, and then processing further analytic workloads)
  • Jobs that write large amounts of data into data warehouses such as Amazon Redshift or read and write from databases

Configure AWS Glue Auto Scaling

AWS Glue Auto Scaling is available with the optimized Spark runtime on Glue version 3.0. To enable Auto Scaling on the AWS Glue Studio console, complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose your job.
  4. Choose the Job details tab.
  5. For Glue version, choose Glue 3.0 – Supports spark 3.1, Scala 2, Python.
  6. Select Automatically scale the number of workers.
  7. For Maximum number of workers, enter the maximum workers that can be vended to the job run.
  8. Choose Save.

To enable Auto Scaling in the AWS Glue API or AWS Command Line Interface (AWS CLI), set the following job parameters:

  • Key: --enable-auto-scaling
  • Value: true
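
For example, creating a job from the AWS CLI with Auto Scaling enabled could look like the following sketch (the job name, role, and script location are placeholders, not values used in this post):

aws glue create-job \
  --name my-autoscaling-job \
  --role arn:aws:iam::123456789012:role/MyGlueJobRole \
  --command Name=glueetl,ScriptLocation=s3://my-bucket/scripts/my_job.py,PythonVersion=3 \
  --glue-version 3.0 \
  --worker-type G.1X \
  --number-of-workers 10 \
  --default-arguments '{"--enable-auto-scaling":"true"}'

With Auto Scaling enabled, the --number-of-workers value acts as the maximum number of workers for the job.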

Monitor AWS Glue Auto Scaling

In this section, we discuss three ways to monitor AWS Glue Auto Scaling: Amazon CloudWatch metrics, the AWS Glue Studio Monitoring page, and the Spark UI.

CloudWatch metrics

After you enable AWS Glue Auto Scaling, Spark dynamic allocation is enabled and the executor metrics are visible in CloudWatch. You can review the following metrics to understand the demand and optimized usage of executors in their Spark applications enabled with Auto Scaling:

  • glue.driver.ExecutorAllocationManager.executors.numberAllExecutors
  • glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors
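
For example, you could check that these metrics are being emitted with a CloudWatch CLI call along these lines (the Glue metrics namespace shown here is an assumption; confirm it against the metrics visible in your CloudWatch console):

aws cloudwatch list-metrics \
  --namespace Glue \
  --metric-name glue.driver.ExecutorAllocationManager.executors.numberAllExecutors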

AWS Glue Studio Monitoring page

In the Monitoring page in AWS Glue Studio, you can monitor the DPU hours you spent for a specific job run. The following screenshot shows two job runs that processed the same dataset; one without Auto Scaling which spent 8.71 DPU hours, and another one with Auto Scaling enabled which spent only 1.48 DPU hours. The DPU hour values per job run are also available with GetJobRun API responses.

Spark UI

With the Spark UI, you can monitor that the AWS Glue Spark cluster dynamically scales out and scales in with AWS Glue Auto Scaling. The event timeline shows when each executor is added and removed gradually over the Spark application run.

In the following sections, we demonstrate AWS Glue Auto Scaling with two use cases: jobs with driver-heavy workloads, and jobs with multiple stages.

Example 1: Jobs containing driver-heavy workloads

A typical workload for AWS Glue Spark jobs is to process many small files to prepare the data for further analysis. For such workloads, AWS Glue has built-in optimizations, including file grouping, a Glue S3 Lister, partition pushdown predicates, partition indexes, and more. For more information, see Optimize memory management in AWS Glue. All those optimizations execute on the Spark driver and speed up the planning phase on Spark driver to compute and distribute the work for parallel processing with Spark executors. However, without AWS Glue Auto Scaling, Spark executors are idle during the planning phase. With Auto Scaling, Glue jobs only allocate executors when the driver work is complete, thereby saving executor cost.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from an Amazon Simple Storage Service (Amazon S3) bucket, performs the ApplyMapping transformation, runs a simple SELECT query repartitioning data to have 800 partitions, and writes back to another location in Amazon S3.

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

In contrast, the following screenshot shows the executor timeline of the same job with Auto Scaling enabled and the maximum workers set to 20. The driver and one executor started at the beginning, and other executors started only after the driver finished its computation for listing 367,920 partitions on the S3 bucket. These 19 workers were not charged during the long-running driver task.

Both jobs completed in 44 minutes. With AWS Glue Auto Scaling, the job completed in the same amount of time with lower cost.

Example 2: Jobs containing multiple stages

Another typical workload in AWS Glue is to read from the data store or large compressed files, repartition it to have more parallelism for downstream processing, and process further analytic queries. For example, when you want to read from a JDBC data store, you may not want to have many concurrent connections, so you can avoid impacting source database performance. For such workloads, you can have a small number of connections to read data from the JDBC data store, then repartition the data with higher parallelism for further analysis.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from the JDBC data source, runs a simple SELECT query adding one more column (mod_id) calculated from the column ID, performs the ApplyMapping node, then writes to an S3 bucket with partitioning by this new column mod_id. Note that the JDBC data source was already registered in the AWS Glue Data Catalog, and the table has two parameters, hashfield=id and hashpartitions=5, to read from JDBC through five concurrent connections.

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in the Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

The following screenshot shows the same executor timeline in the Spark UI with Auto Scaling enabled and 20 maximum workers. The driver and two executors started at the beginning, and other executors started later. The first two executors read data from the JDBC source with a smaller number of concurrent connections. Later, the job increased parallelism and more executors were started. You can also observe that there were 16 executors, not 20, which further reduced cost.

Conclusion

This post discussed AWS Glue Auto Scaling, which automatically resizes the computing resources of your AWS Glue Spark jobs and reduces cost. You can start using AWS Glue Auto Scaling for both your existing workloads and future new workloads, and take advantage of it today! For more information about AWS Glue Auto Scaling, see Using Auto Scaling for AWS Glue. Migrate your jobs to AWS Glue version 3.0 and get the benefits of Auto Scaling.

Special thanks to everyone who contributed to the launch: Raghavendhar Thiruvoipadi Vidyasagar, Ping-Yao Chang, Shashank Bhardwaj, Sampath Shreekantha, Vaibhav Porwal, and Akash Gupta.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data platforms, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing the knowledge in AWS Big Data blog posts. In his spare time, he enjoys taking care of killifish, hermit crabs, and grubs with his children.

Bo Li is a Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about data.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.

Enhance analytics with Google Trends data using AWS Glue, Amazon Athena, and Amazon QuickSight

Post Syndicated from Drew Philip original https://aws.amazon.com/blogs/big-data/enhance-analytics-with-google-trends-data-using-aws-glue-amazon-athena-and-amazon-quicksight/

In today's market, business success often lies in the ability to glean accurate insights and predictions from data. However, data scientists and analysts often find that the data they have at their disposal isn't enough to help them make accurate predictions for their use cases. A variety of factors might alter an outcome and should be taken into account when building a prediction model. Google Trends is one available option: a broad source of data that reflects global trends more comprehensively. This can help enrich a dataset to yield a better model.

You can use Google Trends data for a variety of analytical use cases. For example, you can use it to learn about how your products or brands are faring among targeted audiences. You can also use it to monitor competitors and see how well they’re performing against your brand.

In this post, we show how to get Google Trends data programmatically, integrate it into a data pipeline, and use it to analyze data, using Amazon Simple Storage Service (Amazon S3), AWS Glue, Amazon Athena, and Amazon QuickSight. We use an example dataset of movies and TV shows and demonstrate how to get the search queries from Google Trends to analyze the popularity of movies and TV shows.

Solution overview

The following diagram shows a high-level architecture of the solution using Amazon S3, AWS Glue, the Google Trends API, Athena, and QuickSight.

The solution consists of the following components:

  1. Amazon S3 – The storage layer that stores the list of topics for which Google Trends data has to be gathered. It also stores the results returned by Google Trends.
  2. AWS Glue – The serverless data integration service that calls Google Trends for the list of topics to get the search results, aggregates the data, and loads it to Amazon S3.
  3. Athena – The query engine that allows you to query the data stored in Amazon S3. You can use it for supporting one-time SQL queries on Google Trends data and for building dashboards using tools like QuickSight.
  4. QuickSight – The reporting tool used for building visualizations.

In the following sections, we walk through the steps to set up the environment, download the libraries, create and run the AWS Glue job, and explore the data.

Set up your environment

Complete the following steps to set up your environment:

  1. Create an S3 bucket where you upload the list of movies and TV shows. For this post, we use a Netflix Movies and TV Shows public dataset from Kaggle.
  2. Create an AWS Identity and Access Management (IAM) service role that allows AWS Glue to read and write data to the S3 buckets you just created.
  3. Create a new QuickSight account with the admin/author role and access granted to Athena and Amazon S3.

Download the external libraries and dependencies for the AWS Glue Job

The AWS Glue job needs the following two external Python libraries: pytrends and awswrangler. pytrends is a library that provides a simple interface for automating the downloading of reports from Google Trends. awswrangler is a library provided by AWS to integrate data between a Pandas DataFrame and AWS repositories like Amazon S3.

Download the .whl files for the pytrends and awswrangler libraries and upload them to Amazon S3.

Create and configure an AWS Glue job

To set up your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL in the navigation pane, choose Jobs – New.
  2. For Create job, select Python Shell script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. On the Script tab, enter the following script, replacing the source and target buckets with your bucket names:
    # Import external library TrendReq needed to connect to Google Trends API and library awswrangler to read/write from pandas to Amazon S3.
    
    from pytrends.request import TrendReq
    pytrend = TrendReq(hl='en-US', tz=360, timeout=10) 
    import pandas as pd
    import awswrangler as wr
    
    # Function get_gtrend, accepts a list of terms as input, calls Google Trends API for each term to get the search trends 
    def get_gtrend(terms):
      trends =[]
      for term in terms:
    # Normalizing the data using popular movie Titanic as baseline to get trends over time.
        pytrend.build_payload(kw_list=["Titanic",term.lower()])
        df = pytrend.interest_over_time()
        df["google_trend"] = round((df[term.lower()] /df['Titanic']) *100)
        
    # Transforming and filtering trends results to align with Analytics use case
        df_trend = df.loc[df.index >= "2018-1-1", "google_trend"].resample(rule="M").max().to_frame()
        df_trend["movie"] = term
        trends.append(df_trend.reset_index())
    
    # Last step in function to concatenate the results for each term and return an aggregated dataset 
      concat_df = pd.concat(trends)
      return concat_df
    
    def main():
      
    # Change the bucket and prefix name to Amazon S3 location where movie titles file from Kaggle has been downloaded. 
      source_bucket = "source_bucket"
      source_prefix = "source_prefix"
    
    # Awswrangler method s3.read_csv is called to load the titles from S3 location into a DataFrame and convert it to a list.
      df = wr.s3.read_csv(f's3://{source_bucket}/{source_prefix}/')
      movies = df['title'].head(20).values.tolist()
    
    #  Call the get_gtrend function and pass the list of movies as input. A Pandas DataFrame is returned with trend data for the movies.
      df = get_gtrend(terms=movies)
    
    # Change the prefix name to location where you want to store results. 
      target_bucket = "target_bucket" 
      target_prefix = "target_prefix" 
    
    # Use awswrangler to save pandas dataframe to Amazon S3. 
      wr.s3.to_csv(df,f's3://{target_bucket}/{target_prefix}/trends.csv',index= False)
    
    
    # Invoke the main function
    main()

  6. On the Job details tab, for Name, enter the name of the AWS Glue job.
  7. For IAM Role, choose the role that you created earlier with permissions to run the job and access Amazon S3.
  8. For Type, enter Python Shell to run the Python code.
  9. For Python Version, specify the Python version as Python 3.6.
  10. For Data processing units, choose 1 DPU.
  11. For Number of retries, enter .
  12. Expand Advanced properties and, under Libraries, enter the Amazon S3 location where you uploaded the pytrends and awswrangler .whl files.
  13. Choose Save to save the job.

Run the AWS Glue job

Navigate to the AWS Glue console and run the AWS Glue job you created. When the job is complete, a CSV file with the Google Trends values is created in the target S3 bucket with the prefix specified in the main() function. In the next step, we create an AWS Glue table referring to the target bucket and prefix to allow queries to be run against the Google Trends data.

Create an AWS Glue table on the Google Trends data

In this step, we create a table in the AWS Glue Data Catalog using Athena. The table is created on top of the Google Trends data saved in the target S3 bucket.

In the Athena query editor, select default as the database and enter the following DDL command to create a table named trends. Replace the target bucket and prefix with your own values.

CREATE EXTERNAL TABLE `trends`(
  `date` date, 
  `google_trend` double, 
  `title` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<< target_bucket >>/<<target_prefix >>/'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'skip.header.line.count'='1')

This table has three columns:

  • date – The time dimension for aggregating the data. In this example, the time period is monthly.
  • google_trend – The count of Google Trends values normalized on a scale of 0–100.
  • title – The name of the movie or TV show.

Query the data using Athena

Now you can run one-time queries to find the popularity of movies and TV shows.

In the first example, we find the top 10 most popular movies and TV shows for November 2021. In the Athena query editor, enter the following SQL command to query the trends table created in the previous step:

select title,google_trend
from trends 
where date = date_parse('2021-11-30','%Y-%m-%d')
order by google_trend desc
limit 10

In the following example, we find the top 10 most popular movies and TV shows that have grown most in popularity in 2021 until November 30. In the Athena query editor, enter the following SQL command to query the trends table:

select  title,max(google_trend)-min(google_trend) trend_diff
from trends
where date between date_parse('2021-01-31','%Y-%m-%d') and date_parse('2021-11-30','%Y-%m-%d')
group by title
order by 2 desc
limit 10

Build a dashboard to visualize the data using QuickSight

We can use QuickSight to build a dashboard on the data downloaded from Google Trends to identify top movies and TV shows. Complete the following steps:

  1. Sign in to your QuickSight account.
  2. On the QuickSight console, choose Datasets and choose New dataset.
  3. Choose Athena as your data source.
  4. For Data source name, enter a name.
  5. For Athena workgroup, choose [primary].
  6. Choose Create data source.
  7. For Database, choose default.
  8. For Tables, select the trends table.
  9. Choose Select.
  10. Select Directly query your data.
  11. Choose Visualize.

For the first visual, we create a bar chart of the top movies or TV shows by title sorted in ascending order of aggregated Google Trends values.

  1. Choose the horizontal bar chart visual type.
  2. For Y axis, choose title.
  3. For Value, choose google_trend (Average).

Next, we create a time series plot of Google Trends count by month for titles.

  1. Add a new visual and choose the autograph visual type.
  2. For X axis, choose date.
  3. For Value, choose google_trend (Sum).
  4. For Color¸ choose title.

Clean up

To avoid incurring future charges, delete the resources you created for AWS Glue, Amazon S3, IAM, and QuickSight.

  1. AWS Glue Catalog table
    • On the AWS Glue console, choose Tables under Databases in the navigation pane.
    • Select the AWS Glue Data Catalog table that you created.
    • On the Actions drop-down menu, choose Delete.
    • Choose Delete to confirm.
  2. AWS Glue Job
    • Choose Jobs in the navigation pane.
    • Select the AWS Glue job you created.
    • On the Actions drop-down menu, choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets in the navigation pane.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles in the navigation pane.
    • Choose the role you attached to AWS Glue job.
    • Choose Delete role.
    • Choose Yes.
  5. Amazon QuickSight
    • If you created a QuickSight user for trying out this blog and do not want to retain that access, please ask your QuickSight admin to delete your user.
    • If you created the QuickSight account itself just for trying this blog and no longer want to retain it, use the following steps to delete it.
    • Choose your user name on the application bar, and then choose Manage QuickSight.
    • Choose Account settings.
    • Choose Delete Account.

You can only have one QuickSight account active for each AWS account. Make sure that other users aren’t using QuickSight before you delete the account.

Conclusion

Integrating external data sources such as Google Trends via AWS Glue, Athena, and QuickSight can help you enrich your datasets to yield greater insights. You can use it in a data science context when the model is under-fit and requires more relevant data in order to make better predictions. In this post, we used movies as an example, but the solution extends to a wide breadth of industries, such as products in a retail context or commodities in a finance context. If the simple inventory histories or the transaction dates are available, you may find little correlation to future demand or prices. But with an integrated data pipeline using external data, new relationships in the dataset make the model more reliable.

In a business context, whether your team wants to test out a machine learning (ML) proof of concept more quickly or have limited access to pertinent data, Google Trends integration is a relatively quick way to enrich your data for the purposes of ML and data insights.

You can also extend this concept to other third-party datasets, such as social media sentiment, as your team’s expertise grows and your ML and analytics operations mature. Integrating external datasets such as Google Trends is just one part of the feature and data engineering process, but it’s a great place to start and, in our experience, most often leads to better models that businesses can innovate from.


About the Authors

Drew Philip is a Sr. Solutions Architect with AWS Private Equity. He has held senior
technical leadership positions within key AWS partners such as Microsoft, Oracle, and
Rackspace. Drew focuses on applied engineering that leverages AI-enabled digital innovation and development, application modernization, resiliency and operational excellence for workloads at scale in the public and private sector. He sits on the board of Calvin University’s computer science department and is a contributing member of the AWS Machine Learning Technical Focus Community.

Gautam Prothia is a Senior Solutions Architect at AWS dedicated to strategic accounts. Gautam has more than 15 years of experience designing and implementing large-scale data management and analytics solutions. He has worked with many clients across industries to help them modernize their data platforms on the cloud.

Simon Zamarin is an AI/ML Solutions Architect whose main focus is helping customers extract value from their data assets. In his spare time, Simon enjoys spending time with family, reading sci-fi, and working on various DIY house projects.

Develop and test AWS Glue version 3.0 jobs locally using a Docker container

Post Syndicated from Subramanya Vajiraya original https://aws.amazon.com/blogs/big-data/develop-and-test-aws-glue-version-3-0-jobs-locally-using-a-docker-container/

AWS Glue is a fully managed serverless service that allows you to process data coming from different data sources at scale. You can use AWS Glue jobs for various use cases such as data ingestion, preprocessing, enrichment, and data integration from different data sources. AWS Glue version 3.0, the latest version of AWS Glue Spark jobs, provides a performance-optimized Apache Spark 3.1 runtime experience for batch and stream processing.

You can author AWS Glue jobs in different ways. If you prefer coding, AWS Glue allows you to write Python/Scala source code with the AWS Glue ETL library. If you prefer interactive scripting, AWS Glue interactive sessions and AWS Glue Studio notebooks help you write scripts in notebooks by inspecting and visualizing the data. If you prefer a graphical interface rather than coding, AWS Glue Studio helps you author data integration jobs visually without writing code.

For a production-ready data platform, a development process and CI/CD pipeline for AWS Glue jobs is key. We understand the strong demand for developing and testing AWS Glue jobs where you have the flexibility to use a local laptop, a Docker container on Amazon Elastic Compute Cloud (Amazon EC2), and so on. You can achieve that by using AWS Glue Docker images hosted on Docker Hub or the Amazon Elastic Container Registry (Amazon ECR) Public Gallery. The Docker images help you set up your development environment with additional utilities. You can use your preferred IDE, notebook, or REPL using the AWS Glue ETL library.

This post is a continuation of the blog post “Developing AWS Glue ETL jobs locally using a container”. While the earlier post introduced the pattern of developing AWS Glue ETL jobs on a Docker container using a Docker image, this post focuses on how to develop and test AWS Glue version 3.0 jobs using the same approach.

Solution overview

The following Docker images are available for AWS Glue on Docker Hub:

  • AWS Glue version 3.0 – amazon/aws-glue-libs:glue_libs_3.0.0_image_01
  • AWS Glue version 2.0 – amazon/aws-glue-libs:glue_libs_2.0.0_image_01

You can also obtain the images from the Amazon ECR Public Gallery:

  • AWS Glue version 3.0 – public.ecr.aws/glue/aws-glue-libs:glue_libs_3.0.0_image_01
  • AWS Glue version 2.0 – public.ecr.aws/glue/aws-glue-libs:glue_libs_2.0.0_image_01

Note: AWS Glue Docker images are x86_64 compatible and arm64 hosts are currently not supported.

In this post, we use amazon/aws-glue-libs:glue_libs_3.0.0_image_01 and run the container on a local machine (Mac, Windows, or Linux). This container image has been tested for AWS Glue version 3.0 Spark jobs. The image contains the following:

  • Amazon Linux
  • AWS Glue ETL Library (aws-glue-libs)
  • Apache Spark 3.1.1
  • Spark history server
  • JupyterLab
  • Livy
  • Other library dependencies (the same as the ones of the AWS Glue job system)

To set up your container, you pull the image from Docker Hub and then run the container. We demonstrate how to run your container with the following methods, depending on your requirements:

  • spark-submit
  • REPL shell (pyspark)
  • pytest
  • JupyterLab
  • Visual Studio Code

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. Also make sure that you have at least 7 GB of disk space for the image on the host running Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Configure AWS credentials

To enable AWS API calls from the container, set up your AWS credentials with the following steps:

  1. Create an AWS named profile.
  2. Open cmd on Windows or a terminal on Mac/Linux, and run the following command:
    PROFILE_NAME="profile_name"

In the following sections, we use this AWS named profile.
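
If you haven't created a named profile yet, one way to do so with the AWS CLI (assuming the CLI is installed on the host) is:

$ aws configure --profile profile_name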

Pull the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Run the following command to pull the image from Docker Hub:

docker pull amazon/aws-glue-libs:glue_libs_3.0.0_image_01

Run the container

Now you can run a container using this image. You can choose any of the following methods based on your requirements.

spark-submit

You can run an AWS Glue job script by running the spark-submit command on the container.

Write your ETL script (sample.py in the example below) and save it under the /local_path_to_workspace/src/ directory using the following commands:

$ WORKSPACE_LOCATION=/local_path_to_workspace
$ SCRIPT_FILE_NAME=sample.py
$ mkdir -p ${WORKSPACE_LOCATION}/src
$ vim ${WORKSPACE_LOCATION}/src/${SCRIPT_FILE_NAME}

These variables are used in the docker run command below. The sample code (sample.py) used in the spark-submit command below is included in the appendix at the end of this post.
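
For reference, a minimal sketch of what such a script might contain is shown here. This is not the full sample.py from the appendix; the S3 path points to the public AWS Glue sample legislators dataset and is an assumption:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create the Spark and Glue contexts provided by the container's Glue libraries
glueContext = GlueContext(SparkContext.getOrCreate())

# Read the public legislators "persons" dataset into a DynamicFrame
persons = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://awsglue-datasets/examples/us-legislators/all/persons.json"]},
    format="json",
)

# Print the inferred schema, similar to the output shown below
persons.printSchema()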

Run the following command to run the spark-submit command on the container to submit a new Spark application:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_spark_submit amazon/aws-glue-libs:glue_libs_3.0.0_image_01 spark-submit /home/glue_user/workspace/src/$SCRIPT_FILE_NAME
...22/01/26 09:08:55 INFO DAGScheduler: Job 0 finished: fromRDD at DynamicFrame.scala:305, took 3.639886 s
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string

...

REPL shell (pyspark)

You can run a REPL (read-eval-print loop) shell for interactive development. Run the following command to run the pyspark command on the container to start the REPL shell:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pyspark amazon/aws-glue-libs:glue_libs_3.0.0_image_01 pyspark
...
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /__ / .__/\_,_/_/ /_/\_\  version 3.1.1-amzn-0
 /_/

Using Python version 3.7.10 (default, Jun 3 2021 00:02:01)
Spark context Web UI available at http://56e99d000c99:4040
Spark context available as 'sc' (master = local[*], app id = local-1643011860812).
SparkSession available as 'spark'.
>>> 

pytest

For unit testing, you can use pytest for AWS Glue Spark job scripts.

Run the following commands for preparation:

$ WORKSPACE_LOCATION=/local_path_to_workspace
$ SCRIPT_FILE_NAME=sample.py
$ UNIT_TEST_FILE_NAME=test_sample.py
$ mkdir -p ${WORKSPACE_LOCATION}/tests
$ vim ${WORKSPACE_LOCATION}/tests/${UNIT_TEST_FILE_NAME}

Run the following command to run pytest on the test suite:

$ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pytest amazon/aws-glue-libs:glue_libs_3.0.0_image_01 -c "python3 -m pytest"
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/glue_user/spark/logs/spark-glue_user-org.apache.spark.deploy.history.HistoryServer-1-5168f209bd78.out
============================================================= test session starts =============================================================
platform linux -- Python 3.7.10, pytest-6.2.3, py-1.11.0, pluggy-0.13.1
rootdir: /home/glue_user/workspace
plugins: anyio-3.4.0
collected 1 item  

tests/test_sample.py . [100%]

============================================================== warnings summary ===============================================================
tests/test_sample.py::test_counts
 /home/glue_user/spark/python/pyspark/sql/context.py:79: DeprecationWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
 DeprecationWarning)

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================================================== 1 passed, 1 warning in 21.07s ========================================================

JupyterLab

You can start Jupyter for interactive development and ad hoc queries on notebooks. Complete the following steps:

  1. Run the following command to start JupyterLab:
    $ JUPYTER_WORKSPACE_LOCATION=/local_path_to_workspace/jupyter_workspace/
    $ docker run -it -v ~/.aws:/home/glue_user/.aws -v $JUPYTER_WORKSPACE_LOCATION:/home/glue_user/workspace/jupyter_workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 -p 8998:8998 -p 8888:8888 --name glue_jupyter_lab amazon/aws-glue-libs:glue_libs_3.0.0_image_01 /home/glue_user/jupyter/jupyter_start.sh
    ...
    [I 2022-01-24 08:19:21.368 ServerApp] Serving notebooks from local directory: /home/glue_user/workspace/jupyter_workspace
    [I 2022-01-24 08:19:21.368 ServerApp] Jupyter Server 1.13.1 is running at:
    [I 2022-01-24 08:19:21.368 ServerApp] http://faa541f8f99f:8888/lab
    [I 2022-01-24 08:19:21.368 ServerApp] or http://127.0.0.1:8888/lab
    [I 2022-01-24 08:19:21.368 ServerApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

  2. Open http://127.0.0.1:8888/lab in your web browser in your local machine to access the JupyterLab UI.
  3. Choose Glue Spark Local (PySpark) under Notebook.

Now you can start developing code in the interactive Jupyter notebook UI.

Visual Studio Code

To set up the container with Visual Studio Code, complete the following steps:

  1. Install Visual Studio Code.
  2. Install Python.
  3. Install Visual Studio Code Remote – Containers.
  4. Open the workspace folder in Visual Studio Code.
  5. Choose Settings.
  6. Choose Workspace.
  7. Choose Open Settings (JSON).
  8. Enter the following JSON and save it:
    {
        "python.defaultInterpreterPath": "/usr/bin/python3",
        "python.analysis.extraPaths": [
            "/home/glue_user/aws-glue-libs/PyGlue.zip:/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip:/home/glue_user/spark/python/",
        ]
    }

Now you’re ready to set up the container.

  1. Run the Docker container:
    $ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true --rm -p 4040:4040 -p 18080:18080 --name glue_pyspark amazon/aws-glue-libs:glue_libs_3.0.0_image_01 pyspark

  2. Start Visual Studio Code.
  3. Choose Remote Explorer in the navigation pane, and choose the container amazon/aws-glue-libs:glue_libs_3.0.0_image_01.
  4. Right-click and choose Attach to Container.
  5. If the following dialog appears, choose Got it.
  6. Open /home/glue_user/workspace/.
  7. Create an AWS Glue PySpark script and choose Run.

You should see the successful run on the AWS Glue PySpark script.

Conclusion

In this post, we learned how to get started with AWS Glue Docker images. AWS Glue Docker images help you develop and test your AWS Glue job scripts anywhere you prefer. The images are available on Docker Hub and the Amazon ECR Public Gallery. Check them out; we look forward to your feedback.

Appendix: AWS Glue job sample codes for testing

This appendix introduces three different scripts as AWS Glue job sample codes for testing purposes. You can use any of them in the tutorial.

The following sample.py code uses the AWS Glue ETL library with an Amazon Simple Storage Service (Amazon S3) API call. The code requires Amazon S3 permissions in AWS Identity and Access Management (IAM). You need to grant the IAM managed policy arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess or an IAM custom policy that allows you to make ListBucket and GetObject API calls for the S3 path.

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions


class GluePythonSampleTest:
    def __init__(self):
        params = []
        if '--JOB_NAME' in sys.argv:
            params.append('JOB_NAME')
        args = getResolvedOptions(sys.argv, params)

        self.context = GlueContext(SparkContext.getOrCreate())
        self.job = Job(self.context)

        if 'JOB_NAME' in args:
            jobname = args['JOB_NAME']
        else:
            jobname = "test"
        self.job.init(jobname, args)

    def run(self):
        dyf = read_json(self.context, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
        dyf.printSchema()

        self.job.commit()


def read_json(glue_context, path):
    dynamicframe = glue_context.create_dynamic_frame.from_options(
        connection_type='s3',
        connection_options={
            'paths': [path],
            'recurse': True
        },
        format='json'
    )
    return dynamicframe


if __name__ == '__main__':
    GluePythonSampleTest().run()

The following test_sample.py code is a sample for a unit test of sample.py:

import pytest
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import sys
from src import sample


@pytest.fixture(scope="module", autouse=True)
def glue_context():
    sys.argv.append('--JOB_NAME')
    sys.argv.append('test_count')

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    context = GlueContext(SparkContext.getOrCreate())
    job = Job(context)
    job.init(args['JOB_NAME'], args)

    yield(context)

    job.commit()


def test_counts(glue_context):
    dyf = sample.read_json(glue_context, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
    assert dyf.toDF().count() == 1961

About the Authors

Subramanya Vajiraya is a Cloud Engineer (ETL) at AWS Sydney specialized in AWS Glue. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. Outside of work, he enjoys going on bike rides and taking long walks with his dog Ollie, a 1-year-old Corgi.

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Best practices to optimize data access performance from Amazon EMR and AWS Glue to Amazon S3

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-data-access-performance-from-amazon-emr-and-aws-glue-to-amazon-s3/

Customers are increasingly building data lakes to store data at massive scale in the cloud. It’s common to use distributed computing engines, cloud-native databases, and data warehouses when you want to process and analyze your data in data lakes. Amazon EMR and AWS Glue are two key services you can use for such use cases. Amazon EMR is a managed big data framework that supports several different applications, including Apache Spark, Apache Hive, Presto, Trino, and Apache HBase. AWS Glue Spark jobs run on top of Apache Spark, and distribute data processing workloads in parallel to perform extract, transform, and load (ETL) jobs to enrich, denormalize, mask, and tokenize data on a massive scale.

For data lake storage, customers typically use Amazon Simple Storage Service (Amazon S3) because it’s secure, scalable, durable, and highly available. Amazon S3 is designed for 11 9’s of durability and stores over 200 trillion objects for millions of applications around the world, making it the ideal storage destination for your data lake. Amazon S3 averages over 100 million operations per second, so your applications can easily achieve high request rates when using Amazon S3 as your data lake.

This post describes best practices to achieve the performance scaling you need when analyzing data in Amazon S3 using Amazon EMR and AWS Glue. We specifically focus on optimizing for Apache Spark on Amazon EMR and AWS Glue Spark jobs.

Optimizing Amazon S3 performance for large Amazon EMR and AWS Glue jobs

Amazon S3 is a very large distributed system, and you can scale to thousands of transactions per second in request performance when your applications read and write data to Amazon S3. Amazon S3 performance isn’t defined per bucket, but per prefix in a bucket. Your applications can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. Additionally, there are no limits to the number of prefixes in a bucket, so you can horizontally scale your read or write performance using parallelization. For example, if you create 10 prefixes in an S3 bucket to parallelize reads, you could scale your read performance to 55,000 read requests per second. You can similarly scale writes by writing data across multiple prefixes.
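
As an illustration of spreading data across prefixes, a Spark job can partition its output by a column so that each partition value becomes its own prefix. The following PySpark sketch is hypothetical (the bucket, paths, and column name are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://my-bucket/raw/sales/")  # hypothetical input path

# Partitioning the output by a column creates one prefix per distinct value
# (for example, s3://my-bucket/curated/sales/region=us-east-1/), so requests
# spread across prefixes and read/write performance can scale horizontally.
df.write.partitionBy("region").parquet("s3://my-bucket/curated/sales/")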

You can scale performance by utilizing automatic scaling in Amazon S3 and scan millions of objects for queries run over petabytes of data. Amazon S3 automatically scales in response to sustained new request rates, dynamically optimizing performance. While Amazon S3 is internally optimizing for a new request rate, you receive HTTP 503 request responses temporarily until the optimization completes:

AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown)

Such situations require the application to retry momentarily, but after Amazon S3 internally optimizes performance for the new request rate, all requests are generally served without retries. One such situation is when multiple workers in distributed compute engines such as Amazon EMR and AWS Glue momentarily generate a high number of requests to access data under the same prefix.

When using Amazon EMR and AWS Glue to process data in Amazon S3, you can employ certain best practices to manage request traffic and avoid HTTP Slow Down errors. Let’s look at some of these strategies.

Best practices to manage HTTP Slow Down responses

You can use the following approaches to take advantage of the horizontal scaling capability in Amazon S3 and improve the success rate of your requests when accessing Amazon S3 data using Amazon EMR and AWS Glue:

  • Modify the retry strategy for Amazon S3 requests
  • Adjust the number of Amazon S3 objects processed
  • Adjust the number of concurrent Amazon S3 requests

We recommend choosing and applying the options that fit best for your use case to optimize data processing on Amazon S3. In the following sections, we describe best practices of each approach.

Modify the retry strategy for Amazon S3 requests

This is the easiest way to avoid HTTP 503 Slow Down responses and improve the success rate of your requests. To access Amazon S3 data, both Amazon EMR and AWS Glue use the EMR File System (EMRFS), which retries Amazon S3 requests with jitters when it receives 503 Slow Down responses. To improve the success rate of your Amazon S3 requests, you can adjust your retry strategy by configuring certain properties. In Amazon EMR, you can configure parameters in your emrfs-site configuration. In AWS Glue, you can configure the parameters in job parameters. You can adjust your retry strategy in the following ways:

  • Increase the EMRFS default retry limit – By default, EMRFS uses an exponential backoff strategy to retry requests to Amazon S3. The default EMRFS retry limit is 15. However, you can increase this limit when you create a new cluster, on a running cluster, or at application runtime. To increase the retry limit, change the value of the fs.s3.maxRetries parameter. Note that you may experience longer job durations if you set a higher value for this parameter. We recommend experimenting with different values (for example, 20 as a starting point), confirming the duration overhead of the jobs for each value, and then adjusting this parameter based on your requirements (see the example following this list).
  • For Amazon EMR, use the AIMD retry strategy – With Amazon EMR versions 6.4.0 and later, EMRFS supports an alternative retry strategy based on an additive-increase/multiplicative-decrease (AIMD) model. This strategy can be useful in shaping the request rate from large clusters. Instead of treating each request in isolation, this mode keeps track of the rate of recent successful and throttled requests. Requests are limited to a rate determined from the rate of recent successful requests. This decreases the number of throttled requests, and therefore the number of attempts needed per request. To enable the AIMD retry strategy, you can set the fs.s3.aimd.enabled property to true. You can further refine the AIMD retry strategy using the advanced AIMD retry settings.
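
As a minimal sketch of the job script approach, the following PySpark snippet raises the EMRFS retry limit through the spark.hadoop. prefix described in Appendix B. The value 20 is only the suggested starting point; on Amazon EMR, you would typically set fs.s3.maxRetries (and the AIMD settings) in the emrfs-site configuration instead:

from pyspark import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Raise the EMRFS retry limit before the SparkContext is created
conf = SparkConf().set("spark.hadoop.fs.s3.maxRetries", "20")
sc = SparkContext.getOrCreate(conf=conf)
glue_context = GlueContext(sc)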

Adjust the number of Amazon S3 objects processed

Another approach is to adjust the number of Amazon S3 objects processed so you have fewer requests made concurrently. When you lower the number of objects to be processed in a job, you use fewer Amazon S3 requests, thereby lowering the request rate or transactions per second (TPS) required for each job. Note the following considerations:

  • Preprocess the data by aggregating multiple smaller files into fewer, larger chunks – For example, use s3-dist-cp or an AWS Glue compaction blueprint to merge a large number of small files (generally less than 64 MB) into a smaller number of optimally sized files (such as 128–512 MB). This approach reduces the number of requests required, while simultaneously improving the aggregate throughput to read and process data in Amazon S3. You may need to experiment to arrive at the optimal size for your workload, because creating extremely large files can reduce the parallelism of the job.
  • Use partition pruning to scan data under specific partitions – In Apache Hive and Hive Metastore-compatible applications such as Apache Spark or Presto, one table can have multiple partition folders. Partition pruning is a technique to scan only the required data in a specific partition folder of a table. It’s useful when you want to read a specific portion from the entire table. To take advantage of predicate pushdown, you can use partition columns in the WHERE clause in Spark SQL or the filter expression in a DataFrame. In AWS Glue, you can also use a partition pushdown predicate when creating DynamicFrames (see the example following this list).
  • For AWS Glue, enable job bookmarks – You can use AWS Glue job bookmarks to process continuously ingested data repeatedly. It only picks unprocessed data from the previous job run, thereby reducing the number of objects read or retrieved from Amazon S3.
  • For AWS Glue, enable bounded executions – AWS Glue bounded execution is a technique to only pick unprocessed data, with an upper bound on the dataset size or the number of files to be processed. This is another way to reduce the number of requests made to Amazon S3.
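
The following sketch shows the partition pushdown predicate mentioned above when creating a DynamicFrame from the AWS Glue Data Catalog. The database, table, and partition column names are hypothetical:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Only partitions matching the predicate are listed and read from Amazon S3
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="sales_db",
    table_name="orders",
    push_down_predicate="year == '2022' and month == '01'"
)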

Adjust the number of concurrent Amazon S3 requests

To adjust the number of Amazon S3 requests to have fewer concurrent reads per prefix, you can configure Spark parameters. By default, Spark populates 10,000 tasks to list prefixes when creating Spark DataFrames. You may experience Slow Down responses, especially when you read from a table with highly nested prefix structures. In this case, it’s a good idea to limit the maximum listing parallelism by decreasing the parameter spark.sql.sources.parallelPartitionDiscovery.parallelism (the default is 10,000).
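
For example, you could lower the listing parallelism when building the Spark session; the value 1000 below is purely illustrative:

from pyspark.sql import SparkSession

# Limit the number of tasks Spark uses to list partitioned paths in Amazon S3
spark = (
    SparkSession.builder
    .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "1000")
    .getOrCreate()
)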

To have fewer concurrent write requests per prefix, you can use the following techniques:

  • Reduce the number of Spark RDD partitions before writes – You can do this by using df.repartition(n) or df.coalesce(n) in DataFrames. For Spark SQL, you can also use query hints like REPARTITION or COALESCE. You can see the number of tasks (equal to the number of RDD partitions) on the Spark UI (see the example following this list).
  • For AWS Glue, group the input data – If the datasets are made up of small files, we recommend grouping the input data because it reduces the number of RDD partitions, and reduces the number of Amazon S3 requests to write the files.
  • Use the EMRFS S3-optimized committer – The EMRFS S3-optimized committer is used by default in Amazon EMR 5.19.0 and later, and AWS Glue 3.0. In AWS Glue 2.0, you can configure it in the job parameter --enable-s3-parquet-optimized-committer. The committer uses Amazon S3 multipart uploads instead of renaming files, and it usually reduces the number of HEAD/LIST requests significantly.
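
The following sketch reduces the number of RDD partitions before writing, as described in the first bullet above, so fewer concurrent write requests hit the same prefix. The paths and the partition count are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://my-bucket/input/")  # hypothetical input path

# coalesce(50) lowers the number of write tasks (and output files) without a
# full shuffle; use repartition(50) instead if you need evenly sized partitions.
df.coalesce(50).write.parquet("s3://my-bucket/output/")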

The following are other techniques to adjust the Amazon S3 request rate in Amazon EMR and AWS Glue. These options have the net effect of reducing the parallelism of the Spark job, thereby reducing the probability of Amazon S3 Slow Down responses, although they can lead to longer job durations. We recommend testing and adjusting these values for your use case.

  • Reduce the number of concurrent jobs – Start with the most read/write heavy jobs. If you configured cross-account access for Amazon S3, keep in mind that other accounts might also be submitting jobs to the prefix.
  • Reduce the number of concurrent Spark tasks – You have several options:
    • For Amazon EMR, set the number of Spark executors (for example, the spark-submit option --num-executors and the Spark parameter spark.executor.instances).
    • For AWS Glue, set the number of workers in the NumberOfWorkers parameter (see the example following this list).
    • For AWS Glue, change the WorkerType parameter to a smaller one (for example, G.2X to G.1X).
    • Configure Spark parameters:
      • Decrease the value of spark.default.parallelism.
      • Decrease the value of spark.sql.shuffle.partitions.
      • Increase the value of spark.task.cpus (the default is 1) to allocate more CPU cores per Spark task.
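
As an illustrative sketch of the AWS Glue worker settings above, the following boto3 call creates a job with a small number of G.1X workers. The job name, role, and script location are hypothetical placeholders:

import boto3

glue = boto3.client("glue")

# Fewer and smaller workers mean fewer concurrent Spark tasks, and therefore
# fewer simultaneous Amazon S3 requests from the job.
glue.create_job(
    Name="my-etl-job",  # hypothetical job name
    Role="arn:aws:iam::123456789012:role/MyGlueJobRole",  # hypothetical role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/sample.py",  # hypothetical path
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=5,
)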

Conclusion

In this post, we described the best practices to optimize data access from Amazon EMR and AWS Glue to Amazon S3. With these best practices, you can easily run Amazon EMR and AWS Glue jobs by taking advantage of Amazon S3 horizontal scaling, and process data in a highly distributed way at a massive scale.

For further guidance, please reach out to AWS Premium Support.

Appendix A: Configure CloudWatch request metrics

To monitor Amazon S3 requests, you can enable request metrics in Amazon CloudWatch for the bucket. Then, define a filter for the prefix. For a list of useful metrics to monitor, see Monitoring metrics with Amazon CloudWatch. After you enable metrics, use the data in the metrics to determine which of the aforementioned options is best for your use case.
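
If you prefer to enable request metrics programmatically rather than on the console, a sketch like the following works with boto3. The bucket name, configuration ID, and prefix are placeholders:

import boto3

s3 = boto3.client("s3")

# Publish CloudWatch request metrics only for objects under the given prefix
s3.put_bucket_metrics_configuration(
    Bucket="my-data-lake-bucket",  # hypothetical bucket
    Id="prefix-request-metrics",
    MetricsConfiguration={
        "Id": "prefix-request-metrics",
        "Filter": {"Prefix": "sales/"},  # hypothetical prefix
    },
)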

Appendix B: Configure Spark parameters

To configure Spark parameters in Amazon EMR, there are several options:

  • spark-submit command – You can pass Spark parameters via the --conf option.
  • Job script – You can set Spark parameters in the SparkConf object in the job script codes.
  • Amazon EMR configurations – You can configure Spark parameters via API using Amazon EMR configurations. For more information, see Configure Spark.

To configure Spark parameters in AWS Glue, set an AWS Glue job parameter with the key --conf and a value like spark.hadoop.fs.s3.maxRetries=50.

To set multiple configurations, use the same key --conf with a value like spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2.
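
For example, you could pass the --conf job parameter when starting a job run with boto3; the job name is a hypothetical placeholder, and the value string follows the format described above:

import boto3

glue = boto3.client("glue")

# Pass Spark configurations to the job run through the --conf job parameter
glue.start_job_run(
    JobName="my-etl-job",  # hypothetical job name
    Arguments={
        "--conf": "spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2"
    },
)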


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about releasing AWS Glue connector custom blueprints and other software artifacts to help customers build their data lakes. In his spare time, he enjoys watching hermit crabs with his children.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

Introducing Protocol buffers (protobuf) schema support in Amazon Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/introducing-protocol-buffers-protobuf-schema-support-in-amazon-glue-schema-registry/

AWS Glue Schema Registry now supports Protocol buffers (protobuf) schemas in addition to JSON and Avro schemas. This allows application teams to use protobuf schemas to govern the evolution of streaming data and centrally control data quality from data streams to data lake. AWS Glue Schema Registry provides an open-source library that includes Apache-licensed serializers and deserializers for protobuf that integrate with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and Kafka Streams. Similar to Avro and JSON schemas, Protocol buffers schemas also support compatibility modes, schema sourcing via metadata, auto-registration of schemas, and AWS Identity and Access Management (IAM) compatibility.

In this post, we focus on Protocol buffers schema support in AWS Glue Schema Registry and how to use Protocol buffers schemas in stream processing Java applications that integrate with Apache Kafka, Amazon Managed Streaming for Apache Kafka, and Amazon Kinesis Data Streams.

Introduction to Protocol buffers

Protocol buffers is a language- and platform-neutral, extensible mechanism for serializing and deserializing structured data for use in communications protocols and data storage. A protobuf message format is defined in the .proto file. Protobuf is recommended over other data formats when you need language interoperability, faster serialization and deserialization, type safety, schema adherence between data producer and consumer applications, and reduced coding effort. With protobuf, you can use generated code from the schema using the protobuf compiler (protoc) to easily write and read your data to and from data streams using a variety of languages. You can also use build tool plugins such as Maven and Gradle to generate code from protobuf schemas as part of your CI/CD pipelines. We use the following schema for code examples in this post, which defines an employee with a gRPC service definition to find an employee by ID:

Employee.proto

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}

AWS Glue Schema Registry supports both proto2 and proto3 syntax. The preceding protobuf schema (proto2 syntax) contains three message types (Employee, Team, and Project) that use scalar, composite, and enumeration data types. Each field in the message definitions has a unique number, which is used to identify fields in the message binary format, and should not be changed once your message type is in use. In a proto2 message, a field can be required, optional, or repeated; in proto3, the options are repeated and optional. The package declaration makes sure generated code is namespaced to avoid any collisions. In addition to scalar, composite, and enumeration types, AWS Glue Schema Registry also supports protobuf schemas with common types such as Money, PhoneNumber, Timestamp, Duration, and nullable types such as BoolValue and Int32Value. It also supports protobuf schemas with gRPC service definitions with compatibility rules, such as EmployeeSearch, in the preceding schema. To learn more about Protocol buffers, refer to its documentation.

Supported Protocol buffers specification and features

AWS Glue Schema Registry supports all the features of Protocol buffers for versions 2 and 3 except for groups, extensions, and importing definitions. The AWS Glue Schema Registry APIs and its open-source library support the latest protobuf runtime version. The protobuf schema operations in AWS Glue Schema Registry are supported via the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS Glue Schema Registry API, AWS SDK, and AWS CloudFormation.

How AWS Glue Schema Registry works

The following diagram illustrates a high-level view of how AWS Glue Schema Registry works. AWS Glue Schema Registry allows you to register and evolve JSON, Apache Avro, and Protocol buffers schemas with compatibility modes. You can register multiple versions of each schema as the business needs or stream processing application’s requirements evolve. The AWS Glue Schema Registry open-source library provides JSON, Avro, and protobuf serializers and deserializers that you configure in producer and consumer stream processing applications, as shown in the following diagram. The open-source library also supports optional compression and caching configuration to save on data transfers.

To accommodate various business use cases, AWS Glue Schema Registry supports multiple compatibility modes. For example, if a consumer application is updated to a new schema version but is still able to consume and process messages based on the previous version of the same schema, then the schema is backward-compatible. However, if the producer application has moved to a new schema version and the consumer application, although not yet updated, can still consume and process both the old and new messages, then the schema is forward-compatible. For more information, refer to How the Schema Registry Works.

Create a Protocol buffers schema in AWS Glue Schema Registry

In this section, we create a protobuf schema in AWS Glue Schema Registry via the console and AWS CLI.

Create a schema via the console

Make sure you have the required AWS Glue Schema Registry IAM permissions.

  1. On the AWS Glue console, choose Schema registries in the navigation pane.
  2. Choose Add registry.
  3. For Registry name, enter employee-schema-registry.
  4. Choose Add registry.
  5. After the registry is created, choose Add schema to register a new schema.
  6. For Schema name, enter Employee.proto.

The schema name must be either Employee.proto or Employee if the protobuf schema doesn’t include the options option java_multiple_files = true; and option java_outer_classname = "<Outer class name>"; and you decide to use the protobuf schema-generated code (POJOs) in your stream processing applications. We cover this with an example in a subsequent section of this post. For more information on protobuf options, refer to Options.

  7. For Registry, choose the registry employee-schema-registry.
  8. For Data format, choose Protocol buffers.
  9. For Compatibility mode, choose Backward.

You can choose other compatibility modes as per your use case.

  10. For First schema version, enter the preceding protobuf schema, then choose Create schema and version.

After the schema is registered successfully, its status will be Available, as shown in the following screenshot.

Create a schema via the AWS CLI

Make sure you have IAM credentials with AWS Glue Schema Registry permissions.

  1. Run the following AWS CLI command to create a schema registry employee-schema-registry (for this post, we use the Region us-east-2):
    aws glue create-registry \
    --registry-name employee-schema-registry \
    --region us-east-2

The AWS CLI command returns the newly created schema registry ARN in response.

  2. Copy the RegistryArn value from the response to use in the following AWS CLI command.
  3. In the following command, use the preceding protobuf schema and schema name Employee.proto:
    aws glue create-schema --schema-name Employee.proto \
    --registry-id RegistryArn=<Schema Registry ARN that you copied from response of create registry CLI command> \
    --compatibility BACKWARD \
    --data-format PROTOBUF \
    --schema-definition file:///<project-directory>/Employee.proto \
    --region us-east-2

You can also use AWS CloudFormation to create schemas in AWS Glue Schema Registry.

Using a Protocol buffers schema with Amazon MSK and Kinesis Data Streams

Like Apache Avro’s SpecificRecord and GenericRecord, protobuf also supports working with POJOs to ensure type safety and DynamicMessage to create generic data producer and consumer applications. The following examples showcase the use of a protobuf schema registered in AWS Glue Schema Registry with Kafka and Kinesis Data Streams producer and consumer applications.

Use a protobuf schema with Amazon MSK

Create an Amazon MSK or Apache Kafka cluster with a topic called protobuf-demo-topic. If creating an Amazon MSK cluster, you can use the console. For instructions, refer to Getting Started Using Amazon MSK.

Use protobuf schema-generated POJOs

To use protobuf schema-generated POJOs, complete the following steps:

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it in the PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We use the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Add the following dependencies to your application’s pom.xml file:
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.19.4</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/software.amazon.glue/schema-registry-serde -->
    <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.9</version>
    </dependency>	

  4. Create a schema registry employee-schema-registry in AWS Glue Schema Registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure you have the schema file in the ${basedir}/src/main/resources/proto directory or change it as per your application directory structure in the application’s pom.xml <protoSourceRoot> tag value:
    mvn clean compile

Next, we configure the Kafka producer publishing protobuf messages to the Kafka topic on Amazon MSK.

  1. Configure the Kafka producer properties:
private Properties getProducerConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
    props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
    props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
    return props;
}

The VALUE_SERIALIZER_CLASS_CONFIG configuration specifies the AWS Glue Schema Registry serializer, which serializes the protobuf message.

  2. Use the schema-generated code (POJOs) to create a protobuf message:
    public EmployeeOuterClass.Employee createEmployeeRecord(int employeeId){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                        .setId(employeeId)
                        .setName("Dummy")
                        .setAddress("Melbourne, Australia")
                        .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                        .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                        .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                        .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                        .setRole(EmployeeOuterClass.Role.ARCHITECT)
                        .setProject(EmployeeOuterClass.Project.newBuilder()
                                .setName("Protobuf Schema Demo")
                                .setState("GA").build())
                        .setTotalAwardValue(Money.newBuilder()
                                            .setCurrencyCode("USD")
                                            .setUnits(5)
                                            .setNanos(50000).build())
                        .setTeam(EmployeeOuterClass.Team.newBuilder()
                                .setName("Solutions Architects")
                                .setLocation("Australia").build()).build();
        return employee;
    }

  3. Publish the protobuf messages to the protobuf-demo-topic topic on Amazon MSK:
    public void startProducer() throws InterruptedException {
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, EmployeeOuterClass.Employee> producer = new KafkaProducer<String, EmployeeOuterClass.Employee>(getProducerConfig());
        logger.info("Starting to send records...");
        int employeeId = 0;
        while(employeeId < 100)
        {
            EmployeeOuterClass.Employee person = createEmployeeRecord(employeeId);
            String key = "key-" + employeeId;
            ProducerRecord<String,  EmployeeOuterClass.Employee> record = new ProducerRecord<String,  EmployeeOuterClass.Employee>(topic, key, person);
            producer.send(record, new ProducerCallback());
            employeeId++;
        }
    }
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e){
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            }
            else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  4. Start the Kafka producer:
    public static void main(String args[]) throws InterruptedException {
        ProducerProtobuf producer = new ProducerProtobuf();
        producer.startProducer();
    }

  5. In the Kafka consumer application’s pom.xml, add the same plugin and dependencies as the Kafka producer’s pom.xml.

Next, we configure the Kafka consumer consuming protobuf messages from the Kafka topic on Amazon MSK.

  1. Configure the Kafka consumer properties:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        return props;
    }

The VALUE_DESERIALIZER_CLASS_CONFIG config specifies the AWS Glue Schema Registry deserializer that deserializes the protobuf messages.

  2. Consume the protobuf message (as a POJO) from the protobuf-demo-topic topic on Amazon MSK:
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, EmployeeOuterClass.Employee> consumer = new KafkaConsumer<String, EmployeeOuterClass.Employee>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, EmployeeOuterClass.Employee> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, EmployeeOuterClass.Employee> record : records) {
                final EmployeeOuterClass.Employee employee = record.value();
                logger.info("Employee Id: " + employee.getId() + " | Name: " + employee.getName() + " | Address: " + employee.getAddress() +
                        " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                        " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                        " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                        " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                        " | Project Name: " + employee.getProject().getName() + "| Award currency code: " + employee.getTotalAwardValue().getCurrencyCode() +
                        " | Award units : " + employee.getTotalAwardValue().getUnits() + " | Award nanos " + employee.getTotalAwardValue().getNanos());
            }
        }
    }

  3. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use protobuf’s DynamicMessage

You can use DynamicMessage to create generic producer and consumer applications without generating the code from the protobuf schema. To use DynamicMessage, you first need to create a protobuf schema file descriptor.

  1. Generate a file descriptor from the protobuf schema using the following command:
    protoc --include_imports --proto_path=proto --descriptor_set_out=proto/Employeeproto.desc proto/Employee.proto

The --descriptor_set_out option specifies the name of the descriptor file that this command generates. The protobuf schema Employee.proto is in the proto directory.

  2. Make sure you have created a schema registry and registered the preceding protobuf schema with it.

Now we configure the Kafka producer publishing DynamicMessage to the Kafka topic on Amazon MSK.

  1. Create the Kafka producer configuration. The PROTOBUF_MESSAGE_TYPE configuration is DYNAMIC_MESSAGE instead of POJO.
    private Properties getProducerConfig() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "-1");
       props.put(ProducerConfig.CLIENT_ID_CONFIG,"protobuf-dynamicmessage-record-producer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,GlueSchemaRegistryKafkaSerializer.class.getName());
       props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
       props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
       props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
       props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
       props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
       return props;
        }

  2. Create protobuf dynamic messages and publish them to the Kafka topic on Amazon MSK:
    public void startProducer() throws Exception {
        Descriptor desc = getDescriptor();
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, DynamicMessage> producer = new KafkaProducer<String, DynamicMessage>(getProducerConfig());
        logger.info("Starting to send records...");
        int i = 0;
        while (i < 100) {
            DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                    .setField(desc.findFieldByName("id"), 1234)
                    .setField(desc.findFieldByName("name"), "Dummy Name")
                    .setField(desc.findFieldByName("address"), "Melbourne, Australia")
                    .setField(desc.findFieldByName("employee_age"), Int32Value.newBuilder().setValue(32).build())
                    .setField(desc.findFieldByName("start_date"), Timestamp.newBuilder().setSeconds(235234532434L).build())
                    .setField(desc.findFieldByName("total_time_span_in_company"), Duration.newBuilder().setSeconds(3453245345L).build())
                    .setField(desc.findFieldByName("is_certified"), BoolValue.newBuilder().setValue(true).build())
    		.setField(desc.findFieldByName("total_award_value"), Money.newBuilder().setCurrencyCode("USD")
    						.setUnits(1).setNanos(50000).build())
                    .setField(desc.findFieldByName("team"), createTeam(desc.findFieldByName("team").getMessageType()))
                    .setField(desc.findFieldByName("project"), createProject(desc.findFieldByName("project").getMessageType()))
                    .setField(desc.findFieldByName("role"), desc.findFieldByName("role").getEnumType().findValueByName("ARCHITECT"))
                    .build();
            String key = "key-" + i;
            ProducerRecord<String, DynamicMessage> record = new ProducerRecord<String, DynamicMessage>(topic, key, dynMessage);
            producer.send(record, new ProtobufProducer.ProducerCallback());
            Thread.sleep(1000);
            i++;
        }
    }
    private static DynamicMessage createTeam(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Solutions Architects")
                .setField(desc.findFieldByName("location"), "Australia")
                .build();
        return dynMessage;
    }
    
    private static DynamicMessage createProject(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Protobuf Schema Demo")
                .setField(desc.findFieldByName("state"), "GA")
                .build();
        return dynMessage;
    }
    
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            } else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Create a descriptor using the Employeeproto.desc file that we generated from the Employee.proto schema file in the previous steps:
    private Descriptor getDescriptor() throws Exception {
        InputStream inStream = ProtobufProducer.class.getClassLoader().getResourceAsStream("proto/Employeeproto.desc");
        DescriptorProtos.FileDescriptorSet fileDescSet = DescriptorProtos.FileDescriptorSet.parseFrom(inStream);
        Map<String, DescriptorProtos.FileDescriptorProto> fileDescProtosMap = new HashMap<String, DescriptorProtos.FileDescriptorProto>();
        List<DescriptorProtos.FileDescriptorProto> fileDescProtos = fileDescSet.getFileList();
        for (DescriptorProtos.FileDescriptorProto fileDescProto : fileDescProtos) {
            fileDescProtosMap.put(fileDescProto.getName(), fileDescProto);
        }
        DescriptorProtos.FileDescriptorProto fileDescProto = fileDescProtosMap.get("Employee.proto");
        FileDescriptor[] dependencies = getProtoDependencies(fileDescProtosMap, fileDescProto);
        FileDescriptor fileDesc = FileDescriptor.buildFrom(fileDescProto, dependencies);
        Descriptor desc = fileDesc.findMessageTypeByName("Employee");
        return desc;
    }
    
    public static FileDescriptor[] getProtoDependencies(Map<String, FileDescriptorProto> fileDescProtos, 
    				  FileDescriptorProto fileDescProto) throws Exception {
    
        if (fileDescProto.getDependencyCount() == 0)
            return new FileDescriptor[0];
    
        ProtocolStringList dependencyList = fileDescProto.getDependencyList();
        String[] dependencyArray = dependencyList.toArray(new String[0]);
        int noOfDependencies = dependencyList.size();
    
        FileDescriptor[] dependencies = new FileDescriptor[noOfDependencies];
        for (int i = 0; i < noOfDependencies; i++) {
            FileDescriptorProto dependencyFileDescProto = fileDescProtos.get(dependencyArray[i]);
            FileDescriptor dependencyFileDesc = FileDescriptor.buildFrom(dependencyFileDescProto, 
    					     getProtoDependencies(fileDescProtos, dependencyFileDescProto));
            dependencies[i] = dependencyFileDesc;
        }
        return dependencies;
    }

  4. Start the Kafka producer:
    public static void main(String args[]) throws InterruptedException {
    	 ProducerProtobuf producer = new ProducerProtobuf();
             producer.startProducer();
    }

Now we configure the Kafka consumer consuming dynamic messages from the Kafka topic on Amazon MSK.

  1. Enter the following Kafka consumer configuration:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-record-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
        return props;
    }

  2. Consume protobuf dynamic messages from the Kafka topic protobuf-demo-topic. Because we’re using DYNAMIC_MESSAGE, the retrieved objects are of type DynamicMessage.
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<String, DynamicMessage>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, DynamicMessage> record : records) {
                for (Descriptors.FieldDescriptor field : record.value().getAllFields().keySet()) {
                    logger.info(field.getName() + ": " + record.value().getField(field));
                }
            }
        }
    }

  3. Start the Kafka consumer:
    public static void main(String args[]){
            ConsumerProtobuf consumer = new ConsumerProtobuf();
            consumer.startConsumer();
         }

Use a protobuf schema with Kinesis Data Streams

You can use the protobuf schema-generated POJOs with the Kinesis Producer Library (KPL) and Kinesis Client Library (KCL).

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it in the PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We’re using the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Because the latest versions of the KPL and KCL already include the AWS Glue Schema Registry open-source library (schema-registry-serde) and the protobuf runtime (protobuf-java), you only need to add the following dependencies to your application’s pom.xml:
    <!-- https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-producer -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client -->
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.4.0</version>
    </dependency>

  4. Create a schema registry employee-schema-registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure you have the schema file in the ${basedir}/src/main/resources/proto directory or change it as per your application directory structure in the application’s pom.xml <protoSourceRoot> tag value.
    mvn clean compile

The following Kinesis producer code with the KPL uses the Schema Registry open-source library to publish protobuf messages to Kinesis Data Streams.

  1. Start the Kinesis Data Streams producer:
    private static final String PROTO_SCHEMA_FILE = "proto/Employee.proto";
    private static final String SCHEMA_NAME = "Employee.proto";
    private static String REGION_NAME = "us-east-2";
    private static String REGISTRY_NAME = "employee-schema-registry";
    private static String STREAM_NAME = "employee_data_stream";
    private static int NUM_OF_RECORDS = 100;
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws Exception {
         ProtobufKPLProducer producer = new ProtobufKPLProducer();
         producer.startProducer();
     }
    }

  2. Configure the Kinesis producer:
public void startProducer() throws Exception {
    logger.info("Starting KPL client with Glue Schema Registry Integration...");
    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(REGION_NAME);
    schemaRegistryConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB);
    schemaRegistryConfig.setSchemaAutoRegistrationEnabled(false);
    schemaRegistryConfig.setCompatibilitySetting(Compatibility.BACKWARD);
    schemaRegistryConfig.setEndPoint(REGISTRY_ENDPOINT);
    schemaRegistryConfig.setProtobufMessageType(ProtobufMessageType.POJO);
    schemaRegistryConfig.setRegistryName(REGISTRY_NAME);
	
    //Setting Glue Schema Registry configuration in Kinesis Producer Configuration along with other configs
    KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                                        .setRecordMaxBufferedTime(3000)
                                        .setMaxConnections(1)
                                        .setRequestTimeout(60000)
                                        .setRegion(REGION_NAME)
                                        .setRecordTtl(60000)
                                        .setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

    FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
        @Override public void onFailure(Throwable t) {
              t.printStackTrace();
        };
        @Override public void onSuccess(UserRecordResult result) {
            logger.info("record sent successfully. Sequence Number: " + result.getSequenceNumber() + " | Shard Id : " + result.getShardId());
        };
    };
    
	//Creating schema definition object from the Employee.proto schema file.
    Schema gsrSchema = getSchemaDefinition();
    final KinesisProducer producer = new KinesisProducer(config);
    int employeeCount = 1;
    while(true) {
        //Creating and serializing schema generated POJO object (protobuf message)

        EmployeeOuterClass.Employee employee = createEmployeeRecord(employeeCount);
        byte[] serializedBytes = employee.toByteArray();
        ByteBuffer data = ByteBuffer.wrap(serializedBytes);
        Instant timestamp = Instant.now();

        //Publishing protobuf message to the Kinesis Data Stream
        ListenableFuture<UserRecordResult> f =
                    producer.addUserRecord(STREAM_NAME,
                                        Long.toString(timestamp.toEpochMilli()),
                                        new BigInteger(128, new Random()).toString(10),
                                        data,
                                        gsrSchema);
        Futures.addCallback(f, myCallback, MoreExecutors.directExecutor());
        employeeCount++;
        if(employeeCount > NUM_OF_RECORDS)
            break;
    }
    List<Future<UserRecordResult>> putFutures = new LinkedList<>();
    for (Future<UserRecordResult> future : putFutures) {
        UserRecordResult userRecordResult = future.get();
        logger.info(userRecordResult.getShardId() + userRecordResult.getSequenceNumber());
    }
}

  3. Create a protobuf message using schema-generated code (POJOs):
    public EmployeeOuterClass.Employee createEmployeeRecord(int count){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                .setId(count)
                .setName("Dummy")
                .setAddress("Melbourne, Australia")
                .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                .setRole(EmployeeOuterClass.Role.ARCHITECT)
                .setProject(EmployeeOuterClass.Project.newBuilder()
                            .setName("Protobuf Schema Demo")
                            .setState("GA").build())
                .setTotalAwardValue(Money.newBuilder()
                            .setCurrencyCode("USD")
                            .setUnits(5)
                            .setNanos(50000).build())
                .setTeam(EmployeeOuterClass.Team.newBuilder()
                            .setName("Solutions Architects")
                            .setLocation("Australia").build()).build();
        return employee;
    }

  2. Create the schema definition from Employee.proto:
    private Schema getSchemaDefinition() throws IOException {
        InputStream inputStream = ProtobufKPLProducer.class.getClassLoader().getResourceAsStream(PROTO_SCHEMA_FILE);
        StringBuilder resultStringBuilder = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            while ((line = br.readLine()) != null) {
                resultStringBuilder.append(line).append("\n");
            }
        }
        String schemaDefinition = resultStringBuilder.toString();
        logger.info("Schema Definition " + schemaDefinition);
        Schema gsrSchema =
                new Schema(schemaDefinition, DataFormat.PROTOBUF.toString(), SCHEMA_NAME);
        return gsrSchema;
    }

The following is the Kinesis consumer code that uses the KCL and the Schema Registry open-source library to consume protobuf messages from the Kinesis data stream.

  1. Initialize the application:
    public void run(){
        logger.info("Starting KCL client with Glue Schema Registry Integration...");
        Region region = Region.of(ObjectUtils.firstNonNull(REGION_NAME, "us-east-2"));
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    
        EmployeeRecordProcessorFactory employeeRecordProcessorFactory = new EmployeeRecordProcessorFactory();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(STREAM_NAME,
                        APPLICATION_NAME,
                        kinesisClient,
                        dynamoClient,
                        cloudWatchClient,
                        APPLICATION_NAME,
                        employeeRecordProcessorFactory);
    
        //Creating Glue Schema Registry configuration and Glue Schema Registry Deserializer object.
        GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration(region.toString());
        gsrConfig.setEndPoint(REGISTRY_ENDPOINT);
        gsrConfig.setProtobufMessageType(ProtobufMessageType.POJO);
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
                new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), gsrConfig);
        /*
         Setting Glue Schema Registry deserializer in the Retrieval Config for
         Kinesis Client Library to use it while deserializing the protobuf messages.
         */
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(STREAM_NAME, kinesisClient));
        retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
    
        Scheduler scheduler = new Scheduler(
                		configsBuilder.checkpointConfig(),
                		configsBuilder.coordinatorConfig(),
               		configsBuilder.leaseManagementConfig(),
                		configsBuilder.lifecycleConfig(),
                		configsBuilder.metricsConfig(),
                		configsBuilder.processorConfig(),
                		retrievalConfig);
    
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    
        logger.info("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
            Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
            logger.info("Waiting up to 20 seconds for shutdown to complete.");
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info("Interrupted while waiting for graceful shutdown. Continuing.");
        }
        logger.info("Completed, shutting down now.");
    }

  2. Consume protobuf messages from Kinesis Data Streams:
    public static class EmployeeRecordProcessorFactory implements ShardRecordProcessorFactory {
        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new EmployeeRecordProcessor();
        }
    }
    public static class EmployeeRecordProcessor implements ShardRecordProcessor {
        private static final Logger logger = Logger.getLogger(EmployeeRecordProcessor.class.getSimpleName());
        public void initialize(InitializationInput initializationInput) {}
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            try {
                logger.info("Processing " + processRecordsInput.records().size() + " record(s)");
                for (KinesisClientRecord r : processRecordsInput.records()) {
    			
                    //Deserializing protobuf message into schema generated POJO
                    EmployeeOuterClass.Employee employee = EmployeeOuterClass.Employee.parseFrom(r.data().array());
                    
                   logger.info("Processed record: " + employee);
                    logger.info("Employee Id: " + employee.getId() + " | Name: "  + employee.getName() + " | Address: " + employee.getAddress() +
                            " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                            " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                            " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                            " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                            " | Project Name: " + employee.getProject().getName() + " | Award currency code: " +    
                           employee.getTotalAwardValue().getCurrencyCode() + " | Award units : " + employee.getTotalAwardValue().getUnits() + 
    		      " | Award nanos " + employee.getTotalAwardValue().getNanos());
                }
            } catch (Exception e) {
                logger.info("Failed while processing records. Aborting" + e);
                Runtime.getRuntime().halt(1);
            }
        }
        public void leaseLost(LeaseLostInput leaseLostInput) {. . .}
        public void shardEnded(ShardEndedInput shardEndedInput) {. . .}
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {. . .}
    }

  3. Start the Kinesis Data Streams consumer:
    private static final Logger logger = Logger.getLogger(ProtobufKCLConsumer.class.getSimpleName());
    private static String REGION_NAME = "us-east-2";
    private static String STREAM_NAME = "employee_data_stream";
    private static final String APPLICATION_NAME =  "protobuf-demo-kinesis-kpl-consumer";
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws ParseException {
        new ProtobufKCLConsumer().run();
    }
    

Enhance your protobuf schema

We covered examples of producer and consumer applications that integrate with Amazon MSK, Apache Kafka, and Kinesis Data Streams using a Protocol buffers schema registered with AWS Glue Schema Registry. You can further enhance these examples with schema evolution using the compatibility modes that AWS Glue Schema Registry supports. For example, the following protobuf schema is a backward-compatible updated version of Employee.proto: we added another gRPC service definition, CreateEmployee, under EmployeeSearch, and added an optional title field to the Employee message type. If you upgrade the consumer application with this version of the schema, it can still consume both old and new protobuf messages (see the sketch after the schema).

Employee.proto (version-2)

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
    rpc CreateEmployee(EmployeeSearchParams) returns (google.protobuf.Empty);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
    optional string title = 12;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}
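For reference, the following is a minimal sketch of how a consumer built against version 2 of the schema could read the new optional field. It assumes the Java classes were regenerated from the updated Employee.proto, so the hasTitle and getTitle accessors, the helper method name, and the logger in scope are illustrative rather than code from this post.

    //Minimal sketch (assumes POJOs regenerated from Employee.proto version 2).
    //Messages produced with the older schema still deserialize; hasTitle() simply returns false for them.
    public static void logEmployeeTitle(EmployeeOuterClass.Employee employee) {
        if (employee.hasTitle()) {
            logger.info("Employee Title: " + employee.getTitle());
        } else {
            logger.info("Employee Title not set (record produced with an older schema version)");
        }
    }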

Conclusion

In this post, we introduced Protocol buffers schema support in AWS Glue Schema Registry. AWS Glue Schema Registry now supports Apache Avro, JSON, and Protocol buffers schemas with different compatibility modes. The examples in this post demonstrated how to use Protocol buffers schemas registered with AWS Glue Schema Registry in stream processing applications integrated with Apache Kafka, Amazon MSK, and Kinesis Data Streams. We used the schema-generated POJOs for type safety and protobuf’s DynamicMessage to create generic producer and consumer applications. The examples in this post contain the basic components of the stream processing pattern; you can adapt them to your use case.



About the Author

Vikas Bajaj is a Principal Solutions Architect at AWS. Vikas works with digital native customers and advises them on technology architecture and solutions to meet strategic business objectives.

Run AWS Glue crawlers using Amazon S3 event notifications

Post Syndicated from Pradeep Patel original https://aws.amazon.com/blogs/big-data/run-aws-glue-crawlers-using-amazon-s3-event-notifications/

The AWS Well-Architected Data Analytics Lens provides a set of guiding principles for analytics applications on AWS. One of the best practices it describes is to build a central Data Catalog to store, share, and track metadata changes. AWS Glue provides a Data Catalog to fulfill this requirement. AWS Glue also provides crawlers that automatically discover datasets stored in multiple source systems, including Amazon Redshift, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), MongoDB, Amazon DocumentDB (with MongoDB compatibility), and various other data stores using JDBC. A crawler extracts schemas of tables from these sources and stores the information in the AWS Glue Data Catalog. You can run a crawler on demand or on a schedule.

When you schedule a crawler to discover data in Amazon S3, you can choose to crawl all folders or crawl new folders only. In the first mode, every time the crawler runs, it scans data in every folder under the root path it was configured to crawl. This can be slow for large tables because on every run, the crawler must list all objects and then compare metadata to identify new objects. In the second mode, commonly referred to as incremental crawls, every time the crawler runs, it processes only S3 folders that were added since the last crawl. Incremental crawls can reduce runtime and cost when used with datasets that append new objects with a consistent schema on a regular basis.

AWS Glue also supports incremental crawls using Amazon S3 Event Notifications. You can configure Amazon S3 Event Notifications to be sent to an Amazon Simple Queue Service (Amazon SQS) queue, which the crawler uses to identify the newly added or deleted objects. With each run of the crawler, the SQS queue is inspected for new events; if none are found, the crawler stops. If events are found in the queue, the crawler inspects their respective folders and processes the new objects. This new mode reduces cost and crawler runtime to update large and frequently changing tables.

In this post, we present two design patterns to create a crawler pipeline using this new feature. A crawler pipeline refers to components required to implement incremental crawling using Amazon S3 Event Notifications.

Crawler pipeline design patterns

We define design patterns for the crawler pipeline based on a simple question: do I have any applications other than the crawler that consume S3 event notifications?

If the answer is no, you can send event notifications directly to an SQS queue that has no other consumers. The crawler consumes events from the queue.

If you have multiple applications that want to consume the event notifications, send the notifications directly to an Amazon Simple Notification Service (Amazon SNS) topic, and then broadcast them to an SQS queue. If you have an application or microservice that consumes notifications, you can subscribe it to the SNS topic. This way, you can populate metadata in the Data Catalog while still supporting use cases around the files ingested into the S3 bucket.

The following are some considerations for these options:

  • S3 event notifications can only be sent to standard Amazon SNS topics; Amazon SNS FIFO topics are not supported. Refer to Amazon S3 Event Notifications for more details.
  • Similarly, S3 event notifications sent to Amazon SNS can only be forwarded to standard SQS queues and not Amazon SQS FIFO queues. For more information, see FIFO topics example use case.
  • The AWS Identity and Access Management (IAM) role used by the crawler needs to include an IAM policy for Amazon SQS. We provide an example policy later in this post.

Let’s take a deeper look into each design pattern to understand the architecture and its pros and cons.

Option 1: Publish events to an SQS queue

The following diagram represents a design pattern where S3 event notifications are published directly to a standard SQS queue. First, configure an SQS queue as a target for S3 event notifications on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the queue that includes permissions for Amazon S3 to send messages to Amazon SQS, and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when the SQS queue is used only for incremental crawling and no other application or service depends on it. The crawler removes events from the queue after they are processed, so they’re not available to other applications.

Figure 1: Crawler pipeline using Amazon SQS queue

Option 2: Publish events to an SNS topic and forward to an SQS queue

The following diagram represents a design pattern where S3 event notifications are sent to an SNS topic and then forwarded to an SQS queue for the crawler to consume. First, configure an SNS topic as a target for S3 event notifications on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the topic that includes permissions for Amazon S3 to send messages to Amazon SNS. Then, create an SQS queue and subscribe it to the SNS topic to receive S3 events. Finally, attach an IAM policy to the queue that includes permissions for Amazon SNS to publish messages to Amazon SQS and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when other applications depend on the S3 event notifications. For more information about fanout capabilities in Amazon SNS, see Fanout S3 Event Notifications to Multiple Endpoints.

Figure 2: Crawler pipeline using Amazon SNS topic and Amazon SQS queue

Solution overview

It’s common to have multiple applications consuming S3 event notifications, so in this post we demonstrate how to implement the second design pattern using Amazon SNS and Amazon SQS.

We create the following AWS resources:

  • S3 bucket – The location where table data is stored. Event notifications are enabled.
  • SNS topic and access policy – Amazon S3 sends event notifications to the SNS topic. The topic must have a policy that gives permissions to Amazon S3.
  • SQS queue and access policy – The SNS topic publishes messages to the SQS queue. The queue must have a policy that gives the SNS topic permission to write messages.
  • Three IAM policies – The policies are as follows:
    • SQS queue policy – Lets the crawler consume messages from the SQS queue.
    • S3 policy – Lets the crawler read files from the S3 bucket.
    • AWS Glue crawler policy – Lets the crawler make changes to the AWS Glue Data Catalog.
  • IAM role – The IAM role used by the crawler. This role uses the three preceding policies.
  • AWS Glue crawler – Crawls the table’s objects and updates the AWS Glue Data Catalog.
  • AWS Glue database – The database in the Data Catalog.
  • AWS Glue table – The crawler creates a table in the Data Catalog.

In the following sections, we walk you through the steps to create your resources and test the solution.

Create an S3 bucket and set up a folder

To create your Amazon S3 resources, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter s3-event-notifications-bucket-<random-number>.
  4. Select Block all public access.
  5. Choose Create bucket.
  6. In the buckets list, select the bucket and choose Create a folder.
  7. For Folder name, enter nyc_data.
  8. Choose Create folder.

Create an IAM policy with permissions on Amazon S3

To create your IAM policy with Amazon S3 permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_s3.json.
  4. Update the S3 bucket name.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_s3.
  8. Choose Create policy.

Create an IAM policy with permissions on Amazon SQS

To create your IAM policy with Amazon SQS permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_sqs.json (an illustrative example follows these steps).
  4. Update the AWS account number.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_sqs.
  8. Choose Create policy.
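The policy file referenced above isn’t reproduced here. As a rough, illustrative sketch, a policy of this kind grants the crawler role the SQS actions it needs on the queue, along the lines of the following. The action list and the placeholder ARN (Region, account ID, and the queue name s3_event_notifications_queue) are assumptions to adjust for your environment and against the current AWS Glue documentation.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowCrawlerToConsumeQueue",
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:GetQueueUrl",
                    "sqs:PurgeQueue"
                ],
                "Resource": "arn:aws:sqs:<region>:<account-id>:s3_event_notifications_queue"
            }
        ]
    }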

Create an IAM role for the crawler

To create your IAM role for the AWS Glue crawler, complete the following steps:

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. For Choose a use case, choose Glue.
  4. Choose Next: Permissions.
  5. Attach the two policies you just created: s3_event_notifications_iam_policy_s3 and s3_event_notifications_iam_policy_sqs.
  6. Attach the AWS managed policy AWSGlueServiceRole.
  7. Choose Next: Tags.
  8. Choose Next: Review.
  9. For Role name, enter s3_event_notifications_crawler_iam_role.
  10. Review to confirm that all three policies are attached.
  11. Choose Create role.

Create an SNS topic

To create your SNS topic, complete the following steps:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard (FIFO isn’t supported).
  4. For Name, enter s3_event_notifications_topic.
  5. Choose Create topic.
  6. On the Access policy tab, choose Advanced.
  7. Enter the policy contents from s3_event_notifications_sns_topic_access_policy.json (an illustrative example follows these steps).
  8. Update the account number and S3 bucket.
  9. Choose Create topic.
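The topic access policy file referenced above isn’t reproduced here. As an illustrative sketch, it needs to allow Amazon S3 to publish to the topic, similar to the following. The placeholder ARNs and the SourceArn condition are assumptions to adapt to your bucket and account.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowS3ToPublish",
                "Effect": "Allow",
                "Principal": { "Service": "s3.amazonaws.com" },
                "Action": "SNS:Publish",
                "Resource": "arn:aws:sns:<region>:<account-id>:s3_event_notifications_topic",
                "Condition": {
                    "ArnLike": { "aws:SourceArn": "arn:aws:s3:::s3-event-notifications-bucket-<random-number>" }
                }
            }
        ]
    }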

Create an SQS queue

To create your SQS queue, complete the following steps.

  1. On the Amazon SQS console, choose Create a queue.
  2. For Type, choose Standard.
  3. For Name, enter s3_event_notifications_queue.
  4. Keep the remaining settings at their default.
  5. On the Access policy tab, choose Advanced.
  6. Enter the policy contents from s3_event_notifications_sqs_queue_policy.json (an illustrative example follows these steps).
  7. Update the account number.
  8. Choose Create queue.
  9. On the SNS subscription tab, choose Subscribe to SNS topic.
  10. Choose the topic you created, s3_event_notifications_topic.
  11. Choose Save.
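The queue access policy file referenced above isn’t reproduced here. As an illustrative sketch, it needs to allow the SNS topic to deliver messages to the queue, similar to the following. The placeholder ARNs are assumptions to adapt to your account and Region.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowSnsTopicToSendMessages",
                "Effect": "Allow",
                "Principal": { "Service": "sns.amazonaws.com" },
                "Action": "SQS:SendMessage",
                "Resource": "arn:aws:sqs:<region>:<account-id>:s3_event_notifications_queue",
                "Condition": {
                    "ArnEquals": { "aws:SourceArn": "arn:aws:sns:<region>:<account-id>:s3_event_notifications_topic" }
                }
            }
        ]
    }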

Create event notifications on the S3 bucket

To create event notifications for your S3 bucket, complete the following steps:

  1. Navigate to the Properties tab of the S3 bucket you created.
  2. In the Event notifications section, choose Create event notification.
  3. For Event name, enter crawler_event.
  4. For Prefix, enter nyc_data/.
  5. For Event types, choose All object create events.
  6. For Destination, choose SNS topic and the topic s3_event_notifications_topic.

Create a crawler

To create your AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter s3_event_notifications_crawler.
  4. Choose Next.
  5. For Crawler source type, choose data stores.
  6. For Repeat crawls of S3 data stores, choose Crawl changes identified by Amazon S3 events.
  7. Choose Next.
  8. For Include path, enter the S3 path of the folder to crawl (for example, s3://s3-event-notifications-bucket-<random-number>/nyc_data/).
  9. For Include SQS ARN, add your Amazon SQS ARN.

Including a dead-letter queue is optional; we skip it for this post. Dead-letter queues help you isolate problematic event notifications that a crawler can’t process successfully. To understand the general benefits of dead-letter queues and how they receive messages from the main SQS queue, refer to Amazon SQS dead-letter queues.

  1. Choose Next.
  2. When asked to add another data store, choose No.
  3. For IAM role, select “Choose an existing role” and choose the role you created earlier (s3_event_notifications_crawler_iam_role).
  4. Choose Next.
  5. For Frequency, choose Run on demand.
  6. Choose Next.
  7. Under Database, choose Add database.
  8. For Database name, enter s3_event_notifications_database.
  9. Choose Create.
  10. Choose Next.
  11. Choose Finish to create your crawler.

Test the solution

The following steps show how adding new objects triggers an event notification that propagates to Amazon SQS, which the crawler uses on subsequent runs. For sample data, we use NYC taxi records from January and February 2020.

  1. Download the following datasets:
    1. green_tripdata_2020-01.csv
    2. green_tripdata_2020-02.csv
  2. On the Amazon S3 console, navigate to the bucket you created earlier.
  3. Open the nyc_data folder.
  4. Create a subfolder called dt=202001.

This sends a notification to the SNS topic, and a message is sent to the SQS queue.

  1. In the folder dt=202001, upload the file green_tripdata_2020-01.csv.
  2. To validate that this step generated an S3 event notification, navigate to the queue on the Amazon SQS console.
  3. Choose Send and receive messages.
  4. Under Receive messages, Messages available should show as 1.
  5. Return to the Crawlers page on the AWS Glue console and select the crawler s3_event_notifications_crawler.
  6. Choose Run crawler. After a few seconds, the crawler status changes to Starting and then to Running. The crawler should complete in 1–2 minutes and display a success message.
  7. Confirm that a new table, nyc_data, is in your database.
  8. Choose the table to verify its schema.

The dt column is marked as a partition key.

  1. Choose View partitions to see partition details.
  2. To validate that the crawler consumed this event, navigate to the queue on the Amazon SQS console and choose Send and receive messages.
  3. Under Receive messages, Messages available should show as 0.

Now upload another file and see how the S3 event triggers a crawler to run.

  1. On the Amazon S3 console, in your nyc_data folder, create the subfolder dt=202002.
  2. Upload the file green_tripdata_2020-02.csv to this subfolder.
  3. Run the crawler again and wait for the success message.
  4. Return to the AWS Glue table and choose View partitions to see a new partition added.

Clean up

When you’re finished evaluating this feature, delete the SNS topic, the SQS queue, the AWS Glue crawler, and the S3 bucket and its objects to avoid any further charges.

Conclusion

In this post, we discussed a new way for AWS Glue crawlers to use S3 Event Notifications to reduce the time and cost needed to incrementally process table data updates in the AWS Glue Data Catalog. We discussed two design patterns to implement this approach. The first pattern publishes events directly to an SQS queue, which is useful when only the crawler needs these events. The second pattern publishes events to an SNS topic, which forwards them to an SQS queue for the crawler to process. This is useful when other applications also depend on these events. We also discussed how to implement the second design pattern to incrementally crawl your data. Incremental crawls using S3 event notifications reduce the runtime and cost of your crawlers for large and frequently changing tables.

Let us know your feedback in the comments section. Happy crawling!


About the Authors

Pradeep Patel is a Sr. Software Engineer at AWS Glue. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a Bigdata enthusiast and holds 13 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finds areas for home automation.

Ravi Itha is a Sr. Data Architect at AWS. He works with customers to design and implement data lakes, analytics, and microservices on AWS. He is an open-source committer and has published more than a dozen solutions using AWS CDK, AWS Glue, AWS Lambda, AWS Step Functions, Amazon ECS, Amazon MQ, Amazon SQS, Amazon Kinesis Data Streams, and Amazon Kinesis Data Analytics for Apache Flink. His solutions can be found at his GitHub handle. Outside of work, he is passionate about books, cooking, movies, and yoga.

Build data lineage for data lakes using AWS Glue, Amazon Neptune, and Spline

Post Syndicated from Khoa Nguyen original https://aws.amazon.com/blogs/big-data/build-data-lineage-for-data-lakes-using-aws-glue-amazon-neptune-and-spline/

Data lineage is one of the most critical components of a data governance strategy for data lakes. Data lineage helps ensure that accurate, complete and trustworthy data is being used to drive business decisions. While a data catalog provides metadata management features and search capabilities, data lineage shows the full context of your data by capturing in greater detail the true relationships between data sources, where the data originated from and how it gets transformed and converged. Different personas in the data lake benefit from data lineage:

  • For data scientists, the ability to view and track data flow as it moves from source to destination helps you easily understand the quality and origin of a particular metric or dataset
  • Data platform engineers can get more insights into the data pipelines and the interdependencies between datasets
  • Changes in data pipelines are easier to apply and validate because engineers can identify a job’s upstream dependencies and downstream usage to properly evaluate service impacts

As the complexity of the data landscape grows, customers face significant manageability challenges in capturing lineage in a cost-effective and consistent manner. In this post, we walk you through three steps in building an end-to-end automated data lineage solution for data lakes: lineage capture, modeling and storage, and visualization.

In this solution, we capture both coarse-grained and fine-grained data lineage. Coarse-grained data lineage, which often targets business users, focuses on capturing the high-level business processes and overall data workflows. Typically, it captures and visualizes the relationships between datasets and how they’re propagated across storage tiers, including extract, transform and load (ETL) jobs and operational information. Fine-grained data lineage gives access to column-level lineage and the data transformation steps in the processing and analytical pipelines.

Solution overview

Apache Spark is one of the most popular engines for large-scale data processing in data lakes. Our solution uses the Spline agent to capture runtime lineage information from Spark jobs, powered by AWS Glue. We use Amazon Neptune, a purpose-built graph database optimized for storing and querying highly connected datasets, to model lineage data for analysis and visualization.

The following diagram illustrates the solution architecture. We use AWS Glue Spark ETL jobs to perform data ingestion, transformation, and load. The Spline agent is configured in each AWS Glue job to capture lineage and run metrics, and sends such data to a lineage REST API. This backend consists of producer and consumer endpoints, powered by Amazon API Gateway and AWS Lambda functions. The producer endpoints process the incoming lineage objects before storing them in the Neptune database. We use consumer endpoints to extract specific lineage graphs for different visualizations in the frontend application. We perform ad hoc interactive analysis on the graph through Neptune notebooks.

Solution Architecture

We provide sample code and Terraform deployment scripts on GitHub to quickly deploy this solution to the AWS Cloud.

Data lineage capturing

The Spline agent is an open-source project that can harvest data lineage automatically from Spark jobs at runtime, without the need to modify the existing ETL code. It listens to Spark’s query run events, extracts lineage objects from the job run plans and sends them to a preconfigured backend (such as HTTP endpoints). The agent also automatically collects job run metrics such as the number of output rows. As of this writing, the Spline agent works only with Spark SQL (DataSet/DataFrame APIs) and not with RDDs/DynamicFrames.

The following screenshot shows how to integrate the Spline agent with AWS Glue Spark jobs. The Spline agent is an uber JAR that needs to be added to the Java classpath. The following configurations are required to set up the Spline agent:

  • spark.sql.queryExecutionListeners configuration is used to register a Spline listener during its initialization.
  • spark.spline.producer.url specifies the address of the HTTP server that the Spline agent should send lineage data to.
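As a rough sketch of what this looks like in practice, the two settings can be supplied as Spark configuration through the AWS Glue job's --conf job parameter, with the Spline agent uber JAR referenced via --extra-jars. The listener class name comes from the open-source Spline Spark agent and the producer URL is a placeholder for the lineage API endpoint deployed by this solution, so treat both as assumptions and verify them against the agent version you deploy.

    Key:   --conf
    Value: spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
           --conf spark.spline.producer.url=https://<api-id>.execute-api.<region>.amazonaws.com/<stage>/producer

Chaining the second setting inside a single --conf value is a common workaround for passing multiple Spark configurations to an AWS Glue job.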

Spline Agent Configuration on AWS Glue

We build a data lineage API that is compatible with the Spline agent. This API facilitates the insertion of lineage data to Neptune database and graph extraction for visualization. The Spline agent requires three HTTP endpoints:

  • /status – For health checks
  • /execution-plans – For sending the captured Spark execution plans after the jobs are submitted to run
  • /execution-events – For sending the job’s run metrics when the job is complete

We also create additional endpoints to manage various metadata of the data lake, such as the names of the storage layers and dataset classification.

When a Spark SQL statement is run or a DataFrame action is called, Spark’s optimization engine, namely Catalyst, generates different query plans: a logical plan, optimized logical plan and physical plan, which can be inspected using the EXPLAIN statement. In a job run, the Spline agent parses the analyzed logical plan to construct a JSON lineage object. The object consists of the following:

  • A unique job run ID
  • A reference schema (attribute names and data types)
  • A list of operations
  • Other system metadata such as Spark version and Spline agent version

A run plan specifies the steps the Spark job performs, from reading data sources, applying different transformations, to finally persisting the job’s output into a storage location.

To sum up, the Spline agent captures not only the metadata of the job (such as job name and run date and time), the input and output tables (such as data format, physical location and schema) but also detailed information about the business logic (SQL-like operations that the job performs, such as join, filter, project and aggregate).

Data modeling and storage

Data modeling starts with the business requirements and use cases and maps those needs into a structure for storing and organizing our data. In data lineage for data lakes, the relationships between data assets (jobs, tables and columns) are as important as the metadata of those. As a result, graph databases are suitable to model such highly connected entities, making it efficient to understand the complex and deep network of relationships within the data.

Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run applications with highly connected datasets. You can use Neptune to create sophisticated, interactive graph applications that can query billions of relationships in milliseconds. Neptune supports three popular graph query languages: Apache TinkerPop Gremlin and openCypher for property graphs and SPARQL for W3C’s RDF data model. In this solution, we use the property graph’s primitives (including vertices, edges, labels and properties) to model the objects and use the gremlinpython library to interact with the graphs.

The objective of our data model is to provide an abstraction for data assets and their relationships within the data lake. In the producer Lambda functions, we first parse the JSON lineage objects to form logical entities such as jobs, tables and operations before constructing the final graph in Neptune.

Lineage Processing

The following diagram shows a sample data model used in this solution.

Lineage Data Model

This data model allows us to easily traverse the graph to extract coarse-grained and fine-grained data lineage, as mentioned earlier.

Data lineage visualization

You can extract specific views of the lineage graph from Neptune using the consumer endpoints backed by Lambda functions. Hierarchical views of lineage at different levels make it easy for the end-user to analyze the information.

The following screenshot shows a data lineage view across all jobs and tables.

Lineage Visualization Overall

The following screenshot shows a view of a specific job plan.

Lineage Visualization at Job Level

The following screenshot shows a detailed look into the operations taken by the job.

Lineage Visualization at Execution Level

The graphs are visualized using the vis.js network open-source project. You can interact with the graph elements to learn more about the entity’s properties, such as data schema.

Conclusion

In this post, we showed you architectural design options to automatically collect end-to-end data lineage for AWS Glue Spark ETL jobs across a data lake in a multi-account AWS environment using Neptune and the Spline agent. This approach enables searchable metadata, helps to draw insights and achieve an improved organization-wide data lineage posture. The proposed solution uses AWS managed and serverless services, which are scalable and configurable for high availability and performance.

For more information about this solution, see the GitHub repo. You can modify the code to extend the data model and APIs.


About the Authors

Khoa Nguyen is a Senior Big Data Architect at Amazon Web Services. He works with large enterprise customers and AWS partners to accelerate customers’ business outcomes by providing expertise in Big Data and AWS services.

Krithivasan Balasubramaniyan is a Principal Consultant at Amazon Web Services. He enables global enterprise customers in their digital transformation journey and helps architect cloud native solutions.

Rahul Shaurya is a Senior Big Data Architect at Amazon Web Services. He helps and works closely with customers building data platforms and analytical applications on AWS.

Insights for CTOs: Part 3 – Growing your business with modern data capabilities

Post Syndicated from Syed Jaffry original https://aws.amazon.com/blogs/architecture/insights-for-ctos-part-3-growing-your-business-with-modern-data-capabilities/

This post was co-written with Jonathan Hwang, head of Foundation Data Analytics at Zendesk.


In my role as a Senior Solutions Architect, I have spoken to chief technology officers (CTOs) and executive leadership of large enterprises like big banks, software as a service (SaaS) businesses, mid-sized enterprises, and startups.

In this 6-part series, I share insights gained from various CTOs and engineering leaders during their cloud adoption journeys at their respective organizations. I have taken these lessons and summarized architecture best practices to help you build and operate applications successfully in the cloud. This series also covers building and operating cloud applications, security, cloud financial management, modern data and artificial intelligence (AI), cloud operating models, and strategies for cloud migration.

In Part 3, I’ve collaborated with the head of Foundation Analytics at Zendesk, Jonathan Hwang, to show how Zendesk incrementally scaled their data and analytics capabilities to effectively use the insights they collect from customer interactions. Read how Zendesk built a modern data architecture using Amazon Simple Storage Service (Amazon S3) for storage, Apache Hudi for row-level data processing, and AWS Lake Formation for fine-grained access control.

Why Zendesk needed to build and scale their data platform

Zendesk is a customer service platform that connects over 100,000 brands with hundreds of millions of customers via telephone, chat, email, messaging, social channels, communities, review sites, and help centers. They use data from these channels to make informed business decisions and create new and updated products.

In 2014, Zendesk’s data team built the first version of their big data platform in their own data center using Apache Hadoop for incubating their machine learning (ML) initiative. With that, they launched Answer Bot and Zendesk Benchmark report. These products were so successful they soon overwhelmed the limited compute resources available in the data center. By the end of 2017, it was clear Zendesk needed to move to the cloud to modernize and scale their data capabilities.

Incrementally modernizing data capabilities

Zendesk built and scaled their workload to use data lakes on AWS, but soon encountered new architecture challenges:

  • The General Data Protection Regulation (GDPR) “right to be forgotten” rule made it difficult and costly to maintain data lakes, because deleting a small piece of data required reprocessing large datasets.
  • Security and governance was harder to manage when data lake scaled to a larger number of users.

The following sections show you how Zendesk is addressing GDPR rules by evolving from plain Apache Parquet files on Amazon S3 to Hudi datasets on Amazon S3 to enable row level inserts/updates/deletes. To address security and governance, Zendesk is migrating to AWS Lake Formation centralized security for fine-grained access control at scale.

Zendesk’s data platform

Figure 1 shows Zendesk’s current data platform. It consists of three data pipelines: “Data Hub,” “Data Lake,” and “Self Service.”

Figure 1. Zendesk data pipelines

Data Lake pipelines

The Data Lake and Data Hub pipelines cover the entire lifecycle of the data from ingestion to consumption.

The Data Lake pipelines consolidate the data from Zendesk’s highly distributed databases into a data lake for analysis.

Zendesk uses AWS Database Migration Service (AWS DMS) for change data capture (CDC) from over 1,800 Amazon Aurora MySQL databases in eight AWS Regions. It detects transaction changes and applies them to the data lake using Amazon EMR and Hudi.

Zendesk ticket data consists of over 10 billion events and petabytes of data. The data lake files in Amazon S3 are transformed and stored in Apache Hudi format and registered on the AWS Glue catalog to be available as data lake tables for analytics querying and consumption via Amazon Athena.

Data Hub pipelines

The Data Hub pipelines focus on real-time events and streaming analytics use cases with Apache Kafka. Any application at Zendesk can publish events to a global Kafka message bus. Apache Flink ingests these events into Amazon S3.

The Data Hub provides high-quality business data that is highly available and scalable.

Self-managed pipeline

The self-managed pipelines empower product engineering teams to use the data lake for those use cases that don’t fit into our standard integration patterns. All internal Zendesk product engineering teams can use standard tools such as Amazon EMR, Amazon S3, Athena, and AWS Glue to publish their own analytics dataset and share them with other teams.

A notable example of this is Zendesk’s fraud detection engineering team. They publish their fraud detection data and findings through the self-managed data lake platform and use Amazon QuickSight for visualization.

You need fine-grained security and compliance

Data lakes can accelerate growth through faster decision making and product innovation. However, they can also bring new security and compliance challenges:

  • Visibility and auditability. Who has access to what data? What level of access do people have and how/when and who is accessing it?
  • Fine-grained access control. How do you define and enforce least privilege access to subsets of data at scale without creating bottlenecks or key person/team dependencies?

Lake Formation helps address these concerns by auditing data access and offering row- and column-level security and a delegated access control model to create data stewards for self-managed security and governance.

Zendesk used Lake Formation to build a fine-grained access control model that uses row-level security. It detects personally identifiable information (PII) while scaling the data lake for self-managed consumption.

Some Zendesk customers opt out of having their data included in ML or market research. Zendesk uses Lake Formation to apply row-level security that filters out records associated with the list of customer accounts that have opted out. They also help data lake users understand which data lake tables contain PII by automatically detecting and tagging columns in the data catalog using AWS Glue’s PII detection algorithm.

The value of real-time data processing

When you process and consume data closer to the time of its creation, you can make faster decisions. Streaming analytics design patterns, implemented using services like Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Amazon Kinesis, create an enterprise event bus to exchange data between heterogeneous applications in near real time.

For example, it is common to use streaming to augment the traditional database CDC ingestion into the data lake with additional streaming ingestion of application events. CDC is a common data ingestion pattern, but the information can be too low level. This requires application context to be reconstructed in the data lake and business logic to be duplicated in two places, inside the application and in the data lake processing layer. This creates a risk of semantic misrepresentation of the application context.

Zendesk faced this challenge with their CDC data lake ingestion from their Aurora clusters. They created an enterprise event bus built with Apache Kafka to augment their CDC with higher-level application domain events to be exchanged directly between heterogeneous applications.

Zendesk’s streaming architecture

A CDC database ticket table schema can sometimes contain unnecessary and complex attributes that are application specific and do not capture the domain model of the ticket. This makes it hard for downstream consumers to understand and use the data. A ticket domain object may span several database tables when modeled in third normal form, which makes querying for analysts difficult downstream. This is also a brittle integration method because downstream data consumers can easily be impacted when the application logic changes, which makes it hard to derive a common data view.

To move towards event-based communication between microservices, Zendesk created the Platform Data Architecture (PDA) project, which uses a standard object model to represent a higher level, semantic view of their application data. Standard objects are domain objects designed for cross-domain communication and do not suffer from the lower level fragmented scope of database CDC. Ultimately, Zendesk aims to transition their data architecture from a collection of isolated products and data silos into a cohesive unified data platform.

Figure 2. An application view of Zendesk’s streaming architecture

Figure 3 shows how all Zendesk products and users integrate through common standard objects and standard events within the Data Hub. Applications publish and consume standard objects and events to/from the event bus.

For example, a complete ticket standard object will be published to the message bus whenever it is created, updated, or changed. On the consumption side, these events get used by product teams to enable platform capabilities such as search, data export, analytics, and reporting dashboards.

Summary

As Zendesk’s business grew, their data lake evolved from simple Parquet files on Amazon S3 to a modern Hudi-based, incrementally updateable data lake. Their original coarse-grained IAM security policies have been replaced with fine-grained access control using Lake Formation.

We have repeatedly seen this incremental architecture evolution achieve success because it reduces the business risk associated with the change and provides sufficient time for your team to learn and evaluate cloud operations and managed services.

Looking for more architecture content? AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Mainframe offloading and modernization: Using mainframe data to build cloud native services with AWS

Post Syndicated from Malathi Pinnamaneni original https://aws.amazon.com/blogs/architecture/mainframe-offloading-and-modernization-using-mainframe-data-to-build-cloud-native-services-with-aws/

Many companies in the financial services and insurance industries rely on mainframes for their most business-critical applications and data. But mainframe workloads typically lack agility. This is one reason that organizations struggle to innovate, iterate, and pivot quickly to develop new applications or release new capabilities. Unlocking this mainframe data can be the first step in your modernization journey.

In this blog post, we will discuss some typical offloading patterns. Whether your goal is developing new applications using mainframe data or modernizing with the Strangler Fig Application pattern, you might want some guidance on how to begin.

Refactoring mainframe applications to the cloud

Refactoring mainframe applications to cloud-native services on AWS is a common industry pattern and a long-term goal for many companies to remain competitive. But this takes an investment of time, money, and organizational change management to realize the full benefits. We see customers start their modernization journey by offloading data from the mainframe to AWS to reduce risks and create new capabilities.

The mainframe data offloading patterns that we will discuss in this post use software services that facilitate data replication to Amazon Web Services (AWS):

  • File-based data synchronization
  • Change data capture
  • Event-sourced replication

Once data is liberated from the mainframe, you can develop new agile applications for deeper insights using analytics and machine learning (ML). You could create microservices-based or voice-based mobile applications. For example, if a bank could access their historical mainframe data to analyze customer behavior, they could develop a new solution based on customer profiles to use for loan recommendations.

The patterns we illustrate can be used as a reference to begin your modernization efforts with reduced risk. The long-term goal is to rewrite the mainframe applications and modernize them workload by workload.

Solution overview: Mainframe offloading and modernization

Figure 1. Mainframe offloading and modernization conceptual flow

Mainframe modernization: Architecture reference patterns

File-based batch integration

Modernization scenarios often require replicating files to AWS, or synchronizing between on-premises and AWS. Use cases include:

  • Analyzing current and historical data to enhance business analytics
  • Providing data for further processing on downstream or upstream dependent systems. This is necessary for exchanging data between applications running on the mainframe and applications running on AWS

Figure 2. File-based batch ingestion pattern for interactive data analytics

File-based batch integration – Batch ingestion for interactive data analytics (Figure 2)

  1. Data ingestion. In this example, we show how data can be ingested to Amazon S3 using AWS Transfer Family Services or AWS DataSync. Mainframe data is typically encoded in extended binary-coded decimal interchange code (EBCDIC) format. Prescriptive guidance exists to convert EBCDIC to ASCII format.
  2. Data transformation. Before moving data to AWS data stores, transformation of the data may be necessary to use it for analytics. AWS analytics services like AWS Glue and AWS Lambda can be used to transform the data. For large-volume processing, use Apache Spark on Amazon EMR, or a custom Spring Boot application running on Amazon EC2, to perform these transformations. This process can be orchestrated using AWS Step Functions or AWS Data Pipeline.
  3. Data store. Data is transformed into a consumable format that can be stored in Amazon S3.
  4. Data consumption. You can use AWS analytics services like Amazon Athena for interactive ad-hoc query access, Amazon QuickSight for analytics, and Amazon Redshift for complex reporting and aggregations.

Figure 3. File upload to operational data stores for further processing

File-based batch integration – File upload to operational data stores for further processing (Figure 3)

  1. Using AWS Transfer Family services, upload CSV files to Amazon S3.
  2. Once the files are uploaded, an S3 event notification can invoke an AWS Lambda function to load the data into Amazon Aurora. For low-latency data access requirements, you can use a scalable serverless import pattern with AWS Lambda and Amazon SQS to load the data into Amazon DynamoDB.
  3. Once the data is in data stores, it can be consumed for further processing.

Transactional replication-based integration (Figure 4)

Several modernization scenarios require continuous near-real-time replication of relational data to keep a copy of the data in the cloud. Change Data Capture (CDC) for near-real-time transactional replication works by capturing change log activity to drive changes in the target dataset. Use cases include:

  • Command Query Responsibility Segregation (CQRS) architectures that use AWS to service all read-only and retrieve functions
  • On-premises systems with tightly coupled applications that require a phased modernization
  • Real-time operational analytics

Figure 4. Transactional replication (CDC) pattern

  1. Partner CDC tools in the AWS Marketplace can be used to manage real-time data movement between the mainframe and AWS.
  2. You can use a fan-out pattern to read once from the mainframe to reduce processing requirements and replicate data to multiple data stores based on your requirements:
    • For low latency requirements, replicate to Amazon Kinesis Data Streams and use AWS Lambda to store in Amazon DynamoDB.
    • For critical business functionality with complex logic, use Amazon Aurora or Amazon Relational Database Service (RDS) as targets.
    • To build a data lake or use Amazon S3 as an intermediary for ETL processing, replicate the data to Amazon S3 as the target.
  3. Once the data is in AWS, customers can build agile microservices for read-only functions.

Message-oriented middleware (event sourcing) integration (Figure 5)

With message-oriented middleware (MOM) systems like IBM MQ on mainframe, several modernization scenarios require integrating with cloud-based streaming and messaging services. These act as a buffer to keep your data in sync. Use cases include:

  • Consume data from AWS data stores to enable new communication channels. Examples of new channels can be mobile or voice-based applications and can be innovations based on ML
  • Migrate the producer (senders) and consumer (receivers) applications communicating with on-premises MOM platforms to AWS with an end goal to retire on-premises MOM platform

Figure 5. Event-sourcing integration pattern

  1. Mainframe transactions from IBM MQ can be read using a connector or a bridge solution. They can then be published to Amazon MQ queues or Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics.
  2. Once the data is published to the queue or topic, consumers implemented as AWS Lambda functions or running on Amazon compute services can process, map, transform, or filter the messages. They can store the data in Amazon RDS, Amazon ElastiCache, Amazon S3, or DynamoDB.
  3. Now that the data resides in AWS, you can build new cloud-native applications, such as the mobile, voice-based, or ML-driven solutions described earlier.

Conclusion

Mainframe offloading and modernization using AWS services enables you to reduce cost, modernize your architectures, and integrate your mainframe and cloud-native technologies. You’ll be able to inform your business decisions with improved analytics, and create new opportunities for innovation and the development of modern applications.

Build a serverless pipeline to analyze streaming data using AWS Glue, Apache Hudi, and Amazon S3

Post Syndicated from Nikhil Khokhar original https://aws.amazon.com/blogs/big-data/build-a-serverless-pipeline-to-analyze-streaming-data-using-aws-glue-apache-hudi-and-amazon-s3/

Organizations typically accumulate massive volumes of data and continue to generate ever-exceeding data volumes, ranging from terabytes to petabytes and at times to exabytes of data. Such data is usually generated in disparate systems and requires an aggregation into a single location for analysis and insight generation. A data lake architecture allows you to aggregate data present in various silos, store it in a centralized repository, enforce data governance, and support analytics and machine learning (ML) on top of this stored data.

Typical building blocks to implement such an architecture include a centralized repository built on Amazon Simple Storage Service (Amazon S3) providing the least possible unit cost of storage per GB, big data ETL (extract, transform, and load) frameworks such as AWS Glue, and analytics using Amazon Athena, Amazon Redshift, and Amazon EMR notebooks.

Building such systems involves technical challenges. For example, data residing in S3 buckets can’t be updated in-place using standard data ingestion approaches. Therefore, you must perform constant ad-hoc ETL jobs to consolidate data into new S3 files and buckets.

This is especially the case with streaming sources, which require constant support for increasing data velocity to provide faster insight generation. An example use case is an ecommerce company looking to build a real-time data lake. They need their solution to do the following:

  • Ingest continuous changes (like customer orders) from upstream systems
  • Capture tables into the data lake
  • Provide ACID properties on the data lake to support interactive analytics by enabling consistent views on data while new data is being ingested
  • Provide schema flexibility due to upstream data layout changes and provisions for late arrival of data

To deliver on these requirements, organizations have to build custom frameworks to handle in-place updates (also referred to as upserts), handle the small files created by the continuous ingestion of changes from upstream systems (such as databases), handle schema evolution, and often compromise on ACID guarantees for their data lakes.

A processing framework like Apache Hudi is a good way to solve such challenges. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi is integrated with various AWS analytics services, like AWS Glue, Amazon EMR, Athena, and Amazon Redshift. This helps you ingest data from a variety of sources via batch or streaming while enabling in-place updates to an append-oriented storage system such as Amazon S3 (or HDFS). In this post, we discuss a serverless approach to integrate Hudi with a streaming use case and create an in-place updatable data lake on Amazon S3.

Solution overview

We use Amazon Kinesis Data Generator to send sample streaming data to Amazon Kinesis Data Streams. To consume this streaming data, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector for AWS Glue to write ingested and transformed data to Amazon S3, and also creates a table in the AWS Glue Data Catalog.

After the data is ingested, Hudi organizes a dataset into a partitioned directory structure under a base path pointing to a location in Amazon S3. Data layout in these partitioned directories depends on the Hudi dataset type used during ingestion, such as Copy on Write (CoW) and Merge on Read (MoR). For more information about Hudi storage types, see Using Athena to Query Apache Hudi Datasets and Storage Types & Views.

CoW is the default storage type of Hudi. In this storage type, data is stored in columnar format (Parquet). Each ingestion creates a new version of files during a write. With CoW, each time there is an update to a record, Hudi rewrites the original columnar file containing the record with the updated values. Therefore, this is better suited for read-heavy workloads on data that changes less frequently.

MoR datasets are stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted to create new versions of columnar files. With MoR, each time there is an update to a record, Hudi writes only the row for the changed record into the row-based (Avro) format, which is compacted (synchronously or asynchronously) to create columnar files. Therefore, MoR is better suited for write- or change-heavy workloads with fewer reads.

For this post, we use the CoW storage type to illustrate our use case of creating a Hudi dataset and serving it to a variety of readers. You can extend this solution to support MoR storage by selecting that storage type during ingestion. We use Athena to read the dataset. We also illustrate the capabilities of this solution in terms of in-place updates, nested partitioning, and schema flexibility.

The following diagram illustrates our solution architecture.

Create the Apache Hudi connection using the Apache Hudi Connector for AWS Glue

To create your AWS Glue job with an AWS Glue custom connector, complete the following steps:

  1. On the AWS Glue Studio console, choose Marketplace in the navigation pane.
  2. Search for and choose Apache Hudi Connector for AWS Glue.
  3. Choose Continue to Subscribe.

  4. Review the terms and conditions and choose Accept Terms.
  5. Make sure that the subscription is complete and you see the Effective date populated next to the product, then choose Continue to Configuration.
  6. For Delivery Method, choose Glue 3.0.
  7. For Software Version, choose the latest version (as of this writing, 0.9.0 is the latest version of the Apache Hudi Connector for AWS Glue).
  8. Choose Continue to Launch.
  9. Under Launch this software, choose Usage Instructions and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, hudi-connection).
  2. For Description, enter a description.
  3. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • An S3 bucket named hudi-demo-bucket-<your-stack-id> that contains a JAR artifact copied from another public S3 bucket outside of your account. This JAR artifact is then used to define the AWS Glue streaming job.
  • A Kinesis data stream named hudi-demo-stream-<your-stack-id>.
  • An AWS Glue streaming job named Hudi_Streaming_Job-<your-stack-id> with a dedicated AWS Glue Data Catalog database named hudi-demo-db-<your-stack-id>. Refer to the aws-samples GitHub repository for the complete code of the job.
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions.
  • AWS Lambda functions to copy artifacts to the S3 bucket and to empty the buckets before stack deletion.

To create your resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-connector-blog-for-streaming-data.
  3. For HudiConnectionName, use the name you specified in the previous section.
  4. Leave the other parameters as default.
  5. Choose Next.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  7. Choose Create stack.

Set up Kinesis Data Generator

In this step, you configure Kinesis Data Generator to send sample data to a Kinesis data stream.

  1. On the Kinesis Data Generator console, choose Create a Cognito User with CloudFormation.

You’re redirected to the AWS CloudFormation console.

  1. On the Review page, in the Capabilities section, select I acknowledge that AWS CloudFormation might create IAM resources.
  2. Choose Create stack.
  3. On the Stack details page, in the Stacks section, verify that the status shows CREATE_COMPLETE.
  4. On the Outputs tab, copy the URL value for KinesisDataGeneratorUrl.
  5. Navigate to this URL in your browser.
  6. Enter the user name and password provided and choose Sign In.

Start an AWS Glue streaming job

To start an AWS Glue streaming job, complete the following steps:

  1. On the AWS CloudFormation console, navigate to the Resources tab of the stack you created.
  2. Copy the physical ID corresponding to the AWS::Glue::Job resource.
  3. On the AWS Glue Studio console, find the job name using the physical ID.
  4. Choose the job to review the script and job details.
  5. Choose Run to start the job.
  6. On the Runs tab, validate if the job is successfully running.

Send sample data to a Kinesis data stream

Kinesis Data Generator generates records using random data based on a template you provide. Kinesis Data Generator extends faker.js, an open-source random data generator.

In this step, you use Kinesis Data Generator to send sample data, based on a template that uses faker.js expressions, to the previously created data stream at a rate of one record per second. Sustain the ingestion until the end of this tutorial so you have a reasonable amount of data for analysis while performing the remaining steps.

  1. On the Kinesis Data Generator console, for Records per second, choose the Constant tab, and change the value to 1.
  2. For Record template, choose the Template 1 tab, and enter the following code sample into the text box:
    {
     "name" : "{{random.arrayElement(["Person1","Person2","Person3", "Person4"])}}",  
     "date": "{{date.utc(YYYY-MM-DD)}}",
     "year": "{{date.utc(YYYY)}}",
     "month": "{{date.utc(MM)}}",
     "day": "{{date.utc(DD)}}",
     "column_to_update_integer": {{random.number(1000000000)}},
     "column_to_update_string": "{{random.arrayElement(["White","Red","Yellow", "Silver"])}}" 
    }

  3. Choose Test template.
  4. Verify the structure of the sample JSON records and choose Close.
  5. Choose Send data.
  6. Leave the Kinesis Data Generator page open to ensure sustained streaming of random records into the data stream.

Continue through the remaining steps while you generate your data.
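
If you prefer a scripted producer over the Kinesis Data Generator console, the following boto3 sketch emits records of the same shape at one record per second; the stream name placeholder corresponds to the stream created by the CloudFormation stack, and the field values are randomized like the template above.

import json
import random
import time

import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "hudi-demo-stream-<your-stack-id>"  # replace with the stream created by the stack

def sample_record():
    """Build a record with the same shape as the Kinesis Data Generator template."""
    now = time.gmtime()
    return {
        "name": random.choice(["Person1", "Person2", "Person3", "Person4"]),
        "date": time.strftime("%Y-%m-%d", now),
        "year": time.strftime("%Y", now),
        "month": time.strftime("%m", now),
        "day": time.strftime("%d", now),
        "column_to_update_integer": random.randint(0, 1000000000),
        "column_to_update_string": random.choice(["White", "Red", "Yellow", "Silver"]),
    }

# Send one record per second until you stop the script (Ctrl+C).
while True:
    record = sample_record()
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=record["name"],
    )
    time.sleep(1)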

Verify dynamically created resources

While you’re generating data for analysis, you can verify the resources you created.

Amazon S3 dataset

When the AWS Glue streaming job runs, the records from the Kinesis data stream are consumed and stored in an S3 bucket. While creating Hudi datasets in Amazon S3, the streaming job can also create a nested partition structure. This is enabled through the usage of Hudi configuration properties hoodie.datasource.write.partitionpath.field and hoodie.datasource.write.keygenerator.class in the streaming job definition.

In this example, nested partitions have been created by name, year, month, and day. The values of these properties are set as follows in the script for the AWS Glue streaming job.
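
A minimal sketch of how these two properties could be set as Hudi options in a Python dict follows; the field:type syntax shown for CustomKeyGenerator is an assumption based on the nested partitioning by name, year, month, and day described above, and the values in the provided script may differ.

# Hedged sketch of the partitioning-related Hudi options (values assumed, not copied from the job script).
hudi_partition_options = {
    # Build a nested partition path from name/year/month/day.
    "hoodie.datasource.write.partitionpath.field": "name:SIMPLE,year:SIMPLE,month:SIMPLE,day:SIMPLE",
    # CustomKeyGenerator composes the partition path from the fields listed above.
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
}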

For further details on how CustomKeyGenerator works to generate such partition paths, refer to Apache Hudi Key Generators.

The following screenshot shows the nested partitions created in Amazon S3.

AWS Glue Data Catalog table

A Hudi table is also created in the AWS Glue Data Catalog and mapped to the Hudi datasets on Amazon S3. See the following code in the AWS Glue streaming job.
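
The exact snippet lives in the job script referenced earlier; as a hedged sketch, the Data Catalog (Hive sync) options could look like the following, where the database name matches the CloudFormation resource and the table name is a placeholder.

# Hedged sketch of the Hive sync options used to register the Hudi table in the Data Catalog.
hudi_hive_sync_options = {
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.sync_as_datasource": "false",
    "hoodie.datasource.hive_sync.database": "hudi-demo-db-<your-stack-id>",
    "hoodie.datasource.hive_sync.table": "hudi_demo_table",  # placeholder table name
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.partition_fields": "name,year,month,day",
}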

The following table provides more details on the configuration options.

  • hoodie.datasource.hive_sync.enable – Indicates if the table is synced to the Apache Hive Metastore.
  • hoodie.datasource.hive_sync.sync_as_datasource – Avoids breaking changes introduced with HUDI-1415 (JIRA).
  • hoodie.datasource.hive_sync.database – The database name for your Data Catalog.
  • hoodie.datasource.hive_sync.table – The table name in your Data Catalog.
  • hoodie.datasource.hive_sync.use_jdbc – Uses JDBC for Hive synchronization. For more information, see the GitHub repo.
  • hoodie.datasource.write.hive_style_partitioning – Creates partitions with the <partition_column_name>=<partition_value> format.
  • hoodie.datasource.hive_sync.partition_extractor_class – Required for nested partitioning.
  • hoodie.datasource.hive_sync.partition_fields – Columns in the table to use for Hive partition columns.

The following screenshot shows the Hudi table in the Data Catalog and the associated S3 bucket.

Read results using Athena

Using Hudi with an AWS Glue streaming job allows us to have in-place updates (upserts) on the Amazon S3 data lake. This functionality allows for incremental processing, which enables faster and more efficient downstream pipelines. Apache Hudi enables in-place updates with the following steps:

  1. Define an index (using columns of the ingested record).
  2. Use this index to map every subsequent ingestion to the record storage locations (in our case Amazon S3) ingested previously.
  3. Perform compaction (synchronously or asynchronously) to allow the retention of the latest record for a given index.

In reference to our AWS Glue streaming job, the following Hudi configuration options enable us to achieve in-place updates for the generated schema.
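
As a hedged sketch (the record key and precombine field are assumptions based on the sample schema, and the table name is a placeholder), these options could look like the following in the job script:

# Hedged sketch of the upsert-related Hudi options.
hudi_upsert_options = {
    "hoodie.table.name": "hudi_demo_table",              # placeholder table name
    "hoodie.datasource.write.recordkey.field": "name",   # assumed record key
    "hoodie.datasource.write.operation": "upsert",       # in-place updates
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.precombine.field": "date",  # assumed precombine field
}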

The following table provides more details of the highlighted configuration options.

  • hoodie.datasource.write.recordkey.field – Indicates the column to be used within the ingested record for the Hudi index.
  • hoodie.datasource.write.operation – Defines the nature of the operation on the Hudi dataset. In this example, it's set to upsert for in-place updates.
  • hoodie.datasource.write.table.type – Indicates the Hudi storage type to be used. In this example, it's set to COPY_ON_WRITE.
  • hoodie.datasource.write.precombine.field – When two records have the same key value, Apache Hudi picks the one with the largest value for the precombine field.

To demonstrate an in-place update, consider the following input records sent to the AWS Glue streaming job via Kinesis Data Generator. The record identifier is the Hudi record key configured in the AWS Glue job. In this example, Person3 receives two updates: in the first update, column_to_update_string is set to White; in the second update, it's set to Red.

The streaming job processes these records and creates the Hudi datasets in Amazon S3. You can query the dataset using Athena. In the following example, we get the latest update.
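
A hedged sketch of such a query, submitted through the Athena API with boto3, follows; the database, table, and results bucket names are placeholders. Because the dataset uses the CoW storage type, a plain SELECT already returns only the latest version of each record (Red for Person3 in this example).

import boto3

athena = boto3.client("athena")

query = """
SELECT name, column_to_update_string, column_to_update_integer
FROM "hudi-demo-db-<your-stack-id>"."hudi_demo_table"
WHERE name = 'Person3'
"""

# Submit the query; results land in the configured S3 output location.
response = athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},
)
print(response["QueryExecutionId"])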

Schema flexibility

The AWS Glue streaming job allows for automatic handling of different record schemas encountered during the ingestion. This is specifically useful in situations where record schemas can be subject to frequent changes. To elaborate on this point, consider the following scenario:

  • Case 1 – At time t1, the ingested record has the layout <col 1, col 2, col 3, col 4>
  • Case 2 – At time t2, the ingested record has an extra column, with new layout <col 1, col 2, col 3, col 4, col 5>
  • Case 3 – At time t3, the ingested record dropped the extra column and therefore has the layout <col 1, col 2, col 3, col 4>

For Cases 1 and 2, the AWS Glue streaming job relies on the built-in schema evolution capabilities of Hudi, which enables an update to the Data Catalog with the extra column (col 5 in this case). Additionally, Hudi adds the extra column to the output files (Parquet files written to Amazon S3). This allows the query engine (Athena) to query the Hudi dataset with the extra column without any issues.

Because Case 2 ingestion updates the Data Catalog, the extra column (col 5) is expected to be present in every subsequent ingested record. If we don’t resolve this difference, the job fails.

To overcome this and achieve Case 3, the streaming job defines a custom function named evolveSchema, which handles record layout mismatches. The method queries the AWS Glue Data Catalog for each to-be-ingested record and gets the current Hudi table schema. It then merges the Hudi table schema with the schema of the to-be-ingested record and enriches the record's schema before writing it to the Hudi dataset.

For this example, the to-be-ingested record’s schema <col 1, col 2, col 3, col 4> is modified to <col 1, col 2, col 3, col 4, col 5>, where the value of the extra col 5 is set to NULL.
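
The complete implementation is part of the job script in the aws-samples repository; the following is only a simplified sketch of the idea behind evolveSchema, with the function name, type handling, and error handling being illustrative.

import boto3
from pyspark.sql.functions import lit

def evolve_schema_sketch(record_df, database, table):
    """Align an incoming DataFrame with the current Hudi table schema in the
    Data Catalog by adding any missing columns as NULL (simplified sketch)."""
    glue = boto3.client("glue")
    catalog_columns = glue.get_table(DatabaseName=database, Name=table)["Table"]["StorageDescriptor"]["Columns"]
    for column in catalog_columns:
        if column["Name"] not in record_df.columns:
            # Real code would also map the catalog type; we default to string here.
            record_df = record_df.withColumn(column["Name"], lit(None).cast("string"))
    return record_df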

To illustrate this, we stop the existing ingestion of Kinesis Data Generator and modify the record layout to send an extra column called new_column:

{
 "name" : "{{random.arrayElement(["Person1","Person2","Person3", "Person4"])}}",  
 "date": "{{date.utc(YYYY-MM-DD)}}",
 "year": "{{date.utc(YYYY)}}",
 "month": "{{date.utc(MM)}}",
 "day": "{{date.utc(DD)}}",
 "column_to_update_integer": {{random.number(1000000000)}},
 "column_to_update_string": "{{random.arrayElement(["White","Red","Yellow", "Silver"])}}",
 "new_column": "{{random.number(1000000000)}}" 
}

The Hudi table in the Data Catalog updates as follows, with the newly added column (Case 2).

When we query the Hudi dataset using Athena, we can see the presence of a new column.

We can now use Kinesis Data Generator to send records with an old schema—without the newly added column (Case 3).

In this scenario, our AWS Glue job keeps running. When we query using Athena, the extra added column gets populated with NULL values.

If we stop Kinesis Data Generator and start sending records with a schema containing extra columns, the job keeps running and the Athena query continues to return the latest values.

Clean up

To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack.

Summary

This post illustrated how to set up a serverless pipeline using an AWS Glue streaming job with the Apache Hudi Connector for AWS Glue, which runs continuously and consumes data from Kinesis Data Streams to create a near-real-time data lake that supports in-place updates, nested partitioning, and schema flexibility.

You can also use Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as the source of a similar streaming job. We encourage you to use this approach for setting up a near-real-time data lake. As always, AWS welcomes feedback, so please leave your thoughts or questions in the comments.


About the Authors

Nikhil Khokhar is a Solutions Architect at AWS. He joined AWS in 2016 and specializes in building and supporting data streaming solutions that help customers analyze and get value out of their data. In his free time, he makes use of his 3D printing skills to solve everyday problems.

Dipta S Bhattacharya is a Solutions Architect Manager at AWS. Dipta joined AWS in 2018. He works with large startup customers to design and develop architectures on AWS and support their journey on the cloud.

How the Georgia Data Analytics Center built a cloud analytics solution from scratch with the AWS Data Lab

Post Syndicated from Kanti Chalasani original https://aws.amazon.com/blogs/big-data/how-the-georgia-data-analytics-center-built-a-cloud-analytics-solution-from-scratch-with-the-aws-data-lab/

This is a guest post by Kanti Chalasani, Division Director at Georgia Data Analytics Center (GDAC). GDAC is housed within the Georgia Office of Planning and Budget to facilitate governed data sharing between various state agencies and departments.

The Office of Planning and Budget (OPB) established the Georgia Data Analytics Center (GDAC) with the intent to provide data accountability and transparency in Georgia. GDAC strives to support the state’s government agencies, academic institutions, researchers, and taxpayers with their data needs. Georgia’s modern data analytics center will help to securely harvest, integrate, anonymize, and aggregate data.

In this post, we share how GDAC created an analytics platform from scratch using AWS services and how GDAC collaborated with the AWS Data Lab to accelerate this project from design to build in record time. The pre-planning sessions, technical immersions, pre-build sessions, and post-build sessions helped us focus on our objectives and tangible deliverables. We built a prototype with a modern data architecture, and the purpose-built data and analytics services allowed us to quickly ingest additional data into the data lake and the data warehouse and deliver data analytics dashboards. It was extremely rewarding to officially release the GDAC public website within only 4 months.

A combination of clear direction from OPB executive stakeholders, input from the knowledgeable and driven AWS team, and the GDAC team’s drive and commitment to learning played a huge role in this success story. GDAC’s partner agencies helped tremendously through timely data delivery, data validation, and review.

We had a two-tiered engagement with the AWS Data Lab. In the first tier, we participated in a Design Lab to discuss our near-to-long-term requirements and create a best-fit architecture. We discussed the pros and cons of various services that can help us meet those requirements. We also had meaningful engagement with AWS subject matter experts from various AWS services to dive deeper into the best practices.

The Design Lab was followed by a Build Lab, where we took a smaller cross section of the bigger architecture and implemented a prototype in 4 days. During the Build Lab, we worked in GDAC AWS accounts, using GDAC data and GDAC resources. This not only helped us build the prototype, but also helped us gain hands-on experience in building it. This experience also helped us better maintain the product after we went live. We were able to continually build on this hands-on experience and share the knowledge with other agencies in Georgia.

Our Design and Build Lab experiences are detailed below.

Step 1: Design Lab

We wanted to stand up a platform that can meet the data and analytics needs of the Georgia Data Analytics Center (GDAC) and potentially serve as a gold standard for other government agencies in Georgia. Our objective with the AWS Data Design Lab was to come up with an architecture that meets initial data needs and provides ample scope for future expansion as our user base and data volume increase. We wanted each component of the architecture to scale independently, with tighter controls on data access. Our objective was to enable easy exploration of data with faster response times using Tableau data analytics, as well as to build data capital for Georgia. This would allow us to empower our policymakers to make data-driven decisions in a timely manner and allow State agencies to share data and definitions within and across agencies through data governance. We also stressed data security, classification, obfuscation, auditing, monitoring, logging, and compliance needs. We wanted to use purpose-built tools meant for specialized objectives.

Over the course of the 2-day Design Lab, we defined our overall architecture and picked a scaled-down version to explore. The following diagram illustrates the architecture of our prototype.

The architecture contains the following key components:

  • Amazon Simple Storage Service (Amazon S3) for raw data landing and curated data staging.
  • AWS Glue for extract, transform, and load (ETL) jobs to move data from the Amazon S3 landing zone to Amazon S3 curated zone in optimal format and layout. We used an AWS Glue crawler to update the AWS Glue Data Catalog.
  • AWS Step Functions for AWS Glue job orchestration.
  • Amazon Athena as a powerful tool for a quick and extensive SQL data analysis and to build a logical layer on the landing zone.
  • Amazon Redshift to create a federated data warehouse with conformed dimensions and star schemas for consumption by Tableau data analytics.

Step 2: Pre-Build Lab

We started with planning sessions to build foundational components of our infrastructure: AWS accounts, Amazon Elastic Compute Cloud (Amazon EC2) instances, an Amazon Redshift cluster, a virtual private cloud (VPC), route tables, security groups, encryption keys, access rules, internet gateways, a bastion host, and more. Additionally, we set up AWS Identity and Access Management (IAM) roles and policies, AWS Glue connections, dev endpoints, and notebooks. Files were ingested via secure FTP or from a database to Amazon S3 using the AWS Command Line Interface (AWS CLI). We crawled Amazon S3 via AWS Glue crawlers to build Data Catalog schemas and tables for quick SQL access in Athena.

The GDAC team participated in Immersion Days for training in AWS Glue, AWS Lake Formation, and Amazon Redshift in preparation for the Build Lab.

We defined the following as the success criteria for the Build Lab:

  • Create ETL pipelines from source (Amazon S3 raw) to target (Amazon Redshift). These ETL pipelines should create and load dimensions and facts in Amazon Redshift.
  • Have a mechanism to test the accuracy of the data loaded through our pipelines.
  • Set up Amazon Redshift in a private subnet of a VPC, with appropriate users and roles identified.
  • Connect from AWS Glue to Amazon S3 to Amazon Redshift without going over the internet.
  • Set up row-level filtering in Amazon Redshift based on user login.
  • Data pipelines orchestration using Step Functions.
  • Build and publish Tableau analytics with connections to our star schema in Amazon Redshift.
  • Automate the deployment using AWS CloudFormation.
  • Set up column-level security for the data in Amazon S3 using Lake Formation. This allows for differential access to data based on user roles to users using both Athena and Amazon Redshift Spectrum.

Step 3: Four-day Build Lab

Following a series of implementation sessions with our architect, we formed the GDAC data lake and organized downstream data pulls for the data warehouse with governed data access. Data was ingested in the raw data landing lake and then curated into a staging lake, where data was compressed and partitioned in Parquet format.

It was empowering for us to build PySpark extract, transform, and load (ETL) AWS Glue jobs with our meticulous AWS Data Lab architect. We built reusable AWS Glue jobs for data ingestion and curation using the code snippets provided. The days were rigorous and long, but we were thrilled to see our centralized data repository come to fruition so rapidly. Cataloging data and using Athena queries proved to be a fast and cost-effective way for data exploration and data wrangling.

The serverless orchestration with Step Functions allowed us to put AWS Glue jobs into a simple readable data workflow. We spent time designing for performance and partitioning data to minimize cost and increase efficiency.
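
As an illustration of that pattern (not the GDAC workflow itself; the job names, role ARN, and states here are assumptions), a Step Functions state machine that chains two AWS Glue jobs can be created with boto3 like this:

import json

import boto3

sfn = boto3.client("stepfunctions")

# Illustrative state machine: curate raw data, then load the warehouse.
definition = {
    "StartAt": "CurateRawData",
    "States": {
        "CurateRawData": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {"JobName": "gdac-curation-job"},
            "Next": "LoadWarehouse",
        },
        "LoadWarehouse": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {"JobName": "gdac-redshift-load-job"},
            "End": True,
        },
    },
}

sfn.create_state_machine(
    name="gdac-etl-orchestration",
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsGlueRole",  # placeholder role
)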

Database access from Tableau and SQL Workbench/J was set up for my team. Our excitement only grew as we began building data analytics and dashboards using our dimensional data models.

Step 4: Post-Build Lab

During our post-Build Lab session, we closed several loose ends and built additional AWS Glue jobs for initial and historic loads and append vs. overwrite strategies. These strategies were picked based on the nature of the data in various tables. We returned for a second Build Lab to work on building data migration tasks from Oracle Database via VPC peering, file processing using AWS Glue DataBrew, and AWS CloudFormation for automated AWS Glue job generation. If you have a team of 4–8 builders looking for a fast and easy foundation for a complete data analytics system, I would highly recommend the AWS Data Lab.

Conclusion

All in all, with a very small team we were able to set up a sustainable framework on AWS infrastructure with elastic scaling to handle future capacity without compromising quality. With this framework in place, we are moving rapidly with new data feeds. This would not have been possible without the assistance of the AWS Data Lab team throughout the project lifecycle. With this quick win, we decided to move forward and build AWS Control Tower with multiple accounts in our landing zone. We brought in professionals to help set up infrastructure and data compliance guardrails and security policies. We are thrilled to continually improve our cloud infrastructure, services and data engineering processes. This strong initial foundation has paved the pathway to endless data projects in Georgia.


About the Author

Kanti Chalasani serves as the Division Director for the Georgia Data Analytics Center (GDAC) at the Office of Planning and Budget (OPB). Kanti is responsible for GDAC’s data management, analytics, security, compliance, and governance activities. She strives to work with state agencies to improve data sharing, data literacy, and data quality through this modern data engineering platform. With over 26 years of IT management, hands-on data warehousing, and analytics experience, she strives for excellence.

Vishal Pathak is an AWS Data Lab Solutions Architect. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey with AWS, Vishal helped customers implement BI, data warehousing, and data lake projects in the US and Australia.

Audit AWS service events with Amazon EventBridge and Amazon Kinesis Data Firehose

Post Syndicated from Anand Shah original https://aws.amazon.com/blogs/big-data/audit-aws-service-events-with-amazon-eventbridge-and-amazon-kinesis-data-firehose/

Amazon EventBridge is a serverless event bus that makes it easy to build event-driven applications at scale using events generated from your applications, integrated software as a service (SaaS) applications, and AWS services. Many AWS services generate EventBridge events. When an AWS service in your account emits an event, it goes to your account’s default event bus.

AWS services emit many such events. For example, AWS Systems Manager emits a Parameter Store Change event when a parameter is created or updated.

By default, these AWS service-generated events are transient and therefore not retained. This post shows how you can forward AWS service-generated events or custom events to Amazon Simple Storage Service (Amazon S3) for long-term storage, analysis, and auditing purposes using EventBridge rules and Amazon Kinesis Data Firehose.

Solution overview

In this post, we provide a working example of AWS service-generated events ingested to Amazon S3. To make sure we have some service events available on the default event bus, we use Parameter Store, a capability of AWS Systems Manager, to store new parameters manually. This action generates a new event, which is ingested by the following pipeline.

Architecture Diagram

The pipeline includes the following steps:

  1. AWS service-generated events (for example, a new parameter created in Parameter Store) go to the default event bus in EventBridge.
  2. The EventBridge rule matches all events and forwards those to Kinesis Data Firehose.
  3. Kinesis Data Firehose delivers events to the S3 bucket, partitioned by detail-type and receipt time using its dynamic partitioning capability (see the configuration sketch after this list).
  4. The S3 bucket stores the delivered events, and their respective event schema is registered to the AWS Glue Data Catalog using an AWS Glue crawler.
  5. You query events using Amazon Athena.
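
A hedged sketch of the Firehose delivery stream that step 3 relies on follows; the names, ARNs, and buffering values are placeholders rather than the values in the CloudFormation template, but the dynamic partitioning keys match the detail-type and timestamp grouping described above.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="eventbridge-to-s3",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",  # placeholder
        "BucketARN": "arn:aws:s3:::your-curated-bucket",                     # placeholder
        # Group objects by event detail-type and receipt time.
        "Prefix": "!{partitionKeyFromQuery:detail_type}/!{timestamp:yyyy/MM/dd}/",
        "ErrorOutputPrefix": "errors/",
        # Dynamic partitioning requires a buffer of at least 64 MB.
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 300},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {"ParameterName": "MetadataExtractionQuery",
                         "ParameterValue": '{detail_type: ."detail-type"}'},
                        {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
                    ],
                }
            ],
        },
    },
)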

Deploy resources using AWS CloudFormation

We use AWS CloudFormation templates to create all the necessary resources for the ingestion pipeline. This removes opportunities for manual error, increases efficiency, and provides consistent configurations over time. The template is also available on GitHub.

Complete the following steps:

  1. Choose Launch Stack:
  2. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  3. Choose Create stack.

The template takes about 10 minutes to complete and creates the following resources in your AWS account:

  • An S3 bucket to store event data.
  • A Firehose delivery stream with dynamic partitioning configuration. Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within the data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding S3 prefixes.
  • An EventBridge rule that forwards all events from the default event bus to Kinesis Data Firehose.
  • An AWS Glue crawler that references the path to the event data in the S3 bucket. The crawler inspects data landed to Amazon S3 and registers tables as per the schema with the AWS Glue Data Catalog.
  • Athena named queries for you to query the data processed by this example.

Trigger a service event

After you create the CloudFormation stack, you trigger a service event.

  1. On the AWS CloudFormation console, navigate to the Outputs tab for the stack.
  2. Choose the link for the key CreateParameter.

Create Parameter

You’re redirected to the Systems Manager console to create a new parameter.

  1. For Name, enter a name (for example, my-test-parameter).
  2. For Value, enter the test value of your choice (for example, test-value).

My Test parameter

  1. Leave everything else as default and choose Create parameter.

This step saves the new Systems Manager parameter and pushes the parameter-created event to the default EventBridge event bus, as shown in the following code:

{
  "version": "0",
  "id": "6a7e4feb-b491-4cf7-a9f1-bf3703497718",
  "detail-type": "Parameter Store Change",
  "source": "aws.ssm",
  "account": "123456789012",
  "time": "2017-05-22T16:43:48Z",
  "region": "us-east-1",
  "resources": [
    "arn:aws:ssm:us-east-1:123456789012:parameter/foo"
  ],
  "detail": {
    "operation": "Create",
    "name": "my-test-parameter",
    "type": "String",
    "description": ""
  }
}

Discover the event schema

After the event is triggered by saving the parameter, wait at least 2 minutes for the event to be ingested via Kinesis Data Firehose to the S3 bucket. Now complete the following steps to run an AWS Glue crawler to discover and register the event schema in the Data Catalog:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the crawler with the name starting with S3EventDataCrawler.
  3. Choose Run crawler.

Run Crawler

This step runs the crawler, which takes about 2 minutes to complete. The crawler discovers the schema from all events and registers it as tables in the Data Catalog.

Query the event data

When the crawler is complete, you can start querying event data. To query the event, complete the following steps:

  1. On the AWS CloudFormation console, navigate to the Outputs tab for your stack.
  2. Choose the link for the key AthenaQueries.

Athena Queries

You’re redirected to the Saved queries tab on the Athena console. If you’re running Athena queries for the first time, set up your S3 output bucket. For instructions, see Working with Query Results, Recent Queries, and Output Files.

  1. Search for Blog to find the queries created by this post.
  2. Choose the query Blog – Query Parameter Store Events.

Find Athena Saved Queries

The query opens on the Athena console.

  1. Choose Run query.

You can update the query to search the event you created earlier.

  1. Apply a WHERE clause with the parameter name you selected earlier:
SELECT * FROM "AwsDataCatalog"."eventsdb-randomId"."parameter_store_change"
WHERE detail.name = 'Your event name'

You can also choose the link next to the key CuratedBucket from the CloudFormation stack outputs to see paths and the objects loaded to the S3 bucket from other event sources. Similarly, you can query them via Athena.

Clean up

Complete the following steps to delete your resources and stop incurring costs:

  1. On the AWS CloudFormation console, select the stack you created and choose Delete.
  2. On the Amazon S3 console, find the bucket with the name starting with eventbridge-firehose-blog-curatedbucket.
  3. Select the bucket and choose Empty.
  4. Enter permanently delete to confirm the choice.
  5. Select the bucket again and choose Delete.
  6. Confirm the action by entering the bucket name when prompted.
  7. On the Systems Manager console, go to the parameter store and delete the parameter you created earlier.

Summary

This post demonstrates how to use an EventBridge rule to redirect AWS service-generated events or custom events to Amazon S3 using Kinesis Data Firehose for long-term storage, analysis, querying, and audit purposes.

For more information, see the Amazon EventBridge User Guide. To learn more about AWS service events supported by EventBridge, see Events from AWS services.


About the Author

Anand ShahAnand Shah is a Big Data Prototyping Solution Architect at AWS. He works with AWS customers and their engineering teams to build prototypes using AWS analytics services and purpose-built databases. Anand helps customers solve the most challenging problems using the art of the possible technology. He enjoys beaches in his leisure time.

How to Audit and Report S3 Prefix Level Access Using S3 Access Analyzer

Post Syndicated from Somdeb Bhattacharjee original https://aws.amazon.com/blogs/architecture/how-to-audit-and-report-s3-prefix-level-access-using-s3-access-analyzer/

Data Services teams in all industries are developing centralized data platforms that provide shared access to datasets across multiple business units and teams within the organization. This makes data governance easier, minimizes data redundancy (thus reducing cost), and improves data integrity. The central data platform is often built with Amazon Simple Storage Service (Amazon S3).

A common pattern for providing access to this data is to set up cross-account IAM users and IAM roles that allow direct access to the datasets stored in S3 buckets. You then enforce permissions on these datasets with S3 bucket policies or S3 Access Point policies. These policies can be very granular, and you can provide access at the bucket level, the prefix level, and the object level within an S3 bucket.

To reduce risk and unintended access, you can use Access Analyzer for S3 to identify S3 buckets within your zone of trust (Account or Organization) that are shared with external identities. Access Analyzer for S3 provides a lot of useful information at the bucket level but you often need S3 audit capability one layer down, at the S3 prefix level, since you are most likely going to organize your data using S3 prefixes.

Common use cases

Many organizations need to ingest a lot of third-party/vendor datasets and then distribute these datasets within the organization in a subscription-based model. Irrespective of how the data is ingested, whether it is using AWS Transfer Family service or other mechanisms, all the ingested datasets are stored in a single S3 bucket with a separate prefix for each vendor dataset. The hierarchy can be represented as:

vendor-s3-bucket
       ->vendorA-prefix
               ->vendorA.dataset.csv
       ->vendorB-prefix
               ->vendorB.dataset.csv

Based on this design, access is also granted to the data subscribers at the S3 prefix level. Access Analyzer for S3 does not provide visibility at the S3 prefix level so you need to develop custom scripts to extract this information from the S3 policy documents. You also need the information in an easy-to-consume format, for example, a csv file, that can be queried, filtered, readily downloaded and shared across the organization.

To help address this requirement, we show how to implement a solution that builds on the Access Analyzer for S3 findings to generate a csv file at a pre-configured frequency. This solution provides details about:

  • External Principals outside your trust zone that have access to your S3 buckets
  • Permissions granted to these external principals (read, write)
  • The list of S3 prefixes these external principals have access to, as configured in the S3 bucket policy and/or S3 access point policies

Architecture Overview

Figure 1 – How to Audit and Report S3 prefix level access using S3 Access Analyzer

The solution entails the following steps:

Step 1 – The Access Analyzer ARN and the S3 bucket parameters are passed to an AWS Lambda function via Environment variables.

Step 2 – The Lambda code uses the Access Analyzer ARN to call the list-findings API to retrieve the findings information and store it in the S3 bucket (under json prefix) in JSON format.

Step 3 – The Lambda function then also parses the JSON file to get the required fields and store it as a csv file in the same S3 bucket (under report prefix). It also scans the bucket policy and/or the access point policies to retrieve the S3 prefix level permission granted to the external identity. That information is added to the csv file.

Steps 4 and 5 – As part of the initial deployment, an AWS Glue crawler is provided to discover and create the schema of the csv file and store it in the AWS Glue Data Catalog.

Step 6 – An Amazon Athena query is run to create a spreadsheet of the findings that can be downloaded and distributed for audit.
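
For steps 2 and 3, a minimal sketch of the Lambda logic might look like the following; the environment variable names and object keys are assumptions, and the real function additionally parses the bucket and access point policies to add prefix-level detail to the csv.

import csv
import io
import json
import os

import boto3

access_analyzer = boto3.client("accessanalyzer")
s3 = boto3.client("s3")

def handler(event, context):
    analyzer_arn = os.environ["ANALYZER_ARN"]  # passed in via environment variables (step 1)
    bucket = os.environ["REPORT_BUCKET"]

    # Step 2: retrieve the S3 bucket findings for this analyzer, one page at a time.
    findings = []
    kwargs = {"analyzerArn": analyzer_arn, "filter": {"resourceType": {"eq": ["AWS::S3::Bucket"]}}}
    while True:
        page = access_analyzer.list_findings(**kwargs)
        findings.extend(page["findings"])
        if "nextToken" not in page:
            break
        kwargs["nextToken"] = page["nextToken"]
    s3.put_object(Bucket=bucket, Key="json/findings.json", Body=json.dumps(findings, default=str))

    # Step 3: flatten the findings into a csv report.
    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["resource", "principal", "actions"])
    for finding in findings:
        writer.writerow([
            finding.get("resource"),
            json.dumps(finding.get("principal", {})),
            ",".join(finding.get("action", [])),
        ])
    s3.put_object(Bucket=bucket, Key="report/findings.csv", Body=out.getvalue())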

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • S3 buckets that are shared with external identities via cross-account IAM roles or IAM users. Follow the instructions in the user guide to set up cross-account S3 bucket access.
  • IAM Access Analyzer enabled for your AWS account. Follow these instructions to enable IAM Access Analyzer within your account.

Once the IAM Access Analyzer is enabled, you should be able to view the Analyzer findings from the S3 console by selecting the bucket name and clicking on the ‘View findings’ box or directly going to the Access Analyzer findings on the IAM console.

When you select a ‘Finding id’ for an S3 Bucket, a screen similar to the following will appear:

Figure 2 – IAM Console Screenshot

Setup

Now that your access analyzer is running, you can open the link below to deploy the CloudFormation template. Make sure to launch the CloudFormation in the same AWS Region where IAM Access Analyzer has been enabled.

Launch template

Specify a name for the stack and input the following parameters:

  • ARN of the Access Analyzer which you can find from the IAM Console.
  • New S3 bucket where your findings will be stored. The CloudFormation template will add a suffix to the bucket name you provide to ensure uniqueness.

Figure 3 – CloudFormation Template screenshot

  • Select Next twice, and on the final screen check the box allowing CloudFormation to create the IAM resources before selecting Create Stack.
  • It will take a couple of minutes for the stack to create the resources and launch the AWS Lambda function.
  • Once the stack is in CREATE_COMPLETE status, go to the Outputs tab of the stack and note down the value against the DataS3BucketName key. This is the S3 bucket the template generated; it will be of the format analyzer-findings-xxxxxxxxxxxx. Go to the S3 console and view the contents of the bucket. There should be two folders, archive/ and report/. In the report folder you should have the csv file containing the findings report.
  • You can download the csv directly and open it in an Excel sheet to view the contents. If you would like to query the csv based on different attributes, follow the next set of steps.
  • Go to the AWS Glue console and click on Crawlers. There should be an analyzer-crawler created for you. Select the crawler to run it.
  • After the crawler runs successfully, you should see a new table, analyzer-report, created under the analyzerdb Glue database.
  • Select the table name to view the table properties and schema.
  • To query the table, go to the Athena console and select the analyzerdb database. Then you can run a query like “Select * from analyzer-report where externalaccount = <<valid external account>>” to list all the S3 buckets the external account has access to.

Figure 4 – Amazon Athena Console screenshot

The output of the query with a subset of columns is shown as follows:

Figure 5 – Output of Amazon Athena Query

This CloudFormation template also creates an Amazon CloudWatch Events rule, testanalyzer-ScheduledRule-xxxxxxx, that launches the Lambda function every Monday to generate a new version of the findings csv file. You can update the rule to set the frequency you desire.

Clean Up

To avoid incurring costs, remember to delete the resources you created. First, manually delete the folders ‘archive’ and ‘report’ in the S3 bucket and then delete the CloudFormation stack you deployed at the beginning of the setup.

Conclusion

In this blog, we showed how you can build audit capabilities for external principals accessing your S3 buckets at a prefix level. Organizations looking to provide shared access to datasets across multiple business units will find this solution helpful in improving their security posture. Give this solution a try and share your feedback!

Extract ServiceNow data using AWS Glue Studio in an Amazon S3 data lake and analyze using Amazon Athena

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/extract-servicenow-data-using-aws-glue-studio-in-an-amazon-s3-data-lake-and-analyze-using-amazon-athena/

Many different cloud-based software as a service (SaaS) offerings are available to AWS customers. ServiceNow is one of the most common cloud-based workflow automation platforms widely used by AWS customers. In the past few years, we have seen many customers who want to extract and integrate data from IT service management (ITSM) tools like ServiceNow for various use cases:

  • Generate insights from data – Combine ServiceNow data with data from other services, like CRM data (such as Salesforce) or martech data (such as Amazon Pinpoint), to generate better insights (for example, building a complete customer 360 view).
  • Archive data for future business or regulatory requirements – You can archive the data in raw form in your data lake to work on future use cases or just keep it to satisfy regulatory requirements such as auditing.
  • Improve performance by decoupling reporting or machine learning use cases from ITSM – When you move your ITSM reporting from ServiceNow to an Amazon Simple Storage Service (Amazon S3) data lake, there is no performance impact on your ServiceNow instance.
  • Data democratization – You can extract the data and put it into a data lake so it can be available to other business users and units to explore and use.

Many customers have been building modern data architectures on AWS, which include building data lakes on Amazon S3 and using broad and deep AWS analytics and AI/ML services to extract meaningful information by combining data from different data sources.

In this post, we provide a step-by-step guide to bring data from ServiceNow to an S3 data lake using AWS Glue Studio and analyze the data with Amazon Athena.

Solution overview

In this solution, ServiceNow data is extracted through AWS Glue using a Marketplace connector. AWS Glue provides built-in support for the most commonly used data stores (such as Amazon Redshift, Amazon Aurora, Microsoft SQL Server, MySQL, MongoDB, and PostgreSQL) using JDBC connections. AWS Glue also allows you to use custom JDBC drivers in your extract, transform, and load (ETL) jobs. For data stores that are not natively supported, such as SaaS applications, you can use connectors. The extracted data is stored in Amazon S3 and cataloged in the AWS Glue Data Catalog, and we use Athena to query the data.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration so you can start analyzing your data and put it to use in minutes instead of months.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

ServiceNow is a cloud-based software platform for ITSM that helps to automate IT business management. It’s designed based on ITIL guidelines to provide service orientation for tasks, activities, and processes.

The following diagram illustrates our solution architecture.
aws glue blog

To implement the solution, we complete the following high-level steps:

  1. Subscribe to the AWS Glue Connector Marketplace for ServiceNow from AWS Marketplace.
  2. Create a connection in AWS Glue Studio.
  3. Create an AWS Identity and Access Management (IAM) role for AWS Glue.
  4. Configure and run an AWS Glue job that uses the connection.
  5. Run the query against the data lake (Amazon S3) using Athena.

Prerequisites

For this walkthrough, you should have the following:

  • An AWS account.
  • A ServiceNow account. To follow along with this post, you can sign up for a developer account, which is pre-populated with sample records in many of the ServiceNow objects.
  • ServiceNow connection properties credentials stored in AWS Secrets Manager. On the Secrets Manager console, create a new secret (select Other type of secrets) with a key-value pair for each property, for example:
    • Username – ServiceNow Instance account user name (for example, admin)
    • Password – ServiceNow Instance account password
    • Instance – ServiceNow instance name without https and .service-now.com

Copy the secret name to use when configuring the connection in AWS Glue Studio.
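
If you prefer to create the secret programmatically instead of through the console, a minimal boto3 sketch follows; the secret name and values are placeholders.

import json

import boto3

secretsmanager = boto3.client("secretsmanager")

secretsmanager.create_secret(
    Name="servicenow-glue-connection",  # placeholder secret name
    SecretString=json.dumps({
        "Username": "admin",
        "Password": "your-servicenow-password",
        "Instance": "dev12345",  # instance name only, without https:// or .service-now.com
    }),
)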

Subscribe to the AWS Glue Marketplace Connector for ServiceNow

To connect, we use the AWS Glue Marketplace Connector for ServiceNow. You need to subscribe to the connector from AWS Marketplace.

The AWS Glue Marketplace Connector for ServiceNow is provided by a third-party independent software vendor (ISV) listed on AWS Marketplace. Associated subscription fees and AWS usage fees apply once you subscribe.

To use the connector in AWS Glue, you need to activate the subscribed connector in AWS Glue Studio. The activation process creates a connector object and connection in your AWS account.

  1. On the AWS Glue console, choose AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Marketplace.
  4. Search for the CData AWS Glue Connector for ServiceNow.


After you subscribe to the connector, a new config tab appears on the AWS Marketplace connector page.

  1. Review the pricing and other relevant information.
  2. Choose Continue to Subscribe.
  3. Choose Accept Terms.

After you subscribe to the connector, the next steps are to configure it.

  1. Retain the default selections for Delivery Method and Software Version to use the latest connector software version.
  2. Choose Continue to Launch.

  1. Choose Usage Instructions.


A pop-up appears with a hyperlink to activate the connector with AWS Glue Studio.

  1. Choose this link to start configuring the connection to your ServiceNow account in AWS Glue Studio.

Create a connection in AWS Glue Studio

Create a connection in AWS Glue Studio with the following steps:

  1. For Name, enter a unique name for your ServiceNow connection.
  2. For Connection credential type, choose username_password.
  3. For AWS Secret, choose the Secrets Manager secret you created as a prerequisite.

Don’t provide any additional details in the optional Credentials section because it retrieves the value from Secrets Manager.

  1. Choose Create connection and activate connector to finish creating the connection.

You should now be able to view the ServiceNow connector you subscribed to and its associated connection.

Create an IAM role for AWS Glue

The next step is to create an IAM role with the necessary permissions for the AWS Glue job. The name of the role must start with the string AWSGlueServiceRole for AWS Glue Studio to use it correctly. You need to grant your IAM role permissions that AWS Glue can assume when calling other services on your behalf. For more information, see Create an IAM Role for AWS Glue.

Attach the AWSGlueServiceRole AWS managed policy to the role, and add the following inline policy so the job can retrieve the Secrets Manager secret:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
                "{secret name arn}"
            ]
        }
    ]
}

For more information about permissions, see Review IAM permissions needed for the AWS Glue Studio user.

Configure and run the AWS Glue job

After you configure your connection, you can create and run an AWS Glue job.

Create a job that uses the connection

To create a job, complete the following steps:

  1. In AWS Glue Studio, choose Connectors.
  2. Select the connection you created.
  3. Choose Create job.


The visual job editor appears. A new source node, derived from the connection, is displayed on the job graph. In the node details panel on the right, the Data source properties tab is selected for user input.

Configure the source node properties

You can configure the access options for your connection to the data source on the Data source properties tab. For this post, we provide a simple walkthrough. Refer to the AWS Glue Studio User Guide for more information.

  1. On the Source menu, choose CData AWS Glue Connector for ServiceNow.

  1. On the Data source properties – Connector tab, make sure the source node for your connector is selected.

The Connection field is populated automatically with the name of the connection associated with the marketplace connector.

  1. Enter either a source table name or a query to use to retrieve data from the data source. For this post, we enter the table name incident.

  1. On the Transform menu, choose Apply Mapping.
  2. On the Node properties tab, for Node parents, select CData AWS Glue Connector for ServiceNow.
  3. Because we’re connecting to an external data source, the Transform and Output schema tabs don’t initially show a schema extracted from the source.
  4. To retrieve the schema, go to the Data preview tab, choose Start data preview session, and select the IAM role you created for this job.
  5. When the data preview is complete, go to the Data source node and choose Use datapreview schema.
  6. Go to the Transform node and check all the columns where the data type shows as NULL.

  1. On the Target menu, choose Amazon S3.
  2. On the Data target properties – S3 tab, for Format, choose Parquet.
  3. For Compression Type, choose GZIP.
  4. For S3 Target Location, enter the Amazon S3 location to store the data.
  5. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, keep existing schema and add new partitions.
  6. For Database, enter sampledb.
  7. For Table name, enter incident.

Edit, save, and run the job

Edit the job by adding and editing the nodes in the job graph. See Editing ETL jobs in AWS Glue Studio for more information.

After you edit the job, enter the job properties.

  1. Choose the Job details tab above the visual graph editor.
  2. For Name, enter a job name.
  3. For IAM Role, choose an IAM role with the necessary permissions, as described previously.
  4. For Type, choose Spark.
  5. For Glue version, choose Glue 3.0 – Supports spark 3.1, Scala 2, Python 3.
  6. For Language, choose Python 3.
  7. For Worker type, choose G.1X.
  8. For Requested number of workers, enter 2.
  9. For Number of retries, enter 1.
  10. For Job timeout (minutes), enter 3.
  11. Use the default values for the other parameters.

For more information about job parameters, see Defining Job Properties for Spark Jobs.

  12. After you save the job, choose Run to run the job.

Note – Running the AWS Glue job incurs cost. You can learn more about AWS Glue pricing here.

To view the generated script for the job, choose the Script tab at the top of the visual editor. The Job runs tab shows the job run history for the job. For more information about job run details, see View information for recent job runs.
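
If you would rather define an equivalent job programmatically than through the visual editor, the following boto3 sketch mirrors the properties above; the job name, role, script location, and connection name are placeholders for your own resources.

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="servicenow-incident-to-s3",      # placeholder job name
    Role="AWSGlueServiceRole-ServiceNow",  # placeholder IAM role
    Command={
        "Name": "glueetl",                 # Spark job
        "ScriptLocation": "s3://your-script-bucket/servicenow_incident_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
    MaxRetries=1,
    Timeout=3,                             # minutes
    Connections={"Connections": ["your-servicenow-connection"]},
)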

Query against the data lake using Athena

After the job is complete, you can query the data in Athena.

  1. On the Athena console, choose the sampledb database.

You can view the newly created table called incident.

  1. Choose the options icon (three vertical dots) and choose Preview table to view the data.

Now let’s perform some analyses.

  1. Find all the incident tickets that are escalated by running the following query:
    SELECT task_effective_number FROM "sampledb"."incident" 
    where escalation = 2;

  1. Find ticket count with priority:
    SELECT priority, count(distinct task_effective_number)  FROM "sampledb"."incident"
    group by priority
    order by priority asc

Conclusion

In this post, we demonstrated how you can use an AWS Glue Studio connector to connect from ServiceNow and bring data into your data lake for further use cases.

AWS Glue provides built-in support for the most commonly used data stores (such as Amazon Redshift, Amazon Aurora, Microsoft SQL Server, MySQL, MongoDB, and PostgreSQL) using JDBC connections. AWS Glue also allows you to use custom JDBC drivers in your extract, transform, and load (ETL) jobs. For data stores that are not natively supported, such as SaaS applications, you can use connectors.

To learn more, refer to the AWS Glue Studio Connector, the AWS Glue Studio User Guide, and the Athena User Guide.


About the Authors

Navnit Shukla is AWS Specialist Solution Architect in Analytics. He is passionate about helping customers uncover insights from their data. He builds solutions to help organizations make data-driven decisions.

Srikanth Sopirala is a Principal Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytic solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family, and road biking.

Naresh Gautam is a Principal Solutions Architect at AWS. His role is helping customers architect highly available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.