
Enforce boundaries on AWS Glue interactive sessions

Post Syndicated from Nicolas Jacob Baer original https://aws.amazon.com/blogs/big-data/enforce-boundaries-on-aws-glue-interactive-sessions/

AWS Glue interactive sessions allow engineers to build, test, and run data preparation and analytics workloads in an interactive notebook. Interactive sessions provide isolated development environments, take care of the underlying compute cluster, and can be configured to stop idle resources.

AWS Glue interactive sessions provide recommended default configurations and also allow users to customize a session to meet their needs. For example, you can provision more workers to experiment on a larger dataset or set the idle timeout for long-running workloads. Because these options can change with the workload, you may need to ensure that they stay within specific boundaries and apply a control mechanism.

In this post, we walk through deploying a reusable solution that enforces AWS Glue interactive session limits on three options: connection, number of workers, and maximum idle time. The first option addresses the need for custom inspection and controls on traffic, for example by enforcing that an interactive session runs only inside a VPC. The other two limit costs and usage of AWS Glue resources by setting an upper boundary on the number of workers and the idle time per session. You can further extend the solution to other properties or services within AWS Glue.

Overview of solution

The proposed architecture is built on serverless components and runs whenever a new AWS Glue interactive session is created.

Architecture Diagram of the Solution

The workflow steps are as follows:

  1. A data engineer creates a new AWS Glue interactive session either through the AWS Management Console or in a Jupyter notebook locally.
  2. The session creation is recorded in AWS CloudTrail as a CreateSession event that contains all the information needed to identify and inspect the session as soon as it is initiated.
  3. An Amazon EventBridge rule filters the CloudTrail events and invokes an AWS Lambda function to inspect the CreateSession event.
  4. The Lambda function inspects the CreateSession event and checks all defined boundary conditions. Currently, the boundaries configurable with this solution are limited to the maximum number of workers, the idle timeout in minutes, and enforcement of a connection (a simplified sketch of this check follows the list).
  5. If any of the defined boundary conditions are not met (for example, too many workers are provisioned for the session), then, depending on the provided configuration, the function ends the interactive session immediately and sends an email via Amazon Simple Notification Service (Amazon SNS). If the session hasn’t started yet, the function waits for it to start before taking any action.
  6. If the session was stopped, an email is sent to an SNS topic. Because the interactive session notebook gives no indication of why the session ended, this notification provides the additional context to the data engineers.
  7. If the function fails, the event is recorded in a dead-letter queue in Amazon Simple Queue Service (Amazon SQS). The queue is monitored, and any message in it triggers an Amazon CloudWatch alarm.
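
As an illustration of step 4, the following is a minimal Python sketch of such a boundary check. It is not the implementation from the GitHub repository: the CloudTrail field names, the environment variables, and the omission of the wait-for-ready logic described in step 5 are simplifying assumptions.

import os

import boto3

glue = boto3.client("glue")
sns = boto3.client("sns")

MAX_WORKERS = int(os.environ.get("MAX_WORKERS", "10"))
MAX_IDLE_TIMEOUT_MINUTES = int(os.environ.get("MAX_IDLE_TIMEOUT_MINUTES", "60"))
ENFORCE_VPC_CONNECTION = os.environ.get("ENFORCE_VPC_CONNECTION", "true").lower() == "true"
TOPIC_ARN = os.environ.get("NOTIFICATION_TOPIC_ARN", "")

def handler(event, context):
    # EventBridge forwards the CloudTrail record; the field names below are assumptions.
    params = event["detail"]["requestParameters"]
    session_id = params["id"]

    violations = []
    if params.get("numberOfWorkers", 0) > MAX_WORKERS:
        violations.append("too many workers requested")
    if params.get("idleTimeout", 0) > MAX_IDLE_TIMEOUT_MINUTES:
        violations.append("idle timeout exceeds the allowed maximum")
    if ENFORCE_VPC_CONNECTION and not params.get("connections", {}).get("connections"):
        violations.append("no connection attached, so the session is not bound to a VPC")

    if violations:
        # Stop the out-of-boundary session and notify the data engineers via SNS.
        glue.stop_session(Id=session_id)
        if TOPIC_ARN:
            sns.publish(
                TopicArn=TOPIC_ARN,
                Subject="AWS Glue interactive session stopped",
                Message=f"Session {session_id} stopped: {', '.join(violations)}",
            )
    return {"sessionId": session_id, "violations": violations}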

The following steps walk you through how to build and deploy the solution. The code is available in the GitHub repo.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Overview of the deployed resources

All the necessary resources are defined in an AWS CloudFormation file located under cfn/template.yaml. To deploy those resources, we use AWS Serverless Application Model (AWS SAM), which enables us to conveniently build and package all the dependencies and also manages the AWS CloudFormation steps for us.

The CloudFormation stack deploys the following resources:

  • A Lambda function with its library, both defined under the directory src/functions. This function implements the control: it validates that the session was started within the defined limits.
  • An EventBridge rule. The rule listens for CloudTrail events and, when a new interactive session is created, triggers the control Lambda function.
  • An SQS dead-letter queue (DLQ) attached to the Lambda function. This keeps a record of events that triggered a Lambda function failure.
  • Two CloudWatch alarms monitoring the Lambda function failures and the messages in the DLQ.

If notification via email is enabled, two more resources are deployed:

Additionally, AWS CloudFormation deploys all the necessary AWS Identity and Access Management (IAM) roles and policies, and an AWS Key Management Service (AWS KMS) key to ensure that the exchanged data is encrypted.

Deploy the solution

To facilitate the deployment lifecycle, including the setup of the user local environment, we provide a Makefile that describes all the necessary steps. Make sure you have your AWS credentials renewed and have access to your account. For more information, refer to Configuration and credential file settings.

  • Explore the Makefile and adjust the Region and stack name as needed by modifying the values of the variables AWS_REGION and STACK_NAME.
  • Set KILL_SESSION = "True" if you want to immediately stop interactive sessions that are found out of boundaries. Allowed values are True or False; the default is True.
  • Set NOTIFICATION_EMAIL_ADDRESS = <[email protected]> in the Makefile if you want to get notified when a session has been found out of boundaries.
  • Set values for your controls:
    • ENFORCE_VPC_CONNECTION to stop sessions not running inside a VPC (true or false).
    • MAX_WORKERS to set the maximum number of workers for a session (numeric).
    • MAX_IDLE_TIMEOUT_MINUTES to define the maximum idle time for sessions in minutes (numeric).
  • Install all the prerequisite libraries:
    make install-pre-requisites

    These will be installed under a newly created Python virtual environment inside this repository in the directory .venv.

  • Deploy the new stack:
    make deploy

    This command will complete the following tasks:

    • Check if the prerequisites are met.
    • Run pytest unit tests on the Python files.
    • Validate the CloudFormation template.
    • Build the artifacts (Lambda function and Lambda layers).
    • Deploy the resources via AWS SAM.

Test the solution

Refer to Introducing AWS Glue interactive sessions for Jupyter for information about running an interactive session. If you follow the instructions in the post (see the section Run your first code cell and author your AWS Glue notebook), the initialization of the interactive session should fail with an error similar to the following.

Example of code in the cell:

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
sc3 = SparkContext.getOrCreate()
glueContext1 = GlueContext(sc3)
spark = glueContext1.spark_session
job = Job(glueContext1)

Received output:

Authenticating with profile=XXXXXXXX
glue_role_arn defined by user: arn:aws:iam::XXXXXXXXXX:role/XXXXXXXX
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: XXXXXXXXXXXXX
Applying the following default arguments:
--glue_kernel_version 0.35
--enable-glue-datacatalog true
Waiting for session xxxxxxxxx to get into ready status...
Session xxxxxxxxx has been created
Exception encountered while running statement: An error occurred (EntityNotFoundException) when calling the GetStatement operation: Session ID xxxxxxxxx not found

If you enabled the email feature, you should also get an email notification.

You can also check on the AWS Glue console that your session ID isn’t listed.
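
You can also confirm programmatically that the session is no longer running. The following is a small, hypothetical check with boto3; the session ID is a placeholder:

import boto3

glue = boto3.client("glue")
# Expect a STOPPING/STOPPED status rather than READY for an out-of-boundary session
response = glue.get_session(Id="your-session-id")
print(response["Session"]["Status"])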

Clean up

Clean up the deployed resources by running the following command:

make clean-up

Note that the resources deployed from following the recommended post, Introducing AWS Glue interactive sessions for Jupyter, will not be removed with the previous command.

Limitations

The delivery guarantee for CloudTrail events to EventBridge is best effort. This means CloudTrail attempts to deliver all events to EventBridge, but in some rare cases an event might not be delivered. For more information, refer to Events from AWS services.

Conclusion

This post described how to build, deploy, and test a solution that enforces boundary conditions on AWS Glue interactive sessions, constraining the number of workers, the idle timeout, and the AWS Glue connection.

You can adapt this solution based on your needs and further extend it to allow controls on other options.

To learn more about how to use AWS Glue interactive sessions, refer to Introducing AWS Glue interactive sessions for Jupyter and Author AWS Glue jobs with PyCharm using AWS Glue interactive sessions.


About the Authors

Nicolas Jacob Baer is a Senior Cloud Application Architect with a strong focus on data engineering and machine learning, based in Switzerland. He works closely with enterprise customers to design data platforms and build advanced analytics/ML use cases.

Luca Mazzaferro is a Senior DevOps Architect at Amazon Web Services. He likes to have infrastructure automated, reproducible and secured. In his free time he likes to cook, especially pizza.

Kemeng Zhang is a Cloud Application Architect with a strong focus on machine learning and UX, based in Switzerland. She works closely with customers to design user experiences and build advanced analytics/ML use cases.

Mark Walser, a Senior Global Data Architect at Amazon Web Services, collaborates with customers to develop innovative Big Data solutions that solve business problems and speed up the adoption of AWS services. Outside of work, he finds pleasure in running, swimming, and all things related to technology.

Gal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI, based in California. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data products.

Get started managing partitions for Amazon S3 tables backed by the AWS Glue Data Catalog

Post Syndicated from Anderson dos Santos original https://aws.amazon.com/blogs/big-data/get-started-managing-partitions-for-amazon-s3-tables-backed-by-the-aws-glue-data-catalog/

Large organizations processing huge volumes of data usually store it in Amazon Simple Storage Service (Amazon S3) and query the data to make data-driven business decisions using distributed analytics engines such as Amazon Athena. If you simply run queries without considering the optimal data layout on Amazon S3, it results in a high volume of data scanned, long-running queries, and increased cost.

Partitioning is a common technique to lay out your data optimally for distributed analytics engines. By partitioning your data, you can restrict the amount of data scanned by downstream analytics engines, thereby improving performance and reducing the cost for queries.

In this post, we cover the following topics related to Amazon S3 data partitioning:

  • Understanding table metadata in the AWS Glue Data Catalog and S3 partitions for better performance
  • How to create a table and load partitions in the Data Catalog using Athena
  • How partitions are stored in the table
  • Different ways to add partitions in a table on the Data Catalog
  • Partitioning data stored in Amazon S3 during ingestion and cataloging

Understanding table metadata in the Data Catalog and S3 partitions for better performance

A table in the AWS Glue Data Catalog is the metadata definition that organizes the data location, data type, and column schema, which represents the data in a data store. Partitions are data organized hierarchically, defining the location where the data for a particular partition resides. Partitioning your data allows you to limit the amount of data scanned by S3 SELECT, thereby improving performance and reducing cost.

There are a few factors to consider when deciding which columns to partition on. For example, if you’re using columns as filters, avoid a column that partitions the data too finely, and avoid a column where the data is heavily skewed toward one partition value. You can partition your data by any column. Partition columns are usually designed around a common query pattern in your use case. For example, a common practice is to partition the data by year/month/day because many queries tend to run time series analyses in typical use cases. This often leads to a multi-level partitioning scheme. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns.

Let’s look at an example of how partitioning works.

Files corresponding to a single day’s worth of data are placed under a prefix such as s3://my_bucket/logs/year=2023/month=06/day=01/.

If your data is partitioned per day, every day you have a single file, such as the following:

  • s3://my_bucket/logs/year=2023/month=06/day=01/file1_example.json
  • s3://my_bucket/logs/year=2023/month=06/day=02/file2_example.json
  • s3://my_bucket/logs/year=2023/month=06/day=03/file3_example.json

We can use a WHERE clause to query the data as follows:

SELECT * FROM table WHERE year=2023 AND month=06 AND day=01

The preceding query reads only the data inside the partition folder year=2023/month=06/day=01 instead of scanning through the files under all partitions. Therefore, it only scans the file file1_example.json.

Systems such as Athena, Amazon Redshift Spectrum, and now AWS Glue can use these partitions to filter data by value, eliminating unnecessary (partition) requests to Amazon S3. This capability can improve the performance of applications that specifically need to read a limited number of partitions. For more information about partitioning with Athena and Redshift Spectrum, refer to Partitioning data in Athena and Creating external tables for Redshift Spectrum, respectively.

How to create a table and load partitions in the Data Catalog using Athena

Let’s begin by understanding how to create a table and load partitions using DDL (Data Definition Language) queries in Athena. Note that to demonstrate the various methods of loading partitions into the table, we need to delete and recreate the table multiple times throughout the following steps.

First, we create a database for this demo.

  1. On the Athena console, choose Query editor.

If this is your first time using the Athena query editor, you need to configure and specify an S3 bucket to store the query results.

  2. Create a database with the following command:
CREATE DATABASE partitions_blog;

  3. In the Data pane, for Database, choose the database partitions_blog.
  4. Create the table impressions following the example in Hive JSON SerDe. Replace <myregion> in s3://<myregion>.elasticmapreduce/samples/hive-ads/tables/impressions with the Region identifier where you run Athena (for example, s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions).
  5. Run the following query to create the table:
CREATE EXTERNAL TABLE impressions (
    requestbegintime string,
    adid string,
    impressionid string,
    referrer string,
    useragent string,
    usercookie string,
    ip string,
    number string,
    processid string,
    browsercookie string,
    requestendtime string,
    timers struct
                <
                 modellookup:string, 
                 requesttime:string
                >,
    threadid string, 
    hostname string,
    sessionid string
)   
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions';

The following screenshot shows the query in the query editor.

  6. Run the following query to review the data:
SELECT * FROM impressions;

You can’t see any results because the partitions aren’t loaded yet.

If the partition isn’t loaded into a partitioned table, when the application downloads the partition metadata, the application will not be aware of the S3 path that needs to be queried. For more information, refer to Why do I get zero records when I query my Amazon Athena table.

  7. Load the partitions using the command MSCK REPAIR TABLE.

The MSCK REPAIR TABLE command was designed to manually add partitions that are added to or removed from the file system, such as HDFS or Amazon S3, but are not present in the metastore.

  8. Query the table again to see the results.

After the MSCK REPAIR TABLE command scans Amazon S3 and adds the Hive-compatible partitions it finds to AWS Glue, the records under the registered partitions are returned.
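
If you prefer to load the partitions programmatically rather than in the console, a minimal boto3 sketch that submits the same DDL through the Athena API could look like the following; the query results bucket is a placeholder:

import boto3

athena = boto3.client("athena")
athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE impressions;",
    QueryExecutionContext={"Database": "partitions_blog"},
    ResultConfiguration={"OutputLocation": "s3://YOUR-QUERY-RESULTS-BUCKET/"},
)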

How partitions are stored in the table metadata

We can list the table partitions in Athena by running the SHOW PARTITIONS command, as shown in the following screenshot.

We also can see the partition metadata on the AWS Glue console. Complete the following steps:

  1. On the AWS Glue console, choose Tables in the navigation pane under Data Catalog.
  2. Choose the impressions table in the partitions_blog database.
  3. On the Partitions tab, choose View Properties next to a partition to view its details.

The following screenshot shows an example of the partition properties.

We can also get the partitions using the AWS Command Line Interface (AWS CLI) command get-partitions, as shown in the following screenshot.

In the get-partitions output, the element “Values” holds the partition value and “Location” defines the S3 path to be queried by the application:

"Values": [
    "2009-04-12-19-05"
]

When querying the data from the partition dt="2009-04-12-19-05", the application lists and reads only the files in the S3 path s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-12-19-05.
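
The same partition metadata can be retrieved with the AWS SDK. The following boto3 sketch prints the values and locations of all registered partitions:

import boto3

glue = boto3.client("glue")
paginator = glue.get_paginator("get_partitions")
for page in paginator.paginate(DatabaseName="partitions_blog", TableName="impressions"):
    for partition in page["Partitions"]:
        print(partition["Values"], partition["StorageDescriptor"]["Location"])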

Different ways to add partitions in a table on the Data Catalog

There are multiple ways to load partitions into the table. You can create tables and partitions directly using the AWS Glue API, the SDKs, the AWS CLI, DDL queries in Athena, AWS Glue crawlers, or AWS Glue ETL jobs.

For the next examples, we need to drop and recreate the table. Run the following command in the Athena query editor:

DROP table impressions;

After that, recreate the table:

CREATE EXTERNAL TABLE impressions (
    requestbegintime string,
    adid string,
    impressionid string,
    referrer string,
    useragent string,
    usercookie string,
    ip string,
    number string,
    processid string,
    browsercookie string,
    requestendtime string,
    timers struct
                <
                 modellookup:string, 
                 requesttime:string
                >,
    threadid string, 
    hostname string,
    sessionid string
)   
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions';

Creating partitions individually

If the data arrives in an S3 bucket at a scheduled time, for example every hour or once a day, you can individually add partitions. One way of doing so is by running an ALTER TABLE ADD PARTITION DDL query on Athena.

We use Athena for this query as an example. You can do the same from Hive on Amazon EMR, Spark on Amazon EMR, AWS Glue for Apache Spark jobs, and more.

To load partitions using Athena, we need to use the ALTER TABLE ADD PARTITION command, which can create one or more partitions in the table. ALTER TABLE ADD PARTITION supports partitions created on Amazon S3 with camel case (s3://bucket/table/dayOfTheYear=20), Hive format (s3://bucket/table/dayoftheyear=20), and non-Hive style partitioning schemes used by AWS CloudTrail logs, which use separate path components for date parts, such as s3://bucket/data/2021/01/26/us/6fc7845e.json.

To load partitions into a table, you can run the following query in the Athena query editor:

ALTER TABLE impressions 
  ADD PARTITION (dt = '2009-04-12-19-05');


Refer to ALTER TABLE ADD PARTITION for more information.

Another option is using the AWS Glue APIs. AWS Glue provides two APIs to load partitions into a table: create_partition() and batch_create_partition(). For the API parameters, refer to CreatePartition.

The following example uses the AWS CLI:

aws glue create-partition \
    --database-name partitions_blog \
    --table-name impressions \
    --partition-input '{
                            "Values":["2009-04-14-13-00"],
                            "StorageDescriptor":{
                                "Location":"s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-14-13-00",
                                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                                "SerdeInfo": {
                                    "SerializationLibrary": "org.apache.hive.hcatalog.data.JsonSerDe"
                                }
                            }
                        }'

Both commands (ALTER TABLE in Athena and the AWS Glue create-partition API) register partitions that inherit their schema from the table definition.
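
To register several partitions in one request, you can use batch_create_partition. The following boto3 sketch is illustrative only; the partition values are examples:

import boto3

glue = boto3.client("glue")
base_path = "s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions"
partition_values = ["2009-04-14-13-00", "2009-04-14-13-05"]

glue.batch_create_partition(
    DatabaseName="partitions_blog",
    TableName="impressions",
    PartitionInputList=[
        {
            "Values": [value],
            "StorageDescriptor": {
                "Location": f"{base_path}/dt={value}",
                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hive.hcatalog.data.JsonSerDe"
                },
            },
        }
        for value in partition_values
    ],
)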

Load multiple partitions using MSCK REPAIR TABLE

You can load multiple partitions at once in Athena. MSCK REPAIR TABLE is a DDL statement that scans the entire S3 path defined in the table’s Location property. Athena lists the S3 path searching for Hive-compatible partitions, then loads the existing partitions into the AWS Glue table’s metadata. Before it can run, the table must exist in the Data Catalog and its data source must be Amazon S3. You can create the table with the AWS Glue APIs or by running a CREATE TABLE statement in Athena. After the table creation, run MSCK REPAIR TABLE to load the partitions.

The parameter DDL query timeout in the service quotas defines how long a DDL statement can run. The runtime increases according to the number of folders or partitions in the S3 path.

The MSCK REPAIR TABLE command is best used when creating a table for the first time or when there is uncertainty about parity between data and partition metadata. It supports folders created in lowercase and using Hive-style partitions format (for example, year=2023/month=6/day=01). Because MSCK REPAIR TABLE scans both the folder and its subfolders to find a matching partition scheme, you should keep data for separate tables in separate folder hierarchies.

Every MSCK REPAIR TABLE command lists the entire folder specified in the table location. If you add new partitions frequently (for example, every 5 minutes or every hour), consider scheduling an ALTER TABLE ADD PARTITION statement to load only the partitions defined in the statement instead of scanning the entire S3 path.

The partitions created in the Data Catalog by MSCK REPAIR TABLE inherit their schema from the table definition. Note that Athena doesn’t charge for DDL statements, making MSCK REPAIR TABLE a straightforward and affordable way to load partitions.

Add multiple partitions using an AWS Glue crawler

An AWS Glue crawler offers more features when loading partitions into the table. A crawler automatically identifies partitions in Amazon S3, extracts metadata, and creates table definitions in the Data Catalog. Crawlers can crawl a variety of file-based and table-based data stores.

Crawlers can help automate table creation and loading partitions into tables. They are charged at an hourly rate, billed per second. You can optimize a crawler’s performance by altering parameters such as the sample size or by configuring it to crawl new folders only.

If the schema of the data changes, the crawler updates the table and partition schemas accordingly. The crawler configuration options include settings such as “Update the table definition in the Data Catalog”, “Add new columns only”, and “Ignore the change and don’t update the table in the Data Catalog”, which tell the crawler how to update the table when needed and how to evolve the table schema.

Crawlers can create and update multiple tables from the same data source. When an AWS Glue crawler scans Amazon S3 and detects multiple directories, it uses a heuristic to determine where the root for a table is in the directory structure and which directories are partitions for the table.

To create an AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane under Data Catalog.
  2. Choose Create crawler.
  3. Provide a name and optional description, then choose Next.
  4. Under Data source configuration, select Not yet and choose Add a data source.
  5. For Data source, choose S3.
  6. For S3 path, enter the path of the impression data (s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions).
  7. Select a preference for subsequent crawler runs.
  8. Choose Add an S3 data source.
  9. Select your data source and choose Next.
  10. Under IAM role, either choose an existing AWS Identity and Access Management (IAM) role or choose Create new IAM role.
  11. Choose Next.
  12. For Target database, choose partitions_blog.
  13. For Table name prefix, enter crawler_.

We use the table prefix to add a custom prefix in front of the table name. For example, if you leave the prefix field empty and start the crawler on s3://my-bucket/some-table-backup, it creates a table with the name some-table-backup. If you add crawler_ as a prefix, it creates a table called crawler_some-table-backup.

  14. Choose your crawler schedule, then choose Next.
  15. Review your settings and create the crawler.
  16. Select your crawler and choose Run.

Wait for the crawler to finish running.

You can go to Athena and check that the table was created:

SHOW PARTITIONS crawler_impressions;
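
If you prefer to create and start the crawler programmatically instead of using the console, a minimal boto3 sketch might look like the following; the role ARN is a placeholder and the crawler name is arbitrary:

import boto3

glue = boto3.client("glue")
glue.create_crawler(
    Name="impressions_crawler",
    Role="arn:aws:iam::123456789012:role/YOUR-GLUE-CRAWLER-ROLE",
    DatabaseName="partitions_blog",
    TablePrefix="crawler_",
    Targets={
        "S3Targets": [
            {"Path": "s3://us-east-1.elasticmapreduce/samples/hive-ads/tables/impressions"}
        ]
    },
)
glue.start_crawler(Name="impressions_crawler")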

Partitioning data stored in Amazon S3 during ingestion and cataloging

The previous examples work with data that already exists in Amazon S3. If you’re using AWS Glue jobs to write data on Amazon S3, you have the option to create partitions with DynamicFrames by enabling the “enableUpdateCatalog=True” parameter. Refer to Creating tables, updating the schema, and adding new partitions in the Data Catalog from AWS Glue ETL jobs for more information.

DynamicFrame supports native partitioning using a sequence of keys, using the partitionKeys option when you create a sink. For example, the following Python code writes out a dataset to Amazon S3 in Parquet format into directories partitioned by the ‘year’ field. After ingesting the data and registering partitions from the AWS Glue job, you can utilize these partitions from queries running on other analytics engines such as Athena.

## Create a partitioned table in the Glue Data Catalog using DynamicFrame
## (the setup below is the standard AWS Glue job boilerplate, added for completeness)

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the source dataset from the Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="default",
    table_name="flight_delays_pq",
    transformation_ctx="datasource0")

# Create an S3 sink that writes Parquet partitioned by 'year' and updates the Data Catalog
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://BUCKET/glueetl/",
    enableUpdateCatalog=True,
    partitionKeys=["year"])

sink.setFormat("parquet", useGlueParquetWriter=True)
sink.setCatalogInfo(catalogDatabase="default", catalogTableName="test_table")

# Write the data, creating the table and adding partitions in the Data Catalog
sink.writeFrame(datasource0)
job.commit()

Conclusion

This post showed multiple methods for partitioning your Amazon S3 data, which helps reduce costs by avoiding unnecessary data scanning and also improves the overall performance of your processes. We further described how AWS Glue makes effective metadata management for partitions possible, allowing you to optimize your storage and query operations in AWS Glue and Athena. These partitioning methods can help optimize scanning high volumes of data or long-running queries, as well as reduce the cost of scanning.

We hope you try out these options!


About the authors

Anderson Santos is a Senior Solutions Architect at Amazon Web Services. He works with AWS Enterprise customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS.

Arun Pradeep Selvaraj is a Senior Solutions Architect and is part of Analytics TFC at AWS. Arun is passionate about working with his customers and stakeholders on digital transformations and innovation in the cloud while continuing to learn, build and reinvent. He is creative, fast-paced, deeply customer-obsessed and leverages the working backwards process to build modern architectures to help customers solve their unique challenges.

Patrick Muller is a Senior Solutions Architect and a valued member of the Datalab. With over 20 years of expertise in analytics, data warehousing, and distributed systems, he brings extensive knowledge to the table. Patrick’s passion lies in evaluating new technologies and assisting customers with innovative solutions. During his free time, he enjoys watching soccer.

Build an Amazon Redshift data warehouse using an Amazon DynamoDB single-table design

Post Syndicated from Altaf Hussain original https://aws.amazon.com/blogs/big-data/build-an-amazon-redshift-data-warehouse-using-an-amazon-dynamodb-single-table-design/

Amazon DynamoDB is a fully managed NoSQL service that delivers single-digit millisecond performance at any scale. It’s used by thousands of customers for mission-critical workloads. Typical use cases for DynamoDB are an ecommerce application handling a high volume of transactions, or a gaming application that needs to maintain scorecards for players and games. In traditional databases, we would model such applications using a normalized data model (entity-relation diagram). This approach comes with a heavy computational cost in terms of processing and distributing the data across multiple tables while ensuring the system is ACID-compliant at all times, which can negatively impact performance and scalability. If these entities are frequently queried together, it makes sense to store them in a single table in DynamoDB. This is the concept of single-table design. Storing different types of data in a single table allows you to retrieve multiple, heterogeneous item types using a single request. Such requests are relatively straightforward, and usually take the following form:

SELECT * FROM TABLE WHERE Some_Attribute = 'some_value'

In this format, some_attribute is a partition key or part of an index.

Nonetheless, many of the same customers using DynamoDB would also like to be able to perform aggregations and ad hoc queries against their data to measure important KPIs that are pertinent to their business. Suppose we have a successful ecommerce application handling a high volume of sales transactions in DynamoDB. A typical ask for this data may be to identify sales trends as well as sales growth on a yearly, monthly, or even daily basis. These types of queries require complex aggregations over a large number of records. A key pillar of AWS’s modern data strategy is the use of purpose-built data stores for specific use cases to achieve performance, cost, and scale. Deriving business insights by identifying year-on-year sales growth is an example of an online analytical processing (OLAP) query. These types of queries are suited for a data warehouse.

The goal of a data warehouse is to enable businesses to analyze their data fast; this is important because it means they are able to gain valuable insights in a timely manner. Amazon Redshift is a fully managed, scalable cloud data warehouse. Building a performant data warehouse is non-trivial because the data needs to be highly curated to serve as a reliable and accurate version of the truth.

In this post, we walk through the process of exporting data from a DynamoDB table to Amazon Redshift. We discuss data model design for both NoSQL databases and SQL data warehouses. We begin with a single-table design as an initial state and build a scalable batch extract, load, and transform (ELT) pipeline to restructure the data into a dimensional model for OLAP workloads.

DynamoDB table example

We use an example of a successful ecommerce store allowing registered users to order products from their website. A simple ERD (entity-relationship diagram) for this application will have four distinct entities: customers, addresses, orders, and products. For customers, we have information such as their unique user name and email address; for the address entity, we have one or more customer addresses. Orders contain information regarding the order placed, and the products entity provides information about the products placed in an order. As we can see from the following diagram, a customer can place one or more orders, and an order must contain one or more products.

We could store each entity in a separate table in DynamoDB. However, there is no way to retrieve customer details alongside all the orders placed by the customer without making multiple requests to the customer and order tables. This is inefficient from both a cost and performance perspective. A key goal for any efficient application is to retrieve all the required information in a single query request. This ensures fast, consistent performance. So how can we remodel our data to avoid making multiple requests? One option is to use single-table design. Taking advantage of the schema-less nature of DynamoDB, we can store different types of records in a single table in order to handle different access patterns in a single request. We can go further still and store different types of values in the same attribute and use it as a global secondary index (GSI). This is called index overloading.

A typical access pattern we may want to handle in our single table design is to get customer details and all orders placed by the customer.

To accommodate this access pattern, our single-table design looks like the following example.

By restricting the number of addresses associated with a customer, we can store address details as a complex attribute (rather than a separate item) without exceeding the 400 KB item size limit of DynamoDB.

We can add a global secondary index (GSIpk and GSIsk) to capture another access pattern: get order details and all product items placed in an order. We use the following table.

We have used generic attribute names, PK and SK, for our partition key and sort key columns. This is because they hold data from different entities. Furthermore, the values in these columns are prefixed by generic terms such as CUST# and ORD# to help us identify the type of data we have and ensure that the value in PK is unique across all records in the table.
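
To make the access pattern concrete, the following boto3 sketch queries a customer’s item collection in a single request. The table name, key values, and prefix convention are assumptions based on the design described above:

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("ecommerce-single-table")

# One request returns the customer item plus the ORD# items stored under the same partition key
response = table.query(KeyConditionExpression=Key("PK").eq("CUST#john_doe"))
for item in response["Items"]:
    print(item["PK"], item["SK"])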

A well-designed single table will not only reduce the number of requests for an access pattern, but will service many different access patterns. The challenge comes when we need to ask more complex questions of our data, for example, what was the year-on-year quarterly sales growth by product broken down by country?

The case for a data warehouse

A data warehouse is ideally suited to answer OLAP queries. Built on highly curated structured data, it provides the flexibility and speed to run aggregations across an entire dataset to derive insights.

To house our data, we need to define a data model. An optimal design choice is to use a dimensional model. A dimensional model consists of fact tables and dimension tables. Fact tables store the numeric information about business measures and foreign keys to the dimension tables. Dimension tables store descriptive information about the business facts to help understand and analyze the data better. From a business perspective, a dimensional model with its use of facts and dimensions can present complex business processes in a simple-to-understand manner.

Building a dimensional model

A dimensional model optimizes read performance through efficient joins and filters. Amazon Redshift automatically chooses the best distribution style and sort key based on workload patterns. We build a dimensional model from the single DynamoDB table based on the following star schema.

We have separated each item type into individual tables. We have a single fact table (Orders) containing the business measures price and numberofitems, and foreign keys to the dimension tables. By storing the price of each product in the fact table, we can track price fluctuations in the fact table without continually updating the product dimension. (In a similar vein, the DynamoDB attribute amount is a simple derived measure in our star schema: amount is the summation of product prices per orderid).

By splitting the descriptive content of our single DynamoDB table into multiple Amazon Redshift dimension tables, we can remove redundancy by only holding in each dimension the information pertinent to it. This allows us the flexibility to query the data under different contexts; for example, we may want to know the frequency of customer orders by city or product sales by date. The ability to freely join dimensions and facts when analyzing the data is one of the key benefits of dimensional modeling. It’s also good practice to have a Date dimension to allow us to perform time-based analysis by aggregating the fact by year, month, quarter, and so forth.

This dimensional model will be built in Amazon Redshift. When setting out to build a data warehouse, it’s a common pattern to have a data lake as the source of the data warehouse. The data lake in this context serves a number of important functions:

  • It acts as a central source for multiple applications, not just exclusively for data warehousing purposes. For example, the same dataset could be used to build machine learning (ML) models to identify trends and predict sales.
  • It can store data as is, be it unstructured, semi-structured, or structured. This allows you to explore and analyze the data without committing upfront to what the structure of the data should be.
  • It can be used to offload historical or less-frequently-accessed data, allowing you to manage your compute and storage costs more effectively. In our analytic use case, if we are analyzing quarterly growth rates, we may only need a couple of years’ worth of data; the rest can be unloaded into the data lake.

When querying a data lake, we need to consider user access patterns in order to reduce costs and optimize query performance. This is achieved by partitioning the data. The choice of partition keys will depend on how you query the data. For example, if you query the data by customer or country, then they are good candidates for partition keys; if you query by date, then a date hierarchy can be used to partition the data.

After the data is partitioned, we want to ensure it’s held in the right format for optimal query performance. The recommended choice is to use a columnar format such as Parquet or ORC. Such formats are compressed and store data column-wise, allowing for fast retrieval times, and are parallelizable, allowing for fast load times when moving the data into Amazon Redshift. In our use case, it makes sense to store the data in a data lake with minimal transformation and formatting to enable easy querying and exploration of the dataset. We partition the data by item type (Customer, Order, Product, and so on), and because we want to easily query each entity in order to move the data into our data warehouse, we transform the data into the Parquet format.

Solution overview

The following diagram illustrates the data flow to export data from a DynamoDB table to a data warehouse.

We present a batch ELT solution using AWS Glue for exporting data stored in DynamoDB to an Amazon Simple Storage Service (Amazon S3) data lake and then a data warehouse built in Amazon Redshift. AWS Glue is a fully managed extract, transform, and load (ETL) service that allows you to organize, cleanse, validate, and format data for storage in a data warehouse or data lake.

The solution workflow has the following steps:

  1. Move any existing files from the raw and data lake buckets into corresponding archive buckets to ensure any fresh export from DynamoDB to Amazon S3 isn’t duplicating data.
  2. Begin a new DynamoDB export to the S3 raw layer.
  3. From the raw files, create a data lake partitioned by item type.
  4. Load the data from the data lake to landing tables in Amazon Redshift.
  5. After the data is loaded, we take advantage of the distributed compute capability of Amazon Redshift to transform the data into our dimensional model and populate the data warehouse.

We orchestrate the pipeline using an AWS Step Functions workflow and schedule a daily batch run using Amazon EventBridge.

For simpler DynamoDB table structures, you may consider skipping some of these steps by either loading data directly from DynamoDB to Amazon Redshift or by using the Amazon Redshift auto-copy feature or COPY command to load data from Amazon S3.

Prerequisites

You must have an AWS account with a user who has programmatic access. For setup instructions, refer to AWS security credentials.

Use the AWS CloudFormation template cf_template_ddb-dwh-blog.yaml to launch the following resources:

  • A DynamoDB table with a GSI and point-in-time recovery enabled.
  • An Amazon Redshift cluster (we use two nodes of RA3.4xlarge).
  • Three AWS Glue Data Catalog databases: raw, datalake, and redshift.
  • Five S3 buckets: two for the raw and data lake files, two for their respective archives, and one for the Amazon Athena query results.
  • Two AWS Identity and Access Management (IAM) roles: An AWS Glue role and a Step Functions role with the requisite permissions and access to resources.
  • A JDBC connection to Amazon Redshift.
  • An AWS Lambda function to retrieve the s3-prefix-list-id for your Region. This is required to allow traffic from a VPC to access an AWS service through a gateway VPC endpoint.
  • Download the following files to perform the ELT:
    • The Python script to load sample data into our DynamoDB table: load_dynamodb.py.
    • The AWS Glue Python Spark script to archive the raw and data lake files: archive_job.py.
    • The AWS Glue Spark scripts to extract and load the data from DynamoDB to Amazon Redshift: GlueSparkJobs.zip.
    • The DDL and DML SQL scripts to create the tables and load the data into the data warehouse in Amazon Redshift: SQL Scripts.zip.

Launch the CloudFormation template

AWS CloudFormation allows you to model, provision, and scale your AWS resources by treating infrastructure as code. We use the downloaded CloudFormation template to create a stack (with new resources).

  1. On the AWS CloudFormation console, create a new stack and select Template is ready.
  2. Upload the template and choose Next.

  3. Enter a name for your stack.
  4. For MasterUserPassword, enter a password.
  5. Optionally, replace the default names for the Amazon Redshift database, DynamoDB table, and MasterUsername (in case these names are already in use).
  6. Review the details and acknowledge that AWS CloudFormation may create IAM resources on your behalf.
  7. Choose Create stack.

Load sample data into a DynamoDB table

To load your sample data into DynamoDB, complete the following steps:

  1. Create an AWS Cloud9 environment with default settings.
  2. Upload the DynamoDB load Python script (load_dynamodb.py). From the AWS Cloud9 terminal, use the pip install command to install the following packages:
    1. boto3
    2. faker
    3. faker_commerce
    4. numpy
  3. In the Python script, replace all placeholders (capital letters) with the appropriate values and run the following command in the terminal:
python load_dynamodb.py

This command loads the sample data into our single DynamoDB table.

Extract data from DynamoDB

To extract the data from DynamoDB to our S3 data lake, we use the new AWS Glue DynamoDB export connector. Unlike the old connector, the new version uses a snapshot of the DynamoDB table and doesn’t consume read capacity units of your source DynamoDB table. For large DynamoDB tables exceeding 100 GB, the read performance of the new AWS Glue DynamoDB export connector is not only consistent but also significantly faster than the previous version.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance. This will take continuous backups of the source table (so be mindful of cost) and ensures that each time the connector invokes an export, the data is fresh. The time it takes to complete an export depends on the size of your table and how uniformly the data is distributed therein. This can range from a few minutes for small tables (up to 10 GiB) to a few hours for larger tables (up to a few terabytes). This is not a concern for our use case because data lakes and data warehouses are typically used to aggregate data at scale and generate daily, weekly, or monthly reports. It’s also worth noting that each export is a full refresh of the data, so in order to build a scalable automated data pipeline, we need to archive the existing files before beginning a fresh export from DynamoDB.
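
For reference, reading through the export connector inside a Glue job might look like the following sketch; the table ARN and staging bucket are placeholders, and dynamodb.unnestDDBJson flattens the exported DynamoDB JSON:

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())

# Snapshot-based export that doesn't consume read capacity from the source table
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/YOUR-TABLE",
        "dynamodb.s3.bucket": "YOUR-EXPORT-STAGING-BUCKET",
        "dynamodb.s3.prefix": "ddb-export/",
        "dynamodb.unnestDDBJson": True,
    },
)
print(dyf.count())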

Complete the following steps:

  1. Create an AWS Glue job using the Spark script editor.
  2. Upload the archive_job.py file from GlueSparkJobs.zip.

This job archives the data files into timestamped folders. We run the job concurrently to archive the raw files and the data lake files.

  3. In the Job details section, give the job a name and choose the AWS Glue IAM role created by our CloudFormation template.
  4. Keep all other defaults the same and ensure maximum concurrency is set to 2 (under Advanced properties).

Archiving the files provides a backup option in the event of disaster recovery. As such, we can assume that the files will not be accessed frequently and can be kept in the S3 Standard-IA storage class to save up to 40% on costs while providing rapid access to the files when needed.

This job typically runs before each export of data from DynamoDB. After the datasets have been archived, we’re ready to (re)-export the data from our DynamoDB table.

We can use AWS Glue Studio to visually create the jobs needed to extract the data from DynamoDB and load into our Amazon Redshift data warehouse. We demonstrate how to do this by creating an AWS Glue job (called ddb_export_raw_job) using AWS Glue Studio.

  1. In AWS Glue Studio, create a job and select Visual with a blank canvas.
  2. Choose Amazon DynamoDB as the data source.

  3. Choose our DynamoDB table to export from.
  4. Leave all other options as is and finish setting up the source connection.

We then choose Amazon S3 as our target. In the target properties, we can transform the output to a suitable format, apply compression, and specify the S3 location to store our raw data.

  1. Set the following options:
    1. For Format, choose Parquet.
    2. For Compression type, choose Snappy.
    3. For S3 Target Location, enter the path for RawBucket (located on the Outputs tab of the CloudFormation stack).
    4. For Database, choose the value for GlueRawDatabase (from the CloudFormation stack output).
    5. For Table name, enter an appropriate name.

  2. Because our target data warehouse requires data to be in a flat structure, verify that the configuration option dynamodb.unnestDDBJson is set to True on the Script tab.

  3. On the Job details tab, choose the AWS Glue IAM role generated by the CloudFormation template.
  4. Save and run the job.

Depending on the data volumes being exported, this job may take a few minutes to complete.

Because we’ll be adding the table to our AWS Glue Data Catalog, we can explore the output using Athena after the job is complete. Athena is a serverless interactive query service that makes it simple to analyze data directly in Amazon S3 using standard SQL.

  1. In the Athena query editor, choose the raw database.

We can see that the attributes of the Address structure have been unnested and added as additional columns to the table.

  2. After we export the data into the raw bucket, create another job (called raw_to_datalake_job) using AWS Glue Studio (select Visual with a blank canvas) to load the data lake partitioned by item type (customer, order, and product).
  3. Set the source as the AWS Glue Data Catalog raw database and table.

  4. In the ApplyMapping transformation, drop the Address struct because we have already unnested these attributes into our flattened raw table.

  5. Set the target as our S3 data lake.

  6. Choose the AWS Glue IAM role in the job details, then save and run the job.

Now that we have our data lake, we’re ready to build our data warehouse.

Build the dimensional model in Amazon Redshift

The CloudFormation template launches a two-node RA3.4xlarge Amazon Redshift cluster. To build the dimensional model, complete the following steps:

  1. In Amazon Redshift Query Editor V2, connect to your database (default: salesdwh) within the cluster using the database user name and password authentication (MasterUserName and MasterUserPassword from the CloudFormation template).
  2. You may be asked to configure your account if this is your first time using Query Editor V2.
  3. Download the SQL scripts SQL Scripts.zip to create the following schemas and tables (run the scripts in numbered sequence).

In the landing schema:

  • address
  • customer
  • order
  • product

In the staging schema:

  • staging.address
  • staging.address_maxkey
  • staging.addresskey
  • staging.customer
  • staging.customer_maxkey
  • staging.customerkey
  • staging.date
  • staging.date_maxkey
  • staging.datekey
  • staging.order
  • staging.order_maxkey
  • staging.orderkey
  • staging.product
  • staging.product_maxkey
  • staging.productkey

In the dwh schema:

  • dwh.address
  • dwh.customer
  • dwh.order
  • dwh.product

We load the data from our data lake to the landing schema as is.

  1. Use the JDBC connector to Amazon Redshift to build an AWS Glue crawler to add the landing schema to our Data Catalog under the ddb_redshift database.

  2. Create an AWS Glue crawler with the JDBC data source.

  3. Select the JDBC connection you created and choose Next.

  4. Choose the IAM role created by the CloudFormation template and choose Next.

  5. Review your settings before creating the crawler.

The crawler adds the four landing tables in our AWS Glue database ddb_redshift.

  1. In AWS Glue Studio, create four AWS Glue jobs to load the landing tables (these scripts are available to download, and you can use the Spark script editor to upload these scripts individually to create the jobs):
    1. land_order_job
    2. land_product_job
    3. land_customer_job
    4. land_address_job

Each job has the structure as shown in the following screenshot.

  2. Filter the S3 source on the partition column type:
    1. For product, filter on type=‘product’.
    2. For order, filter on type=‘order’.
    3. For customer and address, filter on type=‘customer’.

  3. Set the target for the data flow as the corresponding table in the landing schema in Amazon Redshift.
  4. Use the built-in ApplyMapping transformation in our data pipeline to drop columns and, where necessary, convert the data types to match the target columns (a sketch of this transform follows the mappings list).

For more information about built-in transforms available in AWS Glue, refer to AWS Glue PySpark transforms reference.

The mappings for our four jobs are as follows:

  • land_order_job:
    mappings=[
    ("pk", "string", "pk", "string"),
    ("orderid", "string", "orderid", "string"),
    ("numberofitems", "string", "numberofitems", "int"),
    ("orderdate", "string", "orderdate", "timestamp"),
    ]

  • land_product_job:
    mappings=[
    ("orderid", "string", "orderid", "string"),
    ("category", "string", "category", "string"),
    ("price", "string", "price", "decimal"),
    ("productname", "string", "productname", "string"),
    ("productid", "string", "productid", "string"),
    ("color", "string", "color", "string"),
    ]

  • land_address_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]

  • land_customer_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]
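
As an example of how one of these mappings is applied inside a generated job script, the following sketch shows the land_order_job transform; the data lake database and table names are assumptions:

import sys

from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read only the order items from the partitioned data lake (names are placeholders)
datalake_orders = glueContext.create_dynamic_frame.from_catalog(
    database="datalake",
    table_name="YOUR_DATALAKE_TABLE",
    push_down_predicate="type='order'",
)

mapped_orders = ApplyMapping.apply(
    frame=datalake_orders,
    mappings=[
        ("pk", "string", "pk", "string"),
        ("orderid", "string", "orderid", "string"),
        ("numberofitems", "string", "numberofitems", "int"),
        ("orderdate", "string", "orderdate", "timestamp"),
    ],
    transformation_ctx="mapped_orders",
)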

  5. Choose the AWS Glue IAM role, and under Advanced properties, verify the JDBC connector to Amazon Redshift as a connection.
  6. Save and run each job to load the landing tables in Amazon Redshift.

Populate the data warehouse

From the landing schema, we move the data to the staging layer and apply the necessary transformations. Our dimensional model has a single fact table, the orders table, which is the largest table and as such needs a distribution key. The choice of key depends on how the data is queried and the size of the dimension tables being joined to. If you’re unsure of your query patterns, you can leave the distribution keys and sort keys for your tables unspecified. Amazon Redshift automatically assigns the correct distribution and sort keys based on your queries. This has the advantage that if and when query patterns change over time, Amazon Redshift can automatically update the keys to reflect the change in usage.

In the staging schema, we keep track of existing records based on their business key (the unique identifier for the record). We create key tables to generate a numeric identity column for each table based on the business key. These key tables allow us to implement an incremental transformation of the data into our dimensional model.

CREATE TABLE IF NOT EXISTS staging.productkey ( 
    productkey integer identity(1,1), 
    productid character varying(16383), 
    CONSTRAINT products_pkey PRIMARY KEY(productkey));   

When loading the data, we need to keep track of the latest surrogate key value to ensure that new records are assigned the correct increment. We do this using maxkey tables (pre-populated with zero):

CREATE TABLE IF NOT EXISTS staging.product_maxkey ( 
    productmaxkey integer);

INSERT INTO staging.product_maxkey
select 0;    

We use staging tables to store our incremental load, the structure of which will mirror our final target model in the dwh schema:

---staging tables to load data from data lake 
   
CREATE TABLE IF NOT EXISTS staging.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey));
---dwh tables to load data from staging schema
     
CREATE TABLE IF NOT EXISTS dwh.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey)); 

Incremental processing in the data warehouse

We load the target data warehouse using stored procedures to perform upserts (deletes and inserts performed in a single transaction):

CREATE OR REPLACE PROCEDURE staging.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

TRUNCATE TABLE staging.order;

--insert new records to get new ids
insert into staging.orderkey
(
orderid
)
select
c.orderid
from landing.order c
LEFT JOIN staging.orderkey i
ON c.orderid=i.orderid
where i.orderid IS NULL;

--update the max key
update staging.order_maxkey
set ordermaxkey = (select max(orderkey) from staging.orderkey);


insert into staging.order
(
orderkey,
customerkey,
productkey,
addresskey,
datekey,
numberofitems,
price
)
select
xid.orderkey,
cid.customerkey,
pid.productkey,
aid.addresskey,
d.datekey,
o.numberofitems,
p.price
from
landing.order o
join staging.orderkey xid on o.orderid=xid.orderid
join landing.customer c on substring(o.pk,6,length(o.pk))=c.username   ---order table needs username
join staging.customerkey cid on cid.username=c.username
join landing.address a on a.username=c.username
join staging.addresskey aid on aid.pk=a.buildingnumber::varchar+'||'+a.postcode  ---maybe change pk to addressid
join staging.datekey d on d.orderdate=o.orderdate
join landing.product p on p.orderid=o.orderid
join staging.productkey pid on pid.productid=p.productid;

COMMIT;

END;
$$ 
CREATE OR REPLACE PROCEDURE dwh.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

---delete old records 
delete from dwh.order
using staging.order as stage
where dwh.order.orderkey=stage.orderkey;

--insert new and modified
insert into dwh.order
(
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey  
)
select
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey
from staging.order;

COMMIT;
END;
$$

Use Step Functions to orchestrate the data pipeline

So far, we have stepped through each component in our workflow. We now need to stitch them together to build an automated, idempotent data pipeline. A good orchestration tool must manage failures, retries, parallelization, service integrations, and observability, so developers can focus solely on the business logic. Ideally, the workflow we build is also serverless so there is no operational overhead. Step Functions is an ideal choice to automate our data pipeline. It allows us to integrate the ELT components we have built on AWS Glue and Amazon Redshift and conduct some steps in parallel to optimize performance.

  1. On the Step Functions console, create a new state machine.
  2. Select Write your workflow in code.

  3. Enter the stepfunction_workflow.json code into the definition, replacing all placeholders with the appropriate values:
    1. [REDSHIFT-CLUSTER-IDENTIFIER] – Use the value for ClusterName (from the Outputs tab in the CloudFormation stack).
    2. [REDSHIFT-DATABASE] – Use the value for salesdwh (unless changed, this is the default database in the CloudFormation template).

We use the Step Functions IAM role from the CloudFormation template.

This JSON code generates the following pipeline.

Starting from the top, the workflow contains the following steps:

  1. We archive any existing raw and data lake files.
  2. We add two AWS Glue StartJobRun tasks that run sequentially: first to export the data from DynamoDB to our raw bucket, then from the raw bucket to our data lake.
  3. After that, we parallelize the landing of data from Amazon S3 to Amazon Redshift.
  4. We transform and load the data into our data warehouse using the Amazon Redshift Data API. Because this is asynchronous, we need to check the status of the runs before moving down the pipeline (a minimal submit-and-poll sketch follows the workflow figure).
  5. After we move the data from landing to staging, we truncate the landing tables.
  6. We load the dimensions of our target data warehouse (dwh) first, and finally we load our single fact table with its foreign key dependency on the preceding dimension tables.

The following figure illustrates a successful run.
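
As a reference for step 4, the following is a minimal boto3 sketch of the submit-and-poll pattern against the Redshift Data API. The cluster identifier, database, and secret ARN are placeholder assumptions; in the actual pipeline, Step Functions makes the equivalent calls through its native Redshift Data API integration.

import time

import boto3

redshift_data = boto3.client("redshift-data", region_name="us-east-1")

# Submit the stored procedure call; the Data API returns immediately with a statement ID.
response = redshift_data.execute_statement(
    ClusterIdentifier="redshift-cluster-identifier",  # assumption: ClusterName from the stack outputs
    Database="salesdwh",
    SecretArn="arn:aws:secretsmanager:us-east-1:123456789012:secret:redshift-creds",  # assumption
    Sql="CALL staging.load_order();",
)
statement_id = response["Id"]

# Poll until the statement finishes before moving down the pipeline.
while True:
    status = redshift_data.describe_statement(Id=statement_id)["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(5)

if status != "FINISHED":
    raise RuntimeError(f"Statement {statement_id} ended with status {status}")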

After we set up the workflow, we can use EventBridge to schedule a daily midnight run, where the target is a Step Functions StartExecution API calling our state machine. Under the workflow permissions, choose Create a new role for this schedule and optionally rename it.
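
If you prefer to script the schedule instead of using the console, the following boto3 sketch creates an equivalent EventBridge rule; the state machine ARN and IAM role ARN are placeholder assumptions.

import boto3

events = boto3.client("events", region_name="us-east-1")

# Run every day at midnight UTC.
events.put_rule(
    Name="daily-dwh-elt",
    ScheduleExpression="cron(0 0 * * ? *)",
    State="ENABLED",
)

# Target the Step Functions state machine; the role must allow states:StartExecution.
events.put_targets(
    Rule="daily-dwh-elt",
    Targets=[
        {
            "Id": "dwh-state-machine",
            "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:dwh-elt",  # assumption
            "RoleArn": "arn:aws:iam::123456789012:role/eventbridge-invoke-stepfunctions",  # assumption
        }
    ],
)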

Query the data warehouse

We can verify the data has been successfully loaded into Amazon Redshift with a query.

After we have the data loaded into Amazon Redshift, we’re ready to answer the query asked at the start of this post: what is the year-on-year quarterly sales growth by product and country? The query looks like the following code (depending on your dataset, you may need to select alternative years and quarters):

with sales2021q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2021q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2021 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  ),
sales2022q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2022q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2022 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  )

select a.country,a.category, ((revenue2022q2 - revenue2021q2)/revenue2021q2)*100 as quarteronquartergrowth
from sales2022q2 a
join sales2021q2 b on a.country=b.country and a.category=b.category
order by a.country,a.category

We can visualize the results in Amazon Redshift Query Editor V2 by toggling the chart option and setting Type as Pie, Values as quarteronquartergrowth, and Labels as category.

Cost considerations

We give a brief outline of the indicative costs associated with the key services covered in our solution based on us-east-1 Region pricing using the AWS Pricing Calculator:

  • DynamoDB – With on-demand settings for 1.5 million items (average size of 355 bytes) and associated write and read capacity plus PITR storage, the cost of DynamoDB is approximately $2 per month.
  • AWS Glue DynamoDB export connector – This connector utilizes the DynamoDB export to Amazon S3 feature. This has no hourly cost—you only pay for the gigabytes of data exported to Amazon S3 ($0.11 per GiB).
  • Amazon S3 – You pay for storing objects in your S3 buckets. The rate you’re charged depends on your objects’ size, how long you stored the objects during the month, and the storage class. In our solution, we used S3 Standard for our data lake and S3 Standard – Infrequent Access for archive. Standard-IA storage is $0.0125 per GB/month; Standard storage is $0.023 per GB/month.
  • AWS Glue Jobs – With AWS Glue, you only pay for the time your ETL job takes to run. There are no resources to manage, no upfront costs, and you are not charged for startup or shutdown time. AWS charges you an hourly rate based on the number of Data Processing Units (DPUs) used to run your ETL job. A single DPU provides 4 vCPU and 16 GB of memory. Every one of our nine Spark jobs uses 10 DPUs and has an average runtime of 3 minutes. This gives an approximate cost of $0.29 per job.
  • Amazon Redshift – We provisioned two RA3.4xlarge nodes for our Amazon Redshift cluster. If run on-demand, each node costs $3.26 per hour. If utilized 24/7, our monthly cost would be approximately $4,759.60. You should evaluate your workload to determine what cost savings can be achieved by using Amazon Redshift Serverless or using Amazon Redshift provisioned reserved instances.
  • Step Functions – You are charged based on the number of state transitions required to run your application. Step Functions counts a state transition as each time a step of your workflow is run. You’re charged for the total number of state transitions across all your state machines, including retries. The Step Functions free tier includes 4,000 free state transitions per month. Thereafter, it’s $0.025 per 1,000 state transitions.

Clean up

Remember to delete any resources created through the CloudFormation stack. You first need to manually empty and delete the S3 buckets. Then you can delete the CloudFormation stack using the AWS CloudFormation console or AWS Command Line Interface (AWS CLI). For instructions, refer to Clean up your “hello, world!” application and related resources.

Summary

In this post, we demonstrated how you can export data from DynamoDB to Amazon S3 and Amazon Redshift to perform advanced analytics. We built an automated data pipeline that you can use to perform a batch ELT process that can be scheduled to run daily, weekly, or monthly and can scale to handle very large workloads.

Please leave your feedback or comments in the comments section.


About the Author

Altaf Hussain is an Analytics Specialist Solutions Architect at AWS. He helps customers around the globe design and optimize their big data and data warehousing solutions.


Appendix

To extract the data from DynamoDB and load it into our Amazon Redshift database, we can use the Spark script editor and upload the files from GlueSparkJobs.zip to create each individual job necessary to perform the extract and load. If you choose to do this, remember to update, where appropriate, the account ID and Region placeholders in the scripts. Also, on the Job details tab under Advanced properties, add the Amazon Redshift connection.

Efficiently crawl your data lake and improve data access with an AWS Glue crawler using partition indexes

Post Syndicated from Srividya Parthasarathy original https://aws.amazon.com/blogs/big-data/efficiently-crawl-your-data-lake-and-improve-data-access-with-aws-glue-crawler-using-partition-indexes/

In today’s world, customers manage vast amounts of data in their Amazon Simple Storage Service (Amazon S3) data lakes, which requires convoluted data pipelines to continuously understand the changes in the data layout and make them available to consuming systems. AWS Glue crawlers provide a straightforward way to catalog data in the AWS Glue Data Catalog that removes the heavy lifting when it comes to schema management and data classification. AWS Glue crawlers extract the data schema and partitions from Amazon S3 to automatically populate the Data Catalog, keeping the metadata current.

But with data growing exponentially over time, the number of partitions in a given table can grow significantly. When analytics services like Amazon Athena query a table containing millions of partitions, the time needed to retrieve the partition metadata increases, which can increase query runtime.

Today, AWS Glue crawler support has been expanded to automatically add partition indexes for newly discovered tables to optimize query processing on the partitioned dataset. Now, when the crawler creates a new Data Catalog table during a crawler run, it also creates a partition index by default, with the largest permutation of all numeric and string type partition columns as keys. The Data Catalog then creates a searchable index based on these keys, reducing the time required to retrieve and filter partition metadata on tables with millions of partitions. The creation of partition indexes benefits the analytics workloads running on Athena, Amazon EMR, Amazon Redshift Spectrum, and AWS Glue.

In this post, we describe how to create partition indexes with an AWS Glue crawler and compare the query performance improvement when accessing the crawled data with and without a partition index from Athena.

Solution overview

We use an AWS CloudFormation template to create our solution resources. In the following steps, we demonstrate how to configure the AWS Glue crawler to create a partition index using either the AWS Glue console or the AWS Command Line Interface (AWS CLI). Then we compare the query performance improvements using Athena.

Prerequisites

To follow along with this post, you must have access to an AWS Identity and Access Management (IAM) administrator role to create resources using AWS CloudFormation.

Set up your solution resources

The CloudFormation template generates the following resources:

  • IAM roles and policies
  • An AWS Glue database to hold the schema
  • An AWS Glue crawler pointing to a highly partitioned dataset
  • An Athena workgroup and bucket to store query results

Complete the following steps to set up the solution resources:

  1. Log in to the AWS Management Console as an IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For DatabaseName, keep the default blog_partition_index_crawlerdb.
  4. Choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create stack.
  7. When the stack is complete, on the AWS CloudFormation console, navigate to the Outputs tab of the stack.
  8. Note down values of DatabaseName and GlueCrawlerName.

Some of the resources that this stack deploys incur costs when in use.

Edit and run the AWS Glue crawler

To configure and run the AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Locate the crawler blog-partition-index-crawler and choose Edit.
  3. In the Set output and scheduling section, under Advanced options, select Create partition indexes automatically.
  4. Review and update the crawler settings.

Alternatively, you can configure your crawler using the AWS CLI (provide your IAM role and Region):

aws glue create-crawler --name blog-partition-index-crawler --targets '{ "S3Targets": [{ "Path": "s3://awsglue-datasets/examples/highly-partitioned-table/"}] }' --database-name "blog_partition_index_crawlerdb" --role <Crawler_IAM_role> --configuration "{\"Version\":1.0,\"CreatePartitionIndex\":true}" --region <region_name>
  5. Now run the crawler and verify that the crawler run is complete.

This is a highly partitioned dataset, and the crawler run will take approximately 90 minutes to complete.

Verify the partitioned table

In the AWS Glue database blog_partition_index_crawlerdb, verify that the table highly_partitioned_table is created.

By default, the crawler determines an index based on the largest permutation of partition columns of valid column types in the same order of partition columns, which are either numeric or string. For the table created by the crawler (highly_partitioned_table), we have partition columns year (string), month (string), day (string), and hour (string).

Based on this definition, the crawler created an index on the permutation of year, month, day, and hour. The crawler prefixes the name of any partition index it creates by default with crawler_.

Verify the same by navigating to the table highly_partitioned_table on the AWS Glue console and choosing the Indexes tab.
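
You can also confirm the index from a script. The following boto3 sketch lists the partition indexes on the crawled table, using the default database and table names from this post:

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# List the partition indexes the crawler created on the table.
response = glue.get_partition_indexes(
    DatabaseName="blog_partition_index_crawlerdb",
    TableName="highly_partitioned_table",
)

for index in response["PartitionIndexDescriptorList"]:
    keys = [key["Name"] for key in index["Keys"]]
    print(index["IndexName"], index["IndexStatus"], keys)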

The crawler was able to crawl the S3 data source and successfully populate the partition indexes for the table.

Compare the query performance improvements using Athena

First, we query the table in Athena without using the partition index. To verify the tables using Athena, complete the following steps:

  1. On the Athena console, choose crawler-primary-workgroup as the Athena workgroup and choose Acknowledge.
  2. Run the following query:
    select count(*), sum(value) from blog_partition_index_crawlerdb.highly_partitioned_table where year='1980' and month='01' and day ='01'

The following screenshot shows that the query took approximately 32 seconds without partition filtering enabled.

  3. Now we enable partition filtering for the table in Athena:
    ALTER TABLE blog_partition_index_crawlerdb.highly_partitioned_table
    SET TBLPROPERTIES ('partition_filtering.enabled' = 'true')

  4. Run the following query again and note the runtime:
    select count(*), sum(value) from blog_partition_index_crawlerdb.highly_partitioned_table where year='1980' and month='01' and day ='01'

The following screenshot shows that the query took only 700 milliseconds, which is much faster with partition filtering enabled through the partition index.

Clean up

To avoid unwanted charges to your AWS account, you can delete the AWS resources:

  1. Sign in to the CloudFormation console as the IAM admin used for creating the CloudFormation stack.
  2. Delete the CloudFormation stack you created.

Conclusion

In this post, we explained how to configure an AWS Glue crawler to create partition indexes and compared the query performance in Athena when accessing the crawled data with and without a partition index.

If no partition indexes are present on the table, AWS Glue loads all the partitions of the table and then filters the loaded partitions, which results in inefficient retrieval of metadata. Analytics services like Redshift Spectrum, Amazon EMR, and AWS Glue ETL Spark DataFrames can now utilize indexes for fetching partitions, resulting in significant query performance improvements.

For more information on partition indexes and query performance across various analytical engines, refer to Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes and Improve query performance using AWS Glue partition indexes.

Special thanks to everyone who contributed to this crawler feature launch: Yuhang Chen, Kyle Duong, and Mita Gavade.


About the authors

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

AWS Week in Review – Automate DLQ Redrive for SQS, Lambda Supports Ruby 3.2, and More – June 12, 2023

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/aws-week-in-review-automate-dlq-redrive-for-sqs-lambda-supports-ruby-3-2-and-more-june-12-2023/

Today I’m boarding a plane for Madrid. I will attend the AWS Summit Madrid this Thursday, and I will take Serverlesspresso with me. Serverlesspresso is a demo that we take to events, where you can learn how to build event-driven architectures with serverless. If you are visiting an AWS Summit, you will most likely find one of our booths.

Serverlesspresso at Madrid

Last Week’s Launches
Here are some launches that got my attention during the previous week.

Amazon SQS – Customers were very excited when we announced DLQ redrive for Amazon SQS, because that feature helped them easily redirect failed messages. This week we added AWS SDK and CLI support for this feature, allowing you to redrive the messages on the DLQ programmatically and making the feature even easier to use. You can read Seb’s blog post about this new feature to learn how to get started.

AWS Lambda – AWS Lambda now supports Ruby 3.2. Ruby 3.2 has many new improvements, for example, passing anonymous arguments to functions or having endless methods. Check out this blog post that goes in depth into each of the new features.

Amazon Fraud Detector – Amazon Fraud Detector supports event orchestration with Amazon EventBridge. This is a very important feature because now you can act on the different events that Fraud Detector emits, for example, send notifications to different stakeholders.

AWS Glue – This week, AWS Glue made two important announcements. First, it announced the general availability of AWS Glue for Ray, a new data integration engine option for AWS Glue. Ray is a popular new open-source compute framework that helps developers to scale their Python workloads. In addition, AWS Glue announced AWS Glue Data Quality, a new capability that automatically measures and monitors data lake and data pipeline quality.

Amazon Elastic Container Registry (Amazon ECR) – AWS Signer and Amazon ECR announced a new feature that allows you to sign and verify container images. You can use Signer to validate that only container images you have approved are deployed in your Amazon Elastic Kubernetes Service (Amazon EKS) clusters.

Amazon QuickSight – Amazon QuickSight now supports APIs to automate asset deployment, so you can easily replicate the same QuickSight assets in multiple Regions and accounts. You can read more on how to use those APIs in this blog post.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Some other updates and news that you may have missed:

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

  • AWS Silicon Innovation Day (June 21) – A one-day virtual event that focuses on AWS Silicon and how you can take advantage of AWS’s unique offerings. Learn more and register here.
  • AWS Global Summits – There are many summits going on right now around the world: Toronto (June 14), Madrid (June 15), and Milano (June 22).
  • AWS Community Day – Join a community-led conference run by AWS user group leaders in your region: Chicago (June 15), Manila (June 29–30), Chile (July 1), and Munich (September 14).
  • CDK Day – CDK Day is happening again this year on September 29. The call for papers for this event is open, and this year we are also accepting talks in Spanish. Submit your talk here.

That’s all for this week. Check back next Monday for another Week in Review!

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

— Marcia

AWS Glue streaming application to process Amazon MSK data using AWS Glue Schema Registry

Post Syndicated from Vivekanand Tiwari original https://aws.amazon.com/blogs/big-data/aws-glue-streaming-application-to-process-amazon-msk-data-using-aws-glue-schema-registry/

Organizations across the world are increasingly relying on streaming data, and there is a growing need for real-time data analytics, considering the growing velocity and volume of data being collected. This data can come from a diverse range of sources, including Internet of Things (IoT) devices, user applications, and logging and telemetry information from applications, to name a few. By harnessing the power of streaming data, organizations are able to stay ahead of real-time events and make quick, informed decisions. With the ability to monitor and respond to real-time events, organizations are better equipped to capitalize on opportunities and mitigate risks as they arise.

One notable trend in the streaming solutions market is the widespread use of Apache Kafka for data ingestion and Apache Spark for streaming processing across industries. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data.

However, as data volume and velocity grow, organizations may need to enrich their data with additional information from multiple sources, leading to a constantly evolving schema. The AWS Glue Schema Registry addresses this complexity by providing a centralized platform for discovering, managing, and evolving schemas from diverse streaming data sources. Acting as a bridge between producer and consumer apps, it enforces the schema, reduces the data footprint in transit, and safeguards against malformed data.

To process data effectively, we turn to AWS Glue, a serverless data integration service that provides an Apache Spark-based engine and offers seamless integration with numerous data sources. AWS Glue is an ideal solution for running stream consumer applications, discovering, extracting, transforming, loading, and integrating data from multiple sources.

This post explores how to use a combination of Amazon MSK, the AWS Glue Schema Registry, AWS Glue streaming ETL jobs, and Amazon Simple Storage Service (Amazon S3) to create a robust and reliable real-time data processing platform.

Overview of solution

In this streaming architecture, the initial phase involves registering a schema with the AWS Glue Schema Registry. This schema defines the data being streamed while providing essential details like columns and data types, and a table is created in the AWS Glue Data Catalog based on this schema. The schema serves as a single source of truth for producer and consumer applications, and you can use the schema evolution feature of the AWS Glue Schema Registry to keep it consistent as the data changes over time. Refer to the appendix section for more information on this feature. The producer application retrieves the schema from the Schema Registry and uses it to serialize the records into the Avro format and ingest the data into an MSK cluster. This serialization ensures that the records are properly structured and ready for processing.

Next, an AWS Glue streaming ETL (extract, transform, and load) job is set up to process the incoming data. This job extracts data from the Kafka topics, deserializes it using the schema information from the Data Catalog table, and loads it into Amazon S3. It’s important to note that the schema in the Data Catalog table serves as the source of truth for the AWS Glue streaming job. Therefore, it’s crucial to keep the schema definition in the Schema Registry and the Data Catalog table in sync. Failure to do so may result in the AWS Glue job being unable to properly deserialize records, leading to null values. To avoid this, it’s recommended to use a data quality check mechanism to identify such anomalies and take appropriate action in case of unexpected behavior. The ETL job continuously consumes data from the Kafka topics, so it’s always up to date with the latest streaming data. Additionally, the job employs checkpointing, which keeps track of the processed records and allows it to resume processing from where it left off in the event of a restart. For more information about checkpointing, see the appendix at the end of this post.
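
The complete streaming script used in this post is available in the GitHub repository referenced later. As a condensed illustration only, the core of such a job might look like the following sketch; the database, table, bucket, and checkpoint names are placeholder assumptions.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the Kafka-backed Data Catalog table; the table's schema (kept in sync with the
# Schema Registry) is used to deserialize the Avro records.
streaming_df = glue_context.create_data_frame.from_catalog(
    database="test_glue_database",     # assumption: Data Catalog database from the stack
    table_name="kafka_source_table",   # assumption: catalog table created from the registered schema
    additional_options={"startingOffsets": "earliest"},
)

def process_batch(data_frame, batch_id):
    # Persist each non-empty micro-batch to Amazon S3.
    if data_frame.count() > 0:
        data_frame.write.mode("append").parquet("s3://example-output-bucket/processed/")  # assumption

# Checkpointing lets the job resume from the last committed offsets after a restart.
glue_context.forEachBatch(
    frame=streaming_df,
    batch_function=process_batch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://example-output-bucket/checkpoint/",  # assumption
    },
)
job.commit()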

After the processed data is stored in Amazon S3, we create an AWS Glue crawler to create a Data Catalog table that acts as a metadata layer for the data. The table can be queried using Amazon Athena, a serverless, interactive query service that enables running SQL-like queries on data stored in Amazon S3.

The following diagram illustrates our solution architecture.

architecture diagram

For this post, we create the solution resources in the us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Prerequisites

Create and download a valid key to SSH into an Amazon Elastic Compute Cloud (Amazon EC2) instance from your local machine. For instructions, see Create a key pair using Amazon EC2.

Configure resources with AWS CloudFormation

To create your solution resources, complete the following steps:

  1. Launch the stack vpc-subnet-and-mskclient using the CloudFormation template vpc-subnet-and-mskclient.template. This stack creates an Amazon VPC, private and public subnets, security groups, interface endpoints, an S3 bucket, an AWS Secrets Manager secret, and an EC2 instance.
    launch stack 1
  2. Provide parameter values as listed in the following table.

    Parameters Description
    EnvironmentName Environment name that is prefixed to resource names.
    VpcCIDR IP range (CIDR notation) for this VPC.
    PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone.
    PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone.
    PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone.
    PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone.
    KeyName Key pair name used to log in to the EC2 instance.
    SshAllowedCidr CIDR block for allowing SSH connection to the instance. Check your public IP using http://checkip.amazonaws.com/ and add /32 at the end of the IP address.
    InstanceType Instance type for the EC2 instance.
  3. When stack creation is complete, retrieve the EC2 instance PublicDNS and S3 bucket name (for key BucketNameForScript) from the stack’s Outputs tab.

    Cloudformation stack 1 - output
  4. Log in to the EC2 instance using the key pair you created as a prerequisite.
  5. Clone the GitHub repository, and upload the ETL script from the glue_job_script folder to the S3 bucket created by the CloudFormation template:
    $ git clone https://github.com/aws-samples/aws-glue-msk-with-schema-registry.git 
    $ cd aws-glue-msk-with-schema-registry 
    $ aws s3 cp glue_job_script/mskprocessing.py s3://{BucketNameForScript}/

  6. Launch another stack amazon-msk-and-glue using template amazon-msk-and-glue.template. This stack creates an MSK cluster, schema registry, schema definition, database, table, AWS Glue crawler, and AWS Glue streaming job.
    launch stack 1
  7. Provide parameter values as listed in the following table.

    Parameters Description Sample value
    EnvironmentName Environment name that is prefixed to resource names. amazon-msk-and-glue
    VpcId ID of the VPC for security group. Use the VPC ID created with the first stack. Refer to the first stack’s output.
    PrivateSubnet1 Subnet used for creating the MSK cluster and AWS Glue connection. Refer to the first stack’s output.
    PrivateSubnet2 Second subnet for the MSK cluster. Refer to the first stack’s output.
    SecretArn Secrets Manager secret ARN for Amazon MSK SASL/SCRAM authentication. Refer to the first stack’s output.
    SecurityGroupForGlueConnection Security group used by the AWS Glue connection. Refer to the first stack’s output.
    AvailabilityZoneOfPrivateSubnet1 Availability Zone for the first private subnet used for the AWS Glue connection.
    SchemaRegistryName Name of the AWS Glue schema registry. test-schema-registry
    MSKSchemaName Name of the schema. test_payload_schema
    GlueDataBaseName Name of the AWS Glue Data Catalog database. test_glue_database
    GlueTableName Name of the AWS Glue Data Catalog table. output
    ScriptPath AWS Glue ETL script absolute S3 path. For example, s3://bucket-name/mskprocessing.py. Use the target S3 path from the previous steps.
    GlueWorkerType Worker type for AWS Glue job. For example, Standard, G.1X, G.2X, G.025X. G.1X
    NumberOfWorkers Number of workers in the AWS Glue job. 5
    S3BucketForOutput Bucket name for writing data from the AWS Glue job. aws-glue-msk-output-{accId}-{region}
    TopicName MSK topic name that needs to be processed. test

    The stack creation process can take around 15–20 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

    Cloudformation stack 2 output

    The following table summarizes the resources that are created as a part of this post.

    Logical ID Type
    VpcEndoint AWS::EC2::VPCEndpoint
    VpcEndoint AWS::EC2::VPCEndpoint
    DefaultPublicRoute AWS::EC2::Route
    EC2InstanceProfile AWS::IAM::InstanceProfile
    EC2Role AWS::IAM::Role
    InternetGateway AWS::EC2::InternetGateway
    InternetGatewayAttachment AWS::EC2::VPCGatewayAttachment
    KafkaClientEC2Instance AWS::EC2::Instance
    KeyAlias AWS::KMS::Alias
    KMSKey AWS::KMS::Key
    KmsVpcEndoint AWS::EC2::VPCEndpoint
    MSKClientMachineSG AWS::EC2::SecurityGroup
    MySecretA AWS::SecretsManager::Secret
    PrivateRouteTable1 AWS::EC2::RouteTable
    PrivateSubnet1 AWS::EC2::Subnet
    PrivateSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PrivateSubnet2 AWS::EC2::Subnet
    PrivateSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicRouteTable AWS::EC2::RouteTable
    PublicSubnet1 AWS::EC2::Subnet
    PublicSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicSubnet2 AWS::EC2::Subnet
    PublicSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    S3Bucket AWS::S3::Bucket
    S3VpcEndoint AWS::EC2::VPCEndpoint
    SecretManagerVpcEndoint AWS::EC2::VPCEndpoint
    SecurityGroup AWS::EC2::SecurityGroup
    SecurityGroupIngress AWS::EC2::SecurityGroupIngress
    VPC AWS::EC2::VPC
    BootstrapBrokersFunctionLogs AWS::Logs::LogGroup
    GlueCrawler AWS::Glue::Crawler
    GlueDataBase AWS::Glue::Database
    GlueIamRole AWS::IAM::Role
    GlueSchemaRegistry AWS::Glue::Registry
    MSKCluster AWS::MSK::Cluster
    MSKConfiguration AWS::MSK::Configuration
    MSKPayloadSchema AWS::Glue::Schema
    MSKSecurityGroup AWS::EC2::SecurityGroup
    S3BucketForOutput AWS::S3::Bucket
    CleanupResourcesOnDeletion AWS::Lambda::Function
    BootstrapBrokersFunction AWS::Lambda::Function

Build and run the producer application

After successfully creating the CloudFormation stack, you can now proceed with building and running the producer application to publish records on MSK topics from the EC2 instance, as shown in the following code. Detailed instructions including supported arguments and their usage are outlined in the README.md page in the GitHub repository.

$ cd amazon_msk_producer 
$ mvn clean package 
$ BROKERS={OUTPUT_VAL_OF_MSKBootstrapServers – Ref. Step 6}
$ REGISTRY_NAME={VAL_OF_GlueSchemaRegistryName - Ref. Step 6}
$ SCHEMA_NAME={VAL_OF_SchemaName– Ref. Step 6}
$ TOPIC_NAME="test"
$ SECRET_ARN={OUTPUT_VAL_OF_SecretArn – Ref. Step 3}
$ java -jar target/amazon_msk_producer-1.0-SNAPSHOT-jar-with-dependencies.jar -brokers $BROKERS -secretArn $SECRET_ARN -region us-east-1 -registryName $REGISTRY_NAME -schema $SCHEMA_NAME -topic $TOPIC_NAME -numRecords 10

If the records are successfully ingested into the Kafka topics, you may see a log similar to the following screenshot.

kafka log

Grant permissions

Confirm if your AWS Glue Data Catalog is being managed by AWS Lake Formation and grant necessary permissions. To check if Lake Formation is managing the permissions for the newly created tables, we can navigate to the Settings page on the Lake Formation console, or we can use the Lake Formation CLI command get-data-lake-settings.

If the check boxes on the Lake Formation Data Catalog settings page are unselected (see the following screenshot), that means that the Data Catalog permissions are being managed by Lake Formation.

Lakeformation status

If using the Lake Formation CLI, check if the values of CreateDatabaseDefaultPermissions and CreateTableDefaultPermissions are NULL in the output. If so, this confirms that the Data Catalog permissions are being managed by AWS Lake Formation.

If we can confirm that the Data Catalog permissions are being managed by AWS Lake Formation, we have to grant DESCRIBE and CREATE TABLE permissions for the database, and SELECT, ALTER, DESCRIBE, and INSERT permissions for the table, to the AWS Identity and Access Management (IAM) role used by the AWS Glue streaming ETL job before starting the job. Similarly, we have to grant DESCRIBE permissions for the database and DESCRIBE and SELECT permissions for the table to the IAM principals using Amazon Athena to query the data. We can get the AWS Glue service IAM role, database, table, streaming job name, and crawler names from the Outputs tab of the CloudFormation stack amazon-msk-and-glue. For instructions on granting permissions via AWS Lake Formation, refer to Granting Data Catalog permissions using the named resource method.
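
As an alternative to the console, the following boto3 sketch shows what the grants for the AWS Glue streaming job role could look like; the role ARN, database, and table names are placeholder assumptions, so substitute the values from the stack outputs.

import boto3

lakeformation = boto3.client("lakeformation", region_name="us-east-1")

glue_job_role_arn = "arn:aws:iam::123456789012:role/GlueStreamingJobRole"  # assumption

# Database-level permissions for the streaming job.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": glue_job_role_arn},
    Resource={"Database": {"Name": "test_glue_database"}},
    Permissions=["DESCRIBE", "CREATE_TABLE"],
)

# Table-level permissions for the streaming job.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": glue_job_role_arn},
    Resource={"Table": {"DatabaseName": "test_glue_database", "Name": "output"}},
    Permissions=["SELECT", "ALTER", "DESCRIBE", "INSERT"],
)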

Run the AWS Glue streaming job

To process the data from the MSK topic, complete the following steps:

  1. Retrieve the name of the AWS Glue streaming job from the amazon-msk-and-glue stack output.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose the job name to open its details page.
  4. Choose Run job to start the job.

Because this is a streaming job, it will continue to run indefinitely until manually stopped.

Run the AWS Glue crawler

Once the AWS Glue streaming job starts processing the data, you can use the following steps to check the processed data and create a table with the AWS Glue crawler in order to query it:

  1. Retrieve the name of the output bucket S3BucketForOutput from the stack output and validate if output folder has been created and contains data.
  2. Retrieve the name of the Crawler from the stack output.
  3. Navigate to the AWS Glue Console.
  4. In the left pane, select Crawlers.
  5. Run the crawler.

In this post, we run the crawler one time to create the target table for demo purposes. In a typical scenario, you would run the crawler periodically or create or manage the target table another way. For example, you could use the saveAsTable() method in Spark to create the table as part of the ETL job itself, or you could use enableUpdateCatalog=True in the AWS Glue ETL job to enable Data Catalog updates. For more information about this AWS Glue ETL feature, refer to Creating tables, updating the schema, and adding new partitions in the Data Catalog from AWS Glue ETL jobs.
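
As a brief illustration of that last option, the following sketch writes a DynamicFrame through a Glue sink with enableUpdateCatalog turned on; the bucket, database, and table names are placeholder assumptions, and the stand-in DynamicFrame exists only to keep the example self-contained.

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Stand-in DynamicFrame; in the real job this would be the processed streaming data.
df = spark.createDataFrame([("1", 42.0)], ["id", "value"])
dyf = DynamicFrame.fromDF(df, glue_context, "dyf")

# Write to S3 and create or update the Data Catalog table in the same step.
sink = glue_context.getSink(
    connection_type="s3",
    path="s3://example-output-bucket/processed/",  # assumption
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
)
sink.setCatalogInfo(
    catalogDatabase="test_glue_database",  # assumption
    catalogTableName="output",             # assumption
)
sink.setFormat("glueparquet")
sink.writeFrame(dyf)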

Validate the data in Athena

After the AWS Glue crawler has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the crawler created.
  4. Enter a SQL query to validate the data.
  5. Run the query.

The following screenshot shows the output of our example query.

Athena output

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack amazon-msk-and-glue.
  2. Delete the CloudFormation stack vpc-subnet-and-mskclient.

Conclusion

This post provided a solution for building a robust streaming data processing platform using a combination of Amazon MSK, the AWS Glue Schema Registry, an AWS Glue streaming job, and Amazon S3. By following the steps outlined in this post, you can create and control your schema in the Schema Registry, integrate it with a data producer to ingest data into an MSK cluster, set up an AWS Glue streaming job to extract and process data from the cluster using the Schema Registry, store processed data in Amazon S3, and query it using Athena.

Let’s start using the AWS Glue Schema Registry to manage schema evolution for streaming data ETL with AWS Glue. If you have any feedback related to this post, please feel free to leave it in the comments section below.

Appendix

This appendix provides more information about the Apache Spark Structured Streaming checkpointing feature and a brief summary of how schema evolution can be handled using the AWS Glue Schema Registry.

Checkpointing

Checkpointing is a mechanism in Spark streaming applications to persist enough information in a durable storage to make the application resilient and fault-tolerant. The items stored in checkpoint locations are mainly the metadata for application configurations and the state of processed offsets. Spark uses synchronous checkpointing, meaning it ensures that the checkpoint state is updated after every micro-batch run. It stores the end offset value of each partition under the offsets folder for the corresponding micro-batch run before processing, and logs the record of processed batches under the commits folder. In the event of a restart, the application can recover from the last successful checkpoint, provided the offset hasn’t expired in the source Kafka topic. If the offset has expired, we have to set the property failOnDataLoss to false so that the streaming query doesn’t fail as a result of this.
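
To make the checkpointing options concrete, the following is a small plain Spark Structured Streaming sketch; the broker, topic, and S3 paths are placeholder assumptions, and the AWS Glue job in this post passes the equivalent settings through forEachBatch and the Kafka connection.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpointing-example").getOrCreate()

# failOnDataLoss=false keeps the query alive if previously committed offsets have expired.
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker-1.example.com:9092")  # assumption
    .option("subscribe", "test")                                     # assumption: topic name
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# The checkpoint location stores offsets and commit logs so a restart resumes where it left off.
query = (
    stream.writeStream.format("parquet")
    .option("path", "s3://example-output-bucket/data/")                      # assumption
    .option("checkpointLocation", "s3://example-output-bucket/checkpoint/")  # assumption
    .start()
)
query.awaitTermination()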

Schema evolution

As the schema of data evolves over time, it needs to be incorporated into producer and consumer applications to avert application failure due to data encoding issues. The AWS Glue Schema Registry offers a rich set of options for schema compatibility such as backward, forward, and full to update the schema in the Schema Registry. Refer to Schema versioning and compatibility for the full list.

The default option is backward compatibility, which satisfies the majority of use cases. This option allows you to delete any existing fields and add optional fields. Steps to implement schema evolution using the default compatibility are as follows:

  1. Register the new schema version to update the schema definition in the Schema Registry (a minimal registration sketch follows these steps).
  2. Upon success, update the AWS Glue Data Catalog table using the updated schema.
  3. Restart the AWS Glue streaming job to incorporate the changes in the schema for data processing.
  4. Update the producer application code base to build and publish the records using the new schema, and restart it.
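
For step 1, a minimal boto3 sketch of registering a new, backward-compatible schema version might look like the following; the registry and schema names match the sample CloudFormation parameters, and the Avro definition is a hypothetical example.

import json

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Hypothetical Avro schema with a new optional field (a backward-compatible change).
new_schema = {
    "type": "record",
    "name": "payload",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "value", "type": "double"},
        {"name": "unit", "type": ["null", "string"], "default": None},  # new optional field
    ],
}

response = glue.register_schema_version(
    SchemaId={
        "RegistryName": "test-schema-registry",
        "SchemaName": "test_payload_schema",
    },
    SchemaDefinition=json.dumps(new_schema),
)
print(response["VersionNumber"], response["Status"])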

About the Authors

Author Headshot - Vivekanand TiwariVivekanand Tiwari is a Cloud Architect at AWS. He finds joy in assisting customers on their cloud journey, especially in designing and building scalable, secure, and optimized data and analytics workloads on AWS. During his leisure time, he prioritizes spending time with his family.

Author Headshot - Subramanya VajirayaSubramanya Vajiraya is a Sr. Cloud Engineer (ETL) at AWS Sydney specialized in AWS Glue. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. Outside of work, he enjoys going on bike rides and taking long walks with his dog Ollie, a 2-year-old Corgi.

Author Headshot - Akash DeepAkash Deep is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS. In his free time, he prioritizes spending quality time with his family.

How Cargotec uses metadata replication to enable cross-account data sharing

Post Syndicated from Sumesh M R original https://aws.amazon.com/blogs/big-data/how-cargotec-uses-metadata-replication-to-enable-cross-account-data-sharing/

This is a guest blog post co-written with Sumesh M R from Cargotec and Tero Karttunen from Knowit Finland.

Cargotec (Nasdaq Helsinki: CGCBV) is a Finnish company that specializes in cargo handling solutions and services. The company is headquartered in Helsinki, Finland, and operates globally in over 100 countries. With its leading cargo handling solutions and services, Cargotec is a pioneer in its field. Through its unique position in ports, at sea, and on roads, the company optimizes global cargo flows and creates sustainable customer value.

Cargotec captures terabytes of IoT telemetry data from their machinery operated by numerous customers across the globe. This data needs to be ingested into a data lake, transformed, and made available for analytics, machine learning (ML), and visualization. For this, Cargotec built an Amazon Simple Storage Service (Amazon S3) data lake and cataloged the data assets in AWS Glue Data Catalog. They chose AWS Glue as their preferred data integration tool due to its serverless nature, low maintenance, ability to control compute resources in advance, and scale when needed.

In this blog, we discuss the technical challenges faced by Cargotec in replicating their AWS Glue metadata across AWS accounts, and how they navigated these challenges successfully to enable cross-account data sharing.  By sharing their story, we hope to inspire readers facing similar challenges and provide insights into how our services can be customized to meet your specific needs.

Challenges

Like many customers, Cargotec’s data lake is distributed across multiple AWS accounts that are owned by different teams. Cargotec wanted to find a solution to share datasets across accounts and use Amazon Athena to query them. To share the datasets, they needed a way to share access to the data and access to catalog metadata in the form of tables and views. Cargotec’s use cases also required them to create views that span tables and views across catalogs. Cargotec’s implementation covers three discrete AWS accounts, 25 databases, 150 tables, and 10 views.

Solution overview

Cargotec required a single catalog per account that contained metadata from their other AWS accounts. The solution that best fit their needs was to replicate metadata using an in-house version of a publicly available utility called Metastore Migration utility. Cargotec extended the utility by changing the overall orchestration layer by adding an Amazon SQS notification and an AWS Lambda. The approach was to programmatically copy and make available each catalog entity (databases, tables, and views) to all consumer accounts. This makes the tables or views local to the account where the query is being run, while the data still remains in its source S3 bucket.

Cargotec’s solution architecture

The following diagram summarizes the architecture and overall flow of events in Cargotec’s design.

Solution Architecture

Catalog entries from a source account are programmatically replicated to multiple target accounts using the following series of steps.

  1. An AWS Glue job (metadata exporter) runs daily on the source account. It reads the table and partition information from the source AWS Glue Data Catalog. Since the target account is used for analytical purposes and does not require real-time schema changes, the metadata exporter runs only once a day. Cargotec uses partition projection, which ensures that the new partitions are available in real-time.
  2. The job then writes the metadata to an S3 bucket in the same account. Please note that the solution doesn’t involve movement of the data across accounts. The target accounts read data from the source account S3 buckets. For guidance on setting up the right permissions, please see the Amazon Athena User Guide.
  3. After the metadata export has been completed, the AWS Glue job pushes a notification to an Amazon Simple Notification Service (Amazon SNS) topic. This message contains the S3 path to the latest metadata export. The SNS notification is Cargotec’s customization to the existing open-source utility.
  4. Every target account runs an AWS Lambda function that is notified when the source account SNS topic receives a push. In short, there are multiple subscriber Lambda functions (one per target account) for the source account SNS topics that get triggered when an export job is completed.
  5. Once triggered, the Lambda function then initiates an AWS Glue job (metadata importer) on the respective target account. The job receives as input the source account’s S3 path to the metadata that has been recently exported.
  6. Based on the path provided, the metadata importer reads the exported metadata from the source S3 bucket.
  7. The metadata importer now uses this information to create or update the corresponding catalog information in the target account.

All along the way, any errors are published to a separate SNS topic for logging and monitoring purposes. With this approach, Cargotec was able to create and consume views that span tables and views from multiple catalogs spread across different AWS accounts.

Implementation

The core of the catalog replication utility is two AWS Glue scripts:

  • Metadata exporter – An AWS Glue job that reads the source data catalog and creates an export of the databases, tables, and partitions in an S3 bucket in the source account.
  • Metadata importer – An AWS Glue job that reads the export that was created by the metadata exporter and applies the metadata to target databases. This code is triggered by a Lambda function once files are written to S3. The job runs in the target account.

Metadata exporter

This section provides details on the AWS Glue job that exports the AWS Glue Data Catalog into an S3 location. The source code for the application is hosted in the AWS Glue GitHub repository. Though this may need to be customized to suit your needs, we will go over the core components of the code in this blog.

Metadata exporter inputs

The application takes a few job input parameters as described below:

  • --mode key accepts either to-s3 or to-jdbc. The latter is used when the code is moving the metadata directly into a JDBC Hive Metastore. In the case of Cargotec, since we are moving the metadata to files on S3, the value for --mode will remain to-s3.
  • --output-path accepts an S3 location to which the exported metadata should be written. The code creates subdirectories corresponding to databases, tables, and partitions.
  • --database-names accepts a semicolon-separated list of databases on the source catalog that need to be replicated to the target.

Reading the catalog

The metadata about the database, tables, and partitions are read from the AWS Glue catalog.

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='com.amazonaws.services.glue.connections.DataCatalogConnection',
    connection_options={
        'catalog.name': 'datacatalog',
        'catalog.database': database,
        'catalog.region': region
    })

The preceding code snippet reads the metadata into an AWS Glue DynamicFrame. The frame is then converted to a Spark DataFrame and filtered into individual DataFrames for databases, tables, and partitions. A schema is attached to each DataFrame using one of the following definitions:

DATACATALOG_DATABASE_SCHEMA = StructType([
    StructField('items', ArrayType(
        DATACATALOG_DATABASE_ITEM_SCHEMA, False),
        True),
    StructField('type', StringType(), False)
])
DATACATALOG_TABLE_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('type', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_TABLE_ITEM_SCHEMA, False), True)
])
DATACATALOG_PARTITION_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('table', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_PARTITION_ITEM_SCHEMA, False), True),
    StructField('type', StringType(), False)
])

For details on the individual item schema, refer to the schema definition on GitHub.

Persisting the metadata

After the metadata is converted to DataFrames with schemas attached, it is persisted to the S3 location specified by the --output-path parameter:

databases.write.format('json').mode('overwrite').save(output_path + 'databases')
tables.write.format('json').mode('overwrite').save(output_path + 'tables')
partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')

Exploring the output

Navigate to the S3 bucket that contains the output location, and you should be able to see the output metadata in JSON format. An example export for a table would look like the following code snippet.

{
    "database": "default",
    "type": "table",
    "item": {
        "createTime": "1651241372000",
        "lastAccessTime": "0",
        "owner": "spark",
        "retention": 0,
        "name": "an_example_table",
        "tableType": "EXTERNAL_TABLE",
        "parameters": {
            "totalSize": "2734148",
            "EXTERNAL": "TRUE",
            "last_commit_time_sync": "20220429140907",
            "spark.sql.sources.schema.part.0": "{redacted_schema}",
            "numFiles": "1",
            "transient_lastDdlTime": "1651241371",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "hudi"
        },
        "partitionKeys": [],
        "storageDescriptor": {
            "inputFormat": "org.apache.hudi.hadoop.HoodieParquetInputFormat",
            "compressed": false,
            "storedAsSubDirectories": false,
            "location": "s3://redacted_bucket_name/table/an_example_table",
            "numberOfBuckets": -1,
            "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "bucketColumns": [],
            "columns": [{
                    "name": "_hoodie_commit_time",
                    "type": "string"
                },
                {
                    "name": "_hoodie_commit_seqno",
                    "type": "string"
                }
            ],
            "parameters": {},
            "serdeInfo": {
                "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                "parameters": {
                    "hoodie.query.as.ro.table": "false",
                    "path": "s3://redacted_bucket_name/table/an_example_table",
                    "serialization.format": "1"
                }
            },
            "skewedInfo": {
                "skewedColumnNames": [],
                "skewedColumnValueLocationMaps": {},
                "skewedColumnValues": []
            },
            "sortColumns": []
        }
    }
}

Once the export job is complete, the output S3 path will be pushed to an SNS topic. A Lambda function at the target account processes this message and invokes the import AWS Glue job by passing the S3 import location.
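
A minimal sketch of such a subscriber Lambda function is shown below. The SNS message format, key name, and importer job name are assumptions for illustration; Cargotec's actual function may differ.

import json

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # The SNS message carries the S3 location of the latest metadata export.
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    export_path = message["export_path"]  # assumption: key name published by the exporter

    # Start the metadata importer job in this (target) account.
    response = glue.start_job_run(
        JobName="metadata-importer",  # assumption: importer job name
        Arguments={
            "--mode": "from-s3",
            "--database-input-path": export_path + "databases/",
            "--table-input-path": export_path + "tables/",
            "--partition-input-path": export_path + "partitions/",
        },
    )
    return {"JobRunId": response["JobRunId"]}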

Metadata importer

The import job runs on the target account. The code for the job is available on GitHub. As with the exporter, you may need to customize it to suit your specific requirements, but the code as-is should work for most scenarios.

Metadata importer inputs

The inputs to the application are provided as job parameters. Below is a list of parameters that are used for the import process:

  • --mode key accepts either from-s3 or from-jdbc. The latter is used when migration is from a JDBC source to the AWS Glue Data Catalog. At Cargotec, the metadata is already written to Amazon S3, and hence the value for this key is always set to from-s3.
  • --region key accepts a valid AWS Region for the AWS Glue Catalog. The target Region is specified using this key.
  • --database-input-path key accepts the path to the file containing the database metadata. This is the output of the metadata exporter job.
  • --table-input-path key accepts the path to the file containing the table metadata. This is the output of the metadata exporter job.
  • --partition-input-path key accepts the path to the file containing the partition metadata. This is the output of the metadata exporter job.

Reading the metadata

The metadata, as previously discussed, are files on Amazon S3. They are read into individual Spark DataFrames with their respective schema information:

databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)

Loading the catalog

Once the Spark DataFrames are read, they are converted to AWS Glue DynamicFrames and then loaded into the catalog, as shown in the following snippet.

glue_context.write_dynamic_frame.from_options(
        frame=dyf_databases, 
        connection_type='catalog',
        connection_options={
               'catalog.name': datacatalog_name, 
               'catalog.region': region
         }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_tables, 
        connection_type='catalog',
        connection_options={
                'catalog.name': datacatalog_name, 
                'catalog.region': region
        }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_partitions, 
        connection_type='catalog',
        connection_options={
                 'catalog.name': datacatalog_name, 
                 'catalog.region': region
         }
)

Once the job concludes, you can query the target AWS Glue catalog to ensure the tables from the source have been synced with the destination. To keep things simple and easy to manage, instead of implementing a mechanism to identify tables that change over time, Cargotec updates the catalog information of all databases or tables that are configured in the export job.

Considerations

Though the setup works effectively for Cargotec’s current business requirements, there are a few drawbacks to this approach, which are highlighted below:

  1. The solution involves code. Customizations were made to the existing open-source utility to be able to publish an SNS notification once an export is complete and a Lambda function to trigger the import process.
  2. The export process on the source account is a scheduled job. Hence there is no real-time sync between the source and target accounts. This was not a requirement for Cargotec’s business process.
  3. For tables that don’t use Athena partition projection, query results may be outdated until the new partitions are added to the metastore through MSCK REPAIR TABLE, ALTER TABLE ADD PARTITION, AWS Glue crawler, and so on.
  4. The current approach requires syncing all the tables across the source and target. If the requirement is to capture only the ones that changed instead of a scheduled daily export, the design needs to change and could benefit from the Amazon EventBridge integration with AWS Glue. An example implementation of using AWS Glue APIs to identify changes is shown in Identify source schema changes using AWS Glue.

Conclusion

In this blog post, we have explored a solution for cross-account sharing of data and tables that makes it possible for Cargotec to create views that combine data from multiple AWS accounts. We’re excited to share Cargotec’s success and believe the post has provided you with valuable insights and inspiration for your own projects.

We encourage you to explore our range of services and see how they can help you achieve your goals. Lastly, for more data and analytics blogs, feel free to bookmark the AWS Blogs.


About the Authors

Sumesh M R is a Full Stack Machine Learning Architect at Cargotec. He has several years of software engineering and ML experience. Sumesh is an expert in Amazon SageMaker and other AWS ML and analytics services. He is passionate about data science and loves to explore the latest ML libraries and techniques. Before joining Cargotec, he worked as a Solution Architect at TCS. In his spare time, he loves to play cricket and badminton.

 Tero Karttunen is a Senior Cloud Architect at Knowit Finland. He advises clients on architecting and adopting Data Architectures that best serve their Data Analytics and Machine Learning needs. He has helped Cargotec in their data journey for more than two years. Outside of work, he enjoys running, winter sports, and role-playing games.

Arun A K is a Big Data Specialist Solutions Architect at AWS.  He works with customers to provide architectural guidance for running analytics solutions on AWS Glue, AWS Lake Formation, Amazon Athena, and Amazon EMR. In his free time, he likes to spend time with his friends and family.

AWS Glue Data Quality is Generally Available

Post Syndicated from Shiv Narayanan original https://aws.amazon.com/blogs/big-data/aws-glue-data-quality-is-generally-available/

We are excited to announce the General Availability of AWS Glue Data Quality.

Our journey started by working backward from our customers who create, manage, and operate data lakes and data warehouses for analytics and machine learning. To make confident business decisions, the underlying data needs to be accurate and recent. Otherwise, data consumers lose trust in the data and make suboptimal or incorrect decisions. For example, medical researchers found that across 79,000 emergency department encounters of pediatric patients in a hospital, incorrect or missing patient weight measurements led to medication dosing errors in 34% of cases. A data quality check to identify missing patient weight measurements or a check to ensure patients’ weights are trending within certain thresholds would have alerted respective teams to identify these discrepancies.

For our customers, setting up these data quality checks is manual, time consuming, and error prone. It takes days for data engineers to identify and implement data quality rules. They have to gather detailed data statistics, such as minimums, maximums, averages, and correlations. They have to then review the data statistics to identify data quality rules, and write code to implement these checks in their data pipelines. Data engineers must then write code to monitor data pipelines, visualize quality scores, and alert them when anomalies occur. They have to repeat these processes across thousands of datasets and the hundreds of data pipelines populating them. Some customers adopt commercial data quality solutions; however, these solutions require time-consuming infrastructure management and are expensive. Our customers needed a simple, cost-effective, and automatic way to manage data quality.

In this post, we discuss the capabilities and features of AWS Glue Data Quality.

Capabilities of AWS Glue Data Quality

AWS Glue Data Quality accelerates your data quality journey with the following key capabilities:

  • Serverless – AWS Glue Data Quality is a feature of AWS Glue, which eliminates the need for infrastructure management, patching, and maintenance.
  • Reduced manual efforts with recommending data quality rules and out-of-the-box rules – AWS Glue Data Quality computes data statistics such as minimums, maximums, histograms, and correlations for datasets. It then uses these statistics to automatically recommend data quality rules that check for data freshness, accuracy, and integrity. This reduces manual data analysis and rule identification efforts from days to hours. You can then augment recommendations with out-of-the-box data quality rules. The following table lists the rules that are supported by AWS Glue Data Quality as of writing. For an up-to-date list, refer to Data Quality Definition Language (DQDL).
Rule Type – Description
AggregateMatch – Checks if two datasets match by comparing summary metrics like total sales amount. Useful for customers to compare if all data is ingested from source systems.
ColumnCorrelation – Checks how well two columns are correlated.
ColumnCount – Checks if any columns are dropped.
ColumnDataType – Checks if a column is compliant with a data type.
ColumnExists – Checks if columns exist in a dataset. This allows customers building self-service data platforms to ensure certain columns are made available.
ColumnLength – Checks if the length of data is consistent.
ColumnNamesMatchPattern – Checks if column names match defined patterns. Useful for governance teams to enforce column name consistency.
ColumnValues – Checks if data is consistent per defined values. This rule supports regular expressions.
Completeness – Checks for any blank or NULL values in data.
CustomSql – Lets customers implement almost any type of data quality check in SQL.
DataFreshness – Checks if data is fresh.
DatasetMatch – Compares two datasets and identifies if they are in sync.
DistinctValuesCount – Checks for duplicate values.
Entropy – Checks the entropy of the data.
IsComplete – Checks if 100% of the data is complete.
IsPrimaryKey – Checks if a column is a primary key (not NULL and unique).
IsUnique – Checks if 100% of the data is unique.
Mean – Checks if the mean matches the set threshold.
ReferentialIntegrity – Checks if two datasets have referential integrity.
RowCount – Checks if record counts match a threshold.
RowCountMatch – Checks if record counts between two datasets match.
StandardDeviation – Checks if the standard deviation matches the threshold.
SchemaMatch – Checks if the schemas of two datasets match.
Sum – Checks if the sum matches a set threshold.
Uniqueness – Checks if the uniqueness of a dataset matches a threshold.
UniqueValueRatio – Checks if the unique value ratio matches a threshold.
  • Embedded in customer workflow – AWS Glue Data Quality has to blend into customer workflows for it to be useful. Disjointed experiences create friction in getting started. You can access AWS Glue Data Quality from the AWS Glue Data Catalog, allowing data stewards to set up rules while they are using the Data Catalog. You can also access AWS Glue Data Quality from Glue Studio (AWS Glue’s visual authoring tool), Glue Studio notebooks (a notebook-based interface for coders to create data integration pipelines), and interactive sessions, an API where data engineers can submit jobs from their choice of code editor.
  • Pay-as-you-go and cost-effective – AWS Glue Data Quality is charged based on the compute used. This simple pricing model doesn’t lock you into annual licenses. AWS Glue ETL-based data quality checks can use Flex execution, which is 34% cheaper for non-SLA sensitive data quality checks. Additionally, AWS Glue Data Quality rules on data pipelines can help you save costs because you don’t have to waste compute resources on bad quality data when detected early. Also, when data quality checks are configured as part of data pipelines, you only incur an incremental cost because the data is already read and mostly in memory.
  • Built on open-source – AWS Glue Data Quality is built on Deequ, an open-source library used internally by Amazon to manage the quality of data lakes exceeding 60 PB. Deequ is optimized to run data quality rules in minimal passes over the data, which makes it efficient. The rules that are authored in AWS Glue Data Quality can be run in any environment that can run Deequ, allowing you to stay with an open-source solution.
  • Simplified rule authoring language – As part of AWS Glue Data Quality, we announced Data Quality Definition Language (DQDL). DQDL attempts to standardize data quality rules so that you can use the same rules across different databases and engines. DQDL is simple to author and read, and brings the benefits of code that developers value, such as version control and deployment. To demonstrate the simplicity of this language, the following example shows three rules that check that the record count is greater than 10, that VendorID doesn't have any empty values, and that VendorID falls within a defined set of values:
    Rules = [
        RowCount > 10,
        IsComplete "VendorID",
        ColumnValues "VendorID" in ["1", "2", "3", "4"]
    ]

General Availability features

AWS Glue Data Quality has several key enhancements from the preview version:

  • Error record identification – You need to know which records failed data quality checks. We have launched this capability in AWS Glue ETL, where the data quality transform now enriches the input dataset with new columns that identify which records failed data quality checks. This can help you quarantine bad data so that only good records flow into your data repositories. A sketch of this pattern appears after this list.
  • New rule types that validate data across multiple datasets – With new rule types like ReferentialIntegrity, DatasetMatch, RowCountMatch, and AggregateMatch, you can compare two datasets to ensure that data integrity is maintained. The SchemaMatch rule type ensures that the dataset accurately matches a set schema, preventing downstream errors that may be caused by schema changes.
  • Amazon EventBridge integration – Integration with Amazon EventBridge enables you to simplify how you set up alerts when quality rules fail. A one-time setup is now sufficient to alert data consumers about data quality failures.
  • AWS CloudFormation support – With support for AWS CloudFormation, AWS Glue Data Quality now enables you to easily deploy data quality rules in many environments.
  • Join support in CustomSQL rule type – You can now join datasets in CustomSQL rule types to write complex business rules.
  • New data source support – You can check data quality on open transactional formats such as Apache HUDI, Apache Iceberg, and Delta Lake. Additionally, you can set up data quality rules on Amazon Redshift and Amazon Relational Database Service (Amazon RDS) data sources cataloged in the AWS Glue Data Catalog.
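
To illustrate the error record identification capability, the following is a minimal PySpark sketch of an AWS Glue ETL job that evaluates a ruleset and quarantines failing rows. The database, table, bucket, and context names are placeholders, the transform call mirrors the code that AWS Glue Studio generates, and the output column names (such as DataQualityEvaluationResult) should be verified against your job's output.

from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the input dataset (placeholder Data Catalog database and table)
source = glue_context.create_dynamic_frame.from_catalog(
    database="example_database", table_name="example_table"
)

ruleset = """
Rules = [
    RowCount > 10,
    IsComplete "VendorID"
]
"""

# Evaluate the ruleset; the transform returns rule-level and row-level outcomes
dq_results = EvaluateDataQuality().process_rows(
    frame=source,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "EvaluateDataQuality_node",
        "enableDataQualityResultsPublishing": True,
    },
)

# Row-level outcomes carry new columns that identify which records failed
row_outcomes = SelectFromCollection.apply(dfc=dq_results, key="rowLevelOutcomes").toDF()
failed_rows = row_outcomes.filter(row_outcomes["DataQualityEvaluationResult"] == "Failed")

# Quarantine the failing records (placeholder bucket) so only good data flows downstream
failed_rows.write.mode("overwrite").parquet("s3://example-bucket/quarantine/")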

Summary

AWS Glue Data Quality is now generally available. To help you get started, we have created a five-part blog series:

Get started today with AWS Glue Data Quality and tell us what you think.


About the authors

Shiv Narayanan is a Technical Product Manager for AWS Glue’s data management capabilities like data quality, sensitive data detection and streaming capabilities. Shiv has over 20 years of data management experience in consulting, business development and product management.

Tome Tanasovski is a Technical Manager at AWS, for a team that manages capabilities into Amazon’s big data platforms via AWS Glue. Prior to working at AWS, Tome was an executive for a market-leading global financial services firm in New York City where he helped run the Firm’s Artificial Intelligence & Machine Learning Center of Excellence. Prior to this role he spent nine years in the Firm focusing on automation, cloud, and distributed computing. Tome has a quarter-of-a-century worth of experience in technology in the Tri-state area across a wide variety of industries including big tech, finance, insurance, and media.

Brian Ross is a Senior Software Development Manager at AWS.  He has spent 24 years building software at scale and currently focuses on serverless data integration with AWS Glue. In his spare time, he studies ancient texts, cooks modern dishes and tries to get his kids to do both.

Alona Nadler is AWS Glue Head of Product and is responsible for AWS Glue Service. She has a long history of working in the enterprise software and data services spaces. When not working, Alona enjoys traveling and playing tennis.

Visualize data quality scores and metrics generated by AWS Glue Data Quality

Post Syndicated from Zack Zhou original https://aws.amazon.com/blogs/big-data/visualize-data-quality-scores-and-metrics-generated-by-aws-glue-data-quality/

AWS Glue Data Quality allows you to measure and monitor the quality of data in your data repositories. It’s important for business users to be able to see quality scores and metrics to make confident business decisions and debug data quality issues. AWS Glue Data Quality generates a substantial amount of operational runtime information during the evaluation of rulesets.

An operational scorecard is a mechanism used to evaluate and measure the quality of data processed and validated by AWS Glue Data Quality rulesets. It provides insights and metrics related to the performance and effectiveness of data quality processes.

In this post, we highlight the seamless integration of Amazon Athena and Amazon QuickSight, which enables the visualization of operational metrics for AWS Glue Data Quality rule evaluation in an efficient and effective manner.

This post is Part 5 of a five-post series to explain how to build dashboards to measure and monitor your data quality:

Solution overview

The solution allows you to build your AWS Glue Data Quality score and metrics dashboard using QuickSight in an easy and straightforward manner. The following architecture diagram shows an overview of the complete pipeline.

There are six main steps in the data pipeline:

  1. Amazon EventBridge triggers an AWS Lambda function when the event pattern for AWS Glue Data Quality matches the defined rule. (Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality)
  2. The Lambda function writes the AWS Glue Data Quality result to an Amazon Simple Storage Service (Amazon S3) bucket.
  3. An AWS Glue crawler crawls the results.
  4. The crawler builds a Data Catalog, so the data can be queried using Athena.
  5. We can analyze the data quality score and metrics using Athena SQL queries.
  6. We can query and submit the Athena data to QuickSight to create visuals for the dashboard.

In the following sections, we discuss these steps in more detail.

Prerequisites

To follow along with this post, complete the following prerequisites:

  1. Have an AWS Identity and Access Management (IAM) role with permissions to extract data from an S3 bucket and write to the AWS Glue Data Catalog.
  2. Similarly, have a Lambda function execution role with access to AWS Glue and  S3 buckets.
  3. Set up the Athena query result location. For more information, refer to Working with Query Results, Output Files, and Query History.
  4. Set up QuickSight permissions and enable Athena table and S3 bucket access.

Set up and deploy the Lambda pipeline

To test the solution, we can use the following AWS CloudFormation template. The CloudFormation template creates the EventBridge rule, Lambda function, and S3 bucket to store the data quality results.

If you deployed the CloudFormation template in the previous post, you don’t need to deploy it again in this step.

The following screenshot shows a line of code in which the Lambda function writes the results from AWS Glue Data Quality to an S3 bucket. As depicted, the data will be stored in JSON format and organized according to the time horizon, facilitating convenient access and analysis of the data over time.
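
As a rough, hypothetical sketch of such a handler, the following code writes the data quality result to Amazon S3 under a year/month/day key layout; the bucket name, key prefix, and event field handling are assumptions based on the description above, not the exact code deployed by the CloudFormation template.

import json
import os
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # The EventBridge event detail carries the AWS Glue Data Quality result payload
    detail = event.get("detail", {})

    # Organize the JSON output by year/month/day so it can be analyzed over time
    now = datetime.now(timezone.utc)
    bucket = os.environ.get("RESULTS_BUCKET", "example-data-quality-results")  # placeholder
    key = (
        f"gluedataqualitylogs/year={now:%Y}/month={now:%m}/day={now:%d}/"
        f"{context.aws_request_id}.json"
    )

    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(detail).encode("utf-8"))
    return {"statusCode": 200, "s3Key": key}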

Set up the AWS Glue Data Catalog using a crawler

Complete the following steps to create an AWS Glue crawler and set up the Data Catalog:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. For Name, enter data-quality-result-crawler, then choose Next.
  4. Under Data sources, choose Add a data source.
  5. For Data source, choose S3.
  6. For S3 path, enter the S3 path to your data source. (s3://<AWS CloudFormation outputs key:DataQualityS3BucketNameOutputs>/gluedataqualitylogs/). Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality for details.
  7. Choose Add an S3 data source and choose Next.
  8. For Existing IAM role, choose your IAM role (GlueDataQualityLaunchBlogDemoRole-xxxx). Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality for details. Then choose Next.
  9. For Target database, choose Add database.
  10. For Database name, enter data-quality-result-database, then choose Create.
  11. For Table name prefix, enter dq_, then choose Next.
  12. Choose Create crawler.
  13. On the Crawlers page, select data-quality-result-crawler and choose Run.

When the crawler is complete, you can see the AWS Glue Data Catalog table definition.

After you create the table definition on the AWS Glue Data Catalog, you can use Athena to query the Data Catalog table.

Query the Data Catalog table using Athena

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 and the AWS Glue Data Catalog using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale.

The purpose of this step is to understand our data quality statistics at the table level as well as at the ruleset level. Athena provides simple queries to assist you with this task. Use the queries in this section to analyze your data quality metrics and create an Athena view to use to build a QuickSight dashboard in the next step.

Query 1

The following is a simple SELECT query on the Data Catalog table:

SELECT * FROM "data-quality-result-database"."dq_gluedataqualitylogs" limit 10;

The following screenshot shows the output.

Before we run the second query, let’s check the schema for the table dq_gluedataqualitylogs.

The following screenshot shows the output.

The table shows that one of the columns, resultrun, is the array data type. In order to work with this column in QuickSight, we need to perform an additional step to flatten it into multiple rows. This is necessary because QuickSight doesn't support the array data type.

Query 2

Use the following query to review the data in the resultrun column:

SELECT resultrun FROM "data-quality-result-database"."dq_gluedataqualitylogs" limit 10;

The following screenshot shows the output.

Query 3

The following query flattens an array into multiple rows using CROSS JOIN in conjunction with the unnest operator and creates a view on the selected columns:

CREATE OR REPLACE VIEW data_quality_result_view AS
SELECT "databasename", "tablename",
    "ruleset_name", "runid", "resultid",
    "state", "numrulessucceeded",
    "numrulesfailed", "numrulesskipped",
    "score", "year", "month", "day",
    runs.name, runs.result,
    runs.evaluationmessage, runs.description
FROM "dq_gluedataqualitylogs"
CROSS JOIN UNNEST(resultrun) AS t(runs)

The following screenshot shows the output.

Verify the columns that were created using the unnest operator.

The following screenshot shows the output.

Query 4

Verify the Athena view created in the previous query:

SELECT * FROM data_quality_result_view LIMIT 10

The following screenshot shows the output.

Visualize the data with QuickSight

Now that you can query your data in Athena, you can use QuickSight to visualize the results. Complete the following steps:

  1. Sign in to the QuickSight console.
  2. In the upper right corner of the console, choose Admin/username, then choose Manage QuickSight.
  3. Choose Security and permissions.
  4. Under QuickSight access to AWS services, choose Add or remove.
  5. Choose Amazon Athena, then choose Next.
  6. Give QuickSight access to the S3 bucket where your data quality result is stored.

Create your datasets

Before you can analyze and visualize the data in QuickSight, you must create datasets for your Athena view (data_quality_result_view). Complete the following steps:

  1. On the Datasets page, choose New dataset, then choose Athena.
  2. Choose the AWS Glue database that you created earlier.
  3. Select Import to SPICE (alternatively, you can select Directly query your data).
  4. Choose Visualize.

Build your dashboard

Create your analysis with one donut chart, one pivot table, one vertical stacked bar, and one funnel chart that use the different fields in the dataset. QuickSight offers a wide range of charts and visuals to help you create your dashboard. For more information, refer to Visual types in Amazon QuickSight.

Clean up

To avoid incurring future charges, delete the resources created in this post.

Conclusion

In this post, we provide insights into running Athena queries and building customized dashboards in QuickSight to understand data quality metrics. This gives you a great starting point for using this solution with your datasets and applying business rules to build a complete data quality framework to monitor issues within your datasets.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality API. To learn more about AWS Glue Data Quality, see the AWS Glue Data Quality Developer Guide. To learn more about QuickSight dashboards, refer to the Amazon QuickSight Developer Guide.


About the authors

Zack Zhou is a Software Development Engineer on the AWS Glue team.

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architecture on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Avik Bhattacharjee is a Senior Partner Solutions Architect at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible, focusing on big data and analytics and AI/ML.

Amit Kumar Panda is a Data Architect at AWS Professional Services who is passionate about helping customers build scalable data analytics solutions to enable making critical business decisions.

Set up alerts and orchestrate data quality rules with AWS Glue Data Quality

Post Syndicated from Avik Bhattacharjee original https://aws.amazon.com/blogs/big-data/set-up-alerts-and-orchestrate-data-quality-rules-with-aws-glue-data-quality/

Alerts and notifications play a crucial role in maintaining data quality because they facilitate prompt and efficient responses to any data quality issues that may arise within a dataset. By establishing and configuring alerts and notifications, you can actively monitor data quality and receive timely alerts when data quality issues are identified. This proactive approach helps mitigate the risk of making decisions based on inaccurate information. Furthermore, it allows for necessary actions to be taken, such as rectifying errors in the data source, refining data transformation processes, and updating data quality rules.

We are excited to announce that AWS Glue Data Quality is now generally available, offering built-in integration with Amazon EventBridge and AWS Step Functions to streamline event-driven data quality management. You can access this feature today in the available Regions. It simplifies your experience of monitoring and evaluating the quality of your data.

This post is Part 4 of a five-post series to explain how to set up alerts and orchestrate data quality rules with AWS Glue Data Quality:

Solution overview

In this post, we provide a comprehensive guide on enabling alerts and notifications using Amazon Simple Notification Service (Amazon SNS). We walk you through the step-by-step process of using EventBridge to establish rules that activate an AWS Lambda function when the data quality outcome aligns with the designated pattern. The Lambda function is responsible for converting the data quality metrics and dispatching them to the designated email addresses via Amazon SNS.

To expedite the implementation of the solution, we have prepared an AWS CloudFormation template for your convenience. AWS CloudFormation serves as a powerful management tool, enabling you to define and provision all necessary infrastructure resources within AWS using a unified and standardized language.

The solution aims to automate data quality evaluation for AWS Glue Data Catalog tables (data quality at rest) and allows you to configure email notifications when the AWS Glue Data Quality results become available.

The following architecture diagram provides an overview of the complete pipeline.

The data pipeline consists of the following key steps:

  1. The first step involves AWS Glue Data Quality evaluations that are automated using Step Functions. The workflow is designed to start the evaluations based on the rulesets defined on the dataset (or table). The workflow accepts input parameters provided by the user.
  2. An EventBridge rule receives an event notification from the AWS Glue Data Quality evaluations including the results. The rule evaluates the event payload based on the predefined rule and then triggers a Lambda function for notification.
  3. The Lambda function sends an SNS notification containing data quality statistics to the designated email address. Additionally, the function writes the customized result to the specified Amazon Simple Storage Service (Amazon S3) bucket, ensuring its persistence and accessibility for further analysis or processing.

The following sections discuss the setup for these steps in more detail.

Deploy resources with AWS CloudFormation

We create several resources with AWS CloudFormation, including a Lambda function, EventBridge rule, Step Functions state machine, and AWS Identity and Access Management (IAM) role. Complete the following steps:

  1. To launch the CloudFormation stack, choose Launch Stack:
  2. Provide your email address for EmailAddressAlertNotification, which will be registered as the target recipient for data quality notifications.
  3. Leave the other parameters at their default values and create the stack.

The stack takes about 4 minutes to complete.

  1. Record the outputs listed on the Outputs tab on the AWS CloudFormation console.
  2. Navigate to the S3 bucket created by the stack (DataQualityS3BucketNameStaging) and upload the yellow_tripdata_2022-01.parquet file.
  3. Check your email for a message with the subject “AWS Notification – Subscription Confirmation” and confirm your subscription.

Now that the CloudFormation stack is complete, let’s update the Lambda function code before running the AWS Glue Data Quality pipeline using Step Functions.

Update the Lambda function

This section explains the steps to update the Lambda function. We modify the ARN of Amazon SNS and the S3 output bucket name based on the resources created by AWS CloudFormation.

Complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function GlueDataQualityBlogAlertLambda-xxxx (created by the CloudFormation template in the previous step).
  3. Modify the values for sns_topic_arn and s3bucket with the corresponding values from the CloudFormation stack outputs for SNSTopicNameAlertNotification and DataQualityS3BucketNameOutputs, respectively.
  4. On the File menu, choose Save.
  5. Choose Deploy.
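
For reference, the following is a minimal, hypothetical sketch of how a function like this could use those two values; it is not the code deployed by the CloudFormation template, and the event field handling is an assumption.

import json

import boto3

# Values you update per the steps above (from the CloudFormation stack outputs)
sns_topic_arn = "arn:aws:sns:us-east-1:111122223333:GlueDataQualityAlerts"  # placeholder
s3bucket = "example-dataquality-results-bucket"  # placeholder

sns = boto3.client("sns")
s3 = boto3.client("s3")

def lambda_handler(event, context):
    # AWS Glue Data Quality result payload from EventBridge (field names assumed)
    detail = event.get("detail", {})
    message = json.dumps(detail, indent=2)

    # Send the data quality statistics to the subscribed email address
    sns.publish(
        TopicArn=sns_topic_arn,
        Subject="AWS Glue Data Quality evaluation results",
        Message=message,
    )

    # Persist the customized result for later analysis
    s3.put_object(
        Bucket=s3bucket,
        Key=f"defaultlogs/{context.aws_request_id}.json",
        Body=message.encode("utf-8"),
    )
    return {"statusCode": 200}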

Now that we’ve updated the Lambda function, let’s check the EventBridge rule created by the CloudFormation template.

Review and analyze the EventBridge rule

This section explains the significance of the EventBridge rule and how rules use event patterns to select events and send them to specific targets. In this section, we create a rule with an event pattern set as Data Quality Evaluations Results Available and configure the target as a Lambda function.

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Choose the rule GlueDataQualityBlogEventBridge-xxxx.

On the Event pattern tab, we can review the source event pattern. Event patterns are based on the structure and content of the events generated by various AWS services or custom applications.

  1. We set the source as aws-glue-dataquality with the event pattern detail type Data Quality Evaluations Results Available.

On the Targets tab, you can review the specific actions or services that will be triggered when an event matches a specified pattern.

  1. Here, we configure EventBridge to invoke a specific Lambda function when an event matches the defined pattern.

This allows you to run serverless functions in response to events.
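
A rule with this shape can also be defined programmatically. The following boto3 sketch is an approximation: the source and detail-type strings reflect our understanding of the events the service emits and should be verified, and the Lambda function ARN is a placeholder.

import json

import boto3

events = boto3.client("events")

# Event pattern matching AWS Glue Data Quality evaluation results
# (source and detail-type strings are assumptions; confirm against your events)
event_pattern = {
    "source": ["aws.glue-dataquality"],
    "detail-type": ["Data Quality Evaluations Results Available"],
}

events.put_rule(
    Name="glue-data-quality-results-available",
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)

# Invoke the notification Lambda function when the pattern matches
# (you also need to grant EventBridge permission to invoke the function, omitted here)
events.put_targets(
    Rule="glue-data-quality-results-available",
    Targets=[
        {
            "Id": "dq-alert-lambda",
            "Arn": "arn:aws:lambda:us-east-1:111122223333:function:GlueDataQualityBlogAlertLambda",  # placeholder
        }
    ],
)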

Now that you understand the EventBridge rule, let’s review the AWS Glue Data Quality pipeline created by Step Functions.

Set up and deploy the Step Functions state machine

AWS CloudFormation created the StateMachineGlueDataQualityCustomBlog-xxxx state machine to orchestrate the evaluation of existing AWS Glue Data Quality rules, creation of custom rules if needed, and subsequent evaluation of the ruleset. Complete the following steps to configure and run the state machine:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Open the state machine StateMachineGlueDataQualityCustomBlog-xxxx.
  3. Choose Edit.
  4. Modify row 80 with the IAM role ARN starting with GlueDataQualityBlogStepsFunctionRole-xxxx and choose Save.

Step Functions needs certain permissions (least privilege) to run the state machine and evaluate the AWS Glue Data Quality ruleset.

  1. Choose Start execution.
  2. Provide the following input:
    {
        "ruleset_name": "<AWS CloudFormation outputs key:GlueDataQualityCustomRulesetName>",
        "database_name": "<AWS CloudFormation outputs key:DataQualityDatabase>",
        "table_name": "<AWS CloudFormation outputs key:DataQualityTable>",
        "dq_output_location": "s3://<AWS CloudFormation outputs key:DataQualityS3BucketNameOutputs>/defaultlogs"
    }

This step assumes the existence of the ruleset and runs the workflow as depicted in the following screenshot. It runs the data quality ruleset evaluation and writes results to the S3 bucket.

If it doesn't find the ruleset name among the existing data quality rulesets, the state machine creates a custom ruleset for you and then performs the data quality ruleset evaluation.
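
As a rough, hypothetical illustration of the logic the state machine implements, the following Python sketch uses the AWS Glue data quality APIs directly; the example rules are placeholders, and the parameters mirror the input shown earlier.

import boto3

glue = boto3.client("glue")

def evaluate_ruleset(ruleset_name, database_name, table_name, role_arn, dq_output_location):
    # Check whether the ruleset already exists; create a custom one if it doesn't
    try:
        glue.get_data_quality_ruleset(Name=ruleset_name)
    except glue.exceptions.EntityNotFoundException:
        glue.create_data_quality_ruleset(
            Name=ruleset_name,
            Ruleset='Rules = [ RowCount > 0, IsComplete "VendorID" ]',  # example rules
            TargetTable={"DatabaseName": database_name, "TableName": table_name},
        )

    # Start the data quality ruleset evaluation and write results to Amazon S3
    run = glue.start_data_quality_ruleset_evaluation_run(
        DataSource={"GlueTable": {"DatabaseName": database_name, "TableName": table_name}},
        Role=role_arn,
        RulesetNames=[ruleset_name],
        AdditionalRunOptions={
            "CloudWatchMetricsEnabled": True,
            "ResultsS3Prefix": dq_output_location,
        },
    )
    return run["RunId"]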


State machine results and run options

The Step Functions state machine has run the AWS Glue Data Quality evaluation. Now EventBridge matches the pattern Data Quality Evaluations Results Available and triggers the Lambda function. The Lambda function writes customized AWS Glue Data Quality metrics results to the S3 bucket and sends an email notification via Amazon SNS.

The following sample email provides operational metrics for the AWS Glue Data Quality ruleset evaluation. It provides details about the ruleset name, the number of rules passed or failed, and the score. This helps you visualize the results of each rule along with the evaluation message if a rule fails.

You have the flexibility to choose between two run modes for the Step Functions workflow:

  • The first option is on-demand mode, where you manually trigger the Step Functions workflow whenever you want to initiate the AWS Glue Data Quality evaluation.
  • Alternatively, you can schedule the entire Step Functions workflow using EventBridge. With EventBridge, you can define a schedule or specific triggers to automatically initiate the workflow at predetermined intervals or in response to specific events. This automated approach reduces the need for manual intervention and streamlines the data quality evaluation process. For more details, refer to Schedule a Serverless Workflow.
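
As an illustration of the scheduled option, the following boto3 sketch creates an EventBridge rule that starts the state machine once a day; the ARNs are placeholders, and the input payload mirrors the parameters shown earlier.

import json

import boto3

events = boto3.client("events")

STATE_MACHINE_ARN = "arn:aws:states:us-east-1:111122223333:stateMachine:StateMachineGlueDataQualityCustomBlog"  # placeholder
EVENTS_ROLE_ARN = "arn:aws:iam::111122223333:role/EventBridgeStartStepFunctionsRole"  # placeholder: must allow states:StartExecution

# Run the data quality workflow once a day
events.put_rule(
    Name="daily-glue-data-quality-evaluation",
    ScheduleExpression="rate(1 day)",
    State="ENABLED",
)

events.put_targets(
    Rule="daily-glue-data-quality-evaluation",
    Targets=[
        {
            "Id": "glue-dq-state-machine",
            "Arn": STATE_MACHINE_ARN,
            "RoleArn": EVENTS_ROLE_ARN,
            "Input": json.dumps(
                {
                    "ruleset_name": "my_ruleset",
                    "database_name": "my_database",
                    "table_name": "my_table",
                    "dq_output_location": "s3://example-bucket/defaultlogs",
                }
            ),
        }
    ],
)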

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select your stack and delete it.

If you’re continuing to Part 5 in this series, you can skip this step.

Conclusion

In this post, we discussed three key steps that organizations can take to optimize data quality and reliability on AWS:

  • Create a CloudFormation template to ensure consistency and reproducibility in deploying AWS resources.
  • Integrate AWS Glue Data Quality ruleset evaluation and Lambda to automatically evaluate data quality and receive event-driven alerts and email notifications via Amazon SNS. This significantly enhances the accuracy and reliability of your data.
  • Use Step Functions to orchestrate AWS Glue Data Quality ruleset actions. You can create and evaluate custom and recommended rulesets, optimizing data quality and accuracy.

These steps form a comprehensive approach to data quality and reliability on AWS, helping organizations maintain high standards and achieve their goals.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality APIs. To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide.

If you require any assistance in constructing this pipeline within the AWS Lake Formation environment or if you have any inquiries regarding this post, please inform us in the comments section or initiate a new thread on the Lake Formation forum.


About the authors

Avik Bhattacharjee is a Senior Partner Solution Architect at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible, focusing on big data and analytics and AI/ML.

Amit Kumar Panda is a Data Architect at AWS Professional Services who is passionate about helping customers build scalable data analytics solutions to enable making critical business decisions.

Neel Patel is a software engineer working within GlueML. He has contributed to the AWS Glue Data Quality feature and hopes it will expand the repertoire for all AWS CloudFormation users along with displaying the power and usability of AWS Glue as a whole.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Set up advanced rules to validate quality of multiple datasets with AWS Glue Data Quality

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/set-up-advanced-rules-to-validate-quality-of-multiple-datasets-with-aws-glue-data-quality/

Data is the lifeblood of modern businesses. In today’s data-driven world, companies rely on data to make informed decisions, gain a competitive edge, and provide exceptional customer experiences. However, not all data is created equal. Poor-quality data can lead to incorrect insights, bad decisions, and lost opportunities.

AWS Glue Data Quality measures and monitors the quality of your dataset. It supports both data quality at rest and data quality in AWS Glue extract, transform, and load (ETL) pipelines. Data quality at rest focuses on validating the data stored in data lakes, databases, or data warehouses. It ensures that the data meets specific quality standards before it is consumed. Data quality in ETL pipelines, on the other hand, ensures the quality of data as it moves through the ETL process. It helps identify data quality issues during the ETL pipeline, allowing for early detection and correction of problems and prevents the failure of the data pipeline because of data quality issues.

This is Part 3 of a five-post series on AWS Glue Data Quality. In this post, we demonstrate the advanced data quality checks that you can typically perform when bringing data from a database to an Amazon Simple Storage Service (Amazon S3) data lake. Check out the other posts in this series:

Use case overview

Let’s consider an example use case where we have a database named classicmodels that contains retail data for a car dealership. This example database includes sample data for various entities, such as Customers, Products, ProductLines, Orders, OrderDetails, Payments, Employees, and Offices. You can find more details about this example database in MySQL Sample Database.

In this scenario, we assume the role of a data engineer who is responsible for building a data pipeline. The primary objective is to extract data from a relational database, specifically an Amazon RDS for MySQL database, and store it in Amazon S3, which serves as a data lake. After the data is loaded into the data lake, the data engineer is also responsible for performing data quality checks to ensure that the data in the data lake maintains its quality. To achieve this, the data engineer uses the newly launched AWS Glue Data Quality evaluation feature.

The following diagram illustrates the entity relationship model that describes the relationships between different tables. In this post, we use the employees, customers, and products table.

Solution overview

This solution focuses on transferring data from an RDS for MySQL database to Amazon S3 and performing data quality checks using the AWS Glue ETL pipeline and AWS Glue Data Catalog. The workflow involves the following steps:

  1. Data is extracted from the RDS for MySQL database using AWS Glue ETL.
  2. The extracted data is stored in Amazon S3, which serves as the data lake.
  3. The Data Catalog and AWS Glue ETL pipeline are utilized to validate the successful completion of data ingestion by performing data quality checks on the data stored in Amazon S3.

The following diagram illustrates the solution architecture.

To implement the solution, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Establish a connection to the RDS for MySQL instance from AWS Cloud9.
  3. Run an AWS Glue crawler on the RDS for MySQL database.
  4. Validate the Data Catalog.
  5. Run an AWS Glue ETL job to bring data from Amazon RDS for MySQL to Amazon S3.
  6. Evaluate the advanced data quality rules in the ETL job.
  7. Evaluate the advanced data quality rules in the Data Catalog.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An RDS for MySQL database instance (source)
  • An S3 bucket for the data lake (destination)
  • An AWS Glue ETL job to bring data from source to destination
  • An AWS Glue crawler to crawl the RDS for MySQL databases and create a centralized Data Catalog
  • AWS Identity and Access Management (IAM) users and policies
  • An AWS Cloud9 environment to connect to the RDS DB instance and create a sample dataset
  • An Amazon VPC, public subnet, two private subnets, internet gateway, NAT gateway, and route tables

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For DatabaseUserPassword, enter your preferred password.
  5. Choose Next.
  6. Scroll to the end and choose Next.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Establish a connection to the RDS for MySQL instance from AWS Cloud9

To connect to the RDS for MySQL instance, complete the following steps:

  1. On the AWS Cloud9 console, choose Open under Cloud9 IDE for your environment.
  2. Run the following command in the AWS Cloud9 terminal. Provide your values for the MySQL endpoint (located on the CloudFormation stack’s Outputs tab), database user name, and database user password:
    $ mysql --host=<MySQLEndpoint> --user=<DatabaseUserName> --password=<password>

  3. Download the SQL file.
  4. On the File menu, choose Upload from Local Files and upload the file to AWS Cloud9.
  5. Run the following SQL commands within the downloaded file:
    MySQL [(none)]> source mysqlsampledatabase.sql

  6. Retrieve a list of tables using the following SQL statement and make sure that eight tables are loaded successfully:
    use classicmodels;
    show tables;

Run an AWS Glue crawler on the RDS for MySQL database

To run your crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers under Data Catalog in the navigation pane.
  2. Locate and run the crawler dq-rds-crawler.

The crawler will take a few minutes to crawl all the tables from the classicmodels database.

Validate the AWS Glue Data Catalog

To validate the Data Catalog when the crawler is complete, complete the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose the mysql_private_classicmodels database.

You will be able to see all the RDS tables available under mysql_private_classicmodels.

Run an AWS Glue ETL job to bring data from Amazon RDS for MySQL to Amazon S3

To run your ETL job, complete the following steps:

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select dq-rds-to-s3 from the job list and choose Run job.

When the job is complete, you will be able to see three new tables under mysql_s3_db. It may take a few minutes to complete.

Now let’s dive into evaluating the data quality rules.

Evaluate the advanced data quality rules in the ETL job

In this section, we evaluate the results of different data quality rules.

ReferentialIntegrity

Let’s start with referential integrity. The ReferentialIntegrity data quality ruleset is currently supported in ETL jobs. This feature ensures that the relationships between tables in a database are maintained. It checks if the foreign key relationships between tables are valid and consistent, helping to identify any referential integrity violations.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. In AWS Glue Studio, select Visual with a blank canvas.
  3. Provide a name for your job; for example, RDS ReferentialIntegrity.
  4. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  5. For Name, enter a name for your data source; for example, employees.
  6. For Database, choose mysql_private_classicmodels.
  7. For Table, choose mysql_classicmodels_employees.
  8. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  9. For Name, enter a name for your data source; for example, customers.
  10. For Database, choose mysql_private_classicmodels.
  11. For Table, choose mysql_classicmodels_customers.
  12. Choose the plus sign in the AWS Glue Studio canvas and on the Transform tab, choose Evaluate Data Quality.
  13. For Node parents, choose employees and customers.
  14. For Aliases for referenced data source, select Primary source for employees; for customers, enter the alias customers.

All other datasets are used as references to ensure that the primary dataset has good-quality data.

  1. Search for ReferentialIntegrity under Rule types and choose the plus sign to add an example ReferentialIntegrity rule.
  2. Replace the rule with the following code and keep the remaining options as default:
    Rules = [
        ReferentialIntegrity "employeenumber" "customers.salesRepEmployeeNumber" between 0.6 and 0.7
    ]

  3. Under Data quality action, select Publish results to Amazon CloudWatch and select Fail job without loading target data.
  4. On the Job details tab, choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  5. Choose Run and wait for the job to complete.

It will take a few minutes to complete.

  1. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

You can confirm if the job completed successfully and which data quality rules it passed. In this example, it indicates that 60–70% of EmployeeNumber from the employees table are present in the customers table.

You can identify which records failed the referential integrity using AWS Glue Studio. To learn more, refer to Getting started with AWS Glue Data Quality for ETL Pipelines.

Similarly, if you are checking if all the EmployeeNumber from the employees table are present in the customers table, you can pass the following rule:

Rules = [
    ReferentialIntegrity "employeenumber" "customers.salesRepEmployeeNumber" = 1
]

DatasetMatch

DatasetMatch compares two datasets to identify differences and similarities. You can use it to detect changes between datasets or to find duplicates, missing values, or inconsistencies across datasets.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. In AWS Glue Studio, select Visual with a blank canvas.
  3. Provide a name for your job; for example, RDS DatasetMatch.
  4. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  5. For Name, enter a name for your data source; for example, rds_employees_primary.
  6. For Database, choose mysql_private_classicmodels.
  7. For Table, choose mysql_classicmodels_employees.
  8. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  9. For Name, enter a name for your data source; for example, s3_employees_reference.
  10. For Database, choose mysql_s3_db.
  11. For Table, choose s3_employees.
  12. Choose the plus sign in the AWS Glue Studio canvas and on the Transform tab, choose Evaluate Data Quality.
  13. For Node parents, choose rds_employees_primary and s3_employees_reference.
  14. For Aliases for referenced data source, select Primary source for rds_employees_primary and for s3_employees_reference, enter the alias reference.
  15. Replace the default example rules with the following code and keep the remaining options as default:
    Rules = [
        DatasetMatch "reference" "employeenumber,employeenumber" = 1
    ]

  16. On the Job details tab, choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  17. Choose Run and wait for the job to complete.
  18. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

In this example, it indicates both datasets are identical.

AggregateMatch

AggregateMatch verifies the accuracy of aggregated data. It compares the aggregated values in a dataset against the expected results to identify any discrepancies, such as incorrect sums, averages, counts, or other aggregate calculations. This is a performant option to evaluate if two datasets match at an aggregate level. For this rule, we clone the previous job we created for DatasetMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS DatasetMatch and on the Actions menu, choose Clone job.
  3. Change the job name to RDS AggregateMatch.
  4. Change the dataset rds_employees_primary to rds_products_primary and the table to mysql_classicmodels_products.
  5. Change the dataset s3_employees_reference to s3_products_reference and the table to s3_products.
  6. Choose Evaluate Data Quality, and under Node parents, choose rds_products_primary and s3_products_reference.
  7. Replace the rules with the following code:
    AggregateMatch "avg(MSRP)" "avg(reference.MSRP)" = 1

  8. Choose Run and wait for the job to complete.
  9. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

The results indicate that the avg(msrp) on both datasets is the same.

RowCountMatch

RowCountMatch checks the number of rows in a dataset and compares it to an expected count. It helps identify missing or extra rows in a dataset, ensuring data completeness. For this rule, we edit the job we created earlier for AggregateMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS AggregateMatch and on the Actions menu, choose Edit job.
  3. Choose Evaluate Data Quality and choose the plus sign next to RowCountMatch.
  4. Keep the default data quality rules and choose Save:
    RowCountMatch "reference" = 1.0

  5. Choose Run and wait for the job to complete.
  6. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

It shows that the DQ RowCountMatch rule failed, indicating a mismatch between the row count of the source RDS table and the target S3 table. Further investigation reveals that the ETL job ran four times for the Products table, and the row counts didn’t match.

SchemaMatch

SchemaMatch validates the schema of two datasets matches. It checks if the actual data types match the expected data types and flags any inconsistencies, such as a numeric column containing non-numeric values. For this rule, we edit the job we used for AggregateMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS AggregateMatch and on the Actions menu, choose Edit job.
  3. Choose Evaluate Data Quality and choose the plus sign next to SchemaMatch.
  4. Update the default rules with the following code and save the job:
    SchemaMatch "reference" = 1.0

  5. Choose Run and wait for the job to complete.
  6. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

It should show a successful completion with a Rule passed status, indicating that the schemas of both datasets are identical.

Evaluate the advanced data quality rules in the Data Catalog

The AWS Glue Data Catalog also supports advanced data quality rules. For this post, we show one example of an aggregate match between Amazon S3 and Amazon RDS.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose the mysql_private_classicmodels database to view the three tables created under it.
  3. Choose the mysql_classicmodels_products table.
  4. On the Data quality tab, choose Create data quality rules.
  5. Search for AggregateMatch and choose the plus sign to view the default example rule.
  6. Add the following rules:
    Rules = [
        AggregateMatch "avg(msrp)" "avg(mysql_s3_db.s3_products.msrp)" >= 0.9,
        ReferentialIntegrity "productname,productcode" "mysql_s3_db.s3_products.{productname,productcode}" = 1
    ]

reference is the alias of the secondary dataset defined in the AWS Glue ETL job. For the Data Catalog, you can use <database_name>.<table_name>.<column_name> to reference secondary datasets.

  1. Choose Save ruleset and provide the name production_catalog_dq_check.
  2. Choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  3. Choose Run and wait for the data quality check to complete.

When the job is complete, you can confirm that both data quality checks passed.

With these advanced data quality features of AWS Glue Data Quality, you can enhance the reliability, accuracy, and consistency of your data, leading to better insights and decision-making.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the AWS Glue job.
  2. Delete the CloudFormation stack.

Conclusion

Data quality refers to the accuracy, completeness, consistency, timeliness, and validity of the information being collected, processed, and analyzed. High-quality data is essential for businesses to make informed decisions, gain valuable insights, and maintain their competitive advantage. As data complexity increases, advanced rules are critical to handle complex data quality challenges. The rules we demonstrated in this post can help you manage the quality of data that lives in disparate data sources, providing you the capabilities to reconcile them. Try them out and provide your feedback on what other use cases you need to solve!


About the authors

Navnit Shukla is AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data. He builds solutions to help organizations make data-driven decisions.

Rahul Sharma is a Software Development Engineer at AWS Glue. He focuses on building distributed systems to support features in AWS Glue. He has a passion for helping customers build data management solutions on the AWS Cloud.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Shriya Vanvari is a Software Developer Engineer in AWS Glue. She is passionate about learning how to build efficient and scalable systems to provide better experience for customers. Outside of work, she enjoys reading and chasing sunsets.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Stuti Deshpande original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and ML to make data-driven business decisions. Data consumers lose trust in data if it isn’t accurate and recent, making data quality essential for undertaking optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, various tools are available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

AWS Glue Data Quality is a new feature of AWS Glue that measures and monitors the data quality of Amazon Simple Storage Service (Amazon S3)-based data lakes, data warehouses, and other data repositories. AWS Glue Data Quality can be accessed in the AWS Glue Data Catalog and in AWS Glue ETL jobs.

This is Part 1 of a five-part series of posts to explain how AWS Glue Data Quality works. Check out the next posts in the series:

In this post, we explore using the AWS Glue Data Quality feature by generating data quality recommendations and running data quality evaluations on your table in the Data Catalog. Then we demonstrate how to analyze your AWS Glue Data Quality run results through Amazon Athena.

Solution overview

We guide you through the following steps:

  1. Provision resources with AWS CloudFormation.
  2. Explore the generated recommendation rulesets and define rulesets to evaluate your table in the Data Catalog.
  3. Review the AWS Glue Data Quality recommendations.
  4. Analyze your AWS Glue Data Quality evaluation results with Athena.
  5. Operationalize the solution by setting up alerts and notifications using integration with Amazon EventBridge and Amazon Simple Notification Service (Amazon SNS).

For this post, we use the NYC Taxi dataset yellow_tripdata_2022-01.parquet.

Set up resources with AWS CloudFormation

The provided CloudFormation template creates the following resources for you:

  • The AWS Identity and Access Management (IAM) role required to run AWS Glue Data Quality evaluations
  • An S3 bucket to store the NYC Taxi dataset
  • An S3 bucket to store and analyze the results of AWS Glue Data Quality evaluations
  • An AWS Glue database and table created from the NYC Taxi dataset

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1:
    BDB-2063-launch-cloudformation-stack
  2. Under Parameters:
    • For Stack name, proceed with the default value myDQStack.
    • For DataQualityDatabase, proceed with the default value data_quality_catalog.
    • For DataQualityS3BucketName, provide a bucket name of your choice.
    • For DataQualityTable, proceed with the default value data_quality_tripdata_table.

  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the stack is successfully created, you can see all the resources created on the Resources tab.

  1. Navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Explore recommendation rulesets and define rulesets to evaluate your table

In this section, we generate data quality rule recommendations from AWS Glue Data Quality. We use these recommendations to run a data quality task against our dataset to obtain an analysis of our data.

Complete the following steps:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Tables.
  2. Choose the data_quality_tripdata_table table created via the CloudFormation stack.
  3. Choose the Data quality tab.

In this section, you will find a video to get you started with AWS Glue Data Quality. It also lists features, pricing, and documentation.

  1. Choose Create data quality rules.

This is the ruleset console. You will find a Request data quality rule recommendations banner at the top. AWS Glue will scan your data and automatically generate rule recommendations.

  1. Choose Recommend rules.
  2. For IAM role, choose the IAM role created as part of the CloudFormation template (GlueDataQualityBlogRole).
  3. Optionally, you can filter your data on column values before it is read. This feature is available for Amazon S3-based data sources.
  4. For Requested number of workers, allocate the number of workers to run the recommendation task. For this post, we use the default value of 5.
  5. For Task timeout, set the runtime for this task. For this post, we use the default of 120 minutes.
  6. Choose Recommend rules.

The recommendation task will start instantly, and you will observe the status at the top change to Starting.

Next, we add some of these recommended rules into our ruleset.

  1. When you see the recommendation run as Completed, choose Insert rule recommendations to select the rules that are recommended for you.

Make sure to place the cursor inside the brackets Rules = [ ].

  1. Select the following rules:
    • ColumnValues “VendorID” <=6
    • Completeness “passenger_count”>=0.97
  2. Choose Add selected rules.

You can see that these rules were automatically added to the ruleset.

Understanding AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service and are based on the shape of your data. These recommendations automatically take into account aspects like row counts, mean, standard deviation, and so on, and generate a set of rules for you to use as a starting point.

The dataset used here was the NYC Taxi dataset. Based on this dataset, its columns, and the values in those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service automatically took into consideration all the columns of the dataset, and recommended 55 rules.

Some of these rules are:

  • ColumnValues “VendorID” <=6 – The ColumnValues rule type runs an expression against the values in a column. This rule resolves to true if the column values satisfy the expression, in this case being less than or equal to 6.
  • Completeness “passenger_count”>=0.97 – The Completeness rule type checks the percentage of complete (non-null) values in a column against a given expression. In this case, the rule checks if more than 97% of the values in a column are complete.

In addition to adding auto-generated recommendation rules, we manually add some rules to the ruleset. AWS Glue Data Quality provides some out-of-the-box rule types to choose from. For this post, we manually add the IsComplete rule for VendorID.

  1. In the left pane, on the Rule types tab, search for the IsComplete rule type and choose the plus sign next to IsComplete to add this rule.
  2. For the value within the quotation marks, enter VendorID.

Alternatively, you could navigate to the Schema tab and add IsComplete to VendorID.

  1. Choose Save ruleset.

Next, we add a CustomSQL rule by selecting the rule type CustomSql. This rule validates that no fares are charged for a trip if there are no passengers, to identify potentially fraudulent transactions where fare_amount > 0 and passenger_count = 0. The rule is:

CustomSql "select count(*) from primary where passenger_count=0 and fare_amount > 0" = 0

There are two ways to provide the table name:

  • Either you can use the keyword “primary” for the table under consideration
  • You can use the full path such as database_name.table_name
  1. On the Rule types tab, choose the plus sign next to CustomSQL and enter the SQL statement.

The final ruleset looks like the following screenshot.
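At this point, the combined ruleset contains the two recommended rules, the IsComplete rule, and the CustomSql rule, and should look similar to the following (the exact order of the rules may differ in your console):

Rules = [
    ColumnValues "VendorID" <= 6,
    Completeness "passenger_count" >= 0.97,
    IsComplete "VendorID",
    CustomSql "select count(*) from primary where passenger_count=0 and fare_amount > 0" = 0
]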

  1. Choose Save ruleset.
  2. For Ruleset name, enter a name.
  3. Choose Save ruleset.
  4. On the ruleset page, choose Run.
  5. For IAM role, choose GlueDataQualityBlogRole.
  6. Select Publish run metrics to Amazon CloudWatch.
  7. For Data quality result location, enter the S3 bucket location for the data quality results, which was already created for you as part of the CloudFormation template (for this post, data-quality-tripdata-results).
  8. For Run frequency, choose On demand.
  9. Expand Additional configurations.
  10. For Requested number of workers, enter 5.
  11. Leave the remaining fields as is and choose Run.

The status changes to Starting.

  1. When it’s complete, choose the ruleset and choose Run history.
  2. Choose the run ID to find more about the run details.

Under Data quality result, you will also observe whether the result is shown as DQ passed or DQ failed.

In the Evaluation run details section, you will find all the details about the data quality task run and rules that passed or failed. You can either view these results by navigating to the S3 bucket or downloading the results. Observe that the data quality task failed because one of the rules failed.

In the first section, AWS Glue Data Quality recommended 55 rules based on the column values and the data within our NYC Taxi dataset. We selected a few of those rules into a ruleset and ran an AWS Glue Data Quality evaluation task using our ruleset against our dataset. In our results, we see the status of each rule within the run details of the data quality task.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.
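As a rough sketch, the console steps above map to API calls along the following lines; the IAM role ARN, database name, and ruleset name are placeholders for the values created earlier, so adjust them to your environment:

import boto3

glue = boto3.client("glue")

data_source = {
    "GlueTable": {
        "DatabaseName": "<your_database_name>",  # placeholder: the Data Catalog database created by the stack
        "TableName": "data_quality_tripdata_table",
    }
}

# Generate rule recommendations (the console's Recommend rules action)
recommendation_run = glue.start_data_quality_rule_recommendation_run(
    DataSource=data_source,
    Role="<GlueDataQualityBlogRole ARN>",        # placeholder
    NumberOfWorkers=5,
)

# Evaluate a saved ruleset against the table (the console's Run action)
evaluation_run = glue.start_data_quality_ruleset_evaluation_run(
    DataSource=data_source,
    Role="<GlueDataQualityBlogRole ARN>",        # placeholder
    RulesetNames=["<your_ruleset_name>"],        # placeholder
    AdditionalRunOptions={
        "CloudWatchMetricsEnabled": True,
        "ResultsS3Prefix": "s3://data-quality-tripdata-results/",
    },
)

print(recommendation_run["RunId"], evaluation_run["RunId"])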

Analyze your AWS Glue Data Quality evaluation results with Athena

If you have multiple AWS Glue Data Quality evaluation results against a dataset, you might want to track the trends of the dataset’s quality over a period of time. To achieve this, we can export our AWS Glue Data Quality evaluation results to Amazon S3, and use Athena to run analytical queries against the exported results. You could further use the results in Amazon QuickSight to build dashboards to have a graphical representation of your data quality trends.

In Part 3 of this series, we show the steps needed to start tracking your dataset’s quality.

For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the CloudFormation stack. After each successful run, you should see a single JSONL file exported to your selected S3 location, corresponding to that particular run.

Complete the following steps to analyze the data:

  1. On the Athena console, navigate to the query editor.
  2. Run the following CREATE TABLE statement (replace <my_table_name> with a relevant value of your choice and <GlueDataQualityResultsS3Bucket_from_cfn> with the S3 bucket name that stores the data quality results; the bucket name has the trailing keyword results, for example <given-name-results>. For this post, it is data-quality-tripdata-results):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file');
    MSCK REPAIR TABLE `<my_table_name>`;

After you create the table, you should be able to run queries to analyze your data quality results.

For example, consider the following query, which shows the passed AWS Glue Data Quality evaluations against the table data_quality_tripdata_table within a certain time window. To bound the window, pick datetime values from the evaluationcompletedon column of the results table <my_table_name> that you created above and pass them to parse_datetime():

SELECT * from "<my_table_name>"
WHERE "outcome" = 'Passed'
AND "tablename" = 'data_quality_tripdata_table'
AND "evaluationcompletedon" between
parse_datetime('2023-05-12 00:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2023-05-12 21:16:21:804', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the preceding query shows us details about all the runs with “outcome” = ‘Passed’ that ran against the NYC Taxi dataset table (“tablename” = ‘data_quality_tripdata_table’). The output also provides details about the rules passed and evaluated metrics.

As you can see, we are able to get detailed information about our AWS Glue Data Quality evaluations via the results uploaded to Amazon S3 and perform more detailed analysis.
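For example, a query along the following lines (a sketch; adjust the table name to your environment) uses the partition columns from the table definition to summarize how many rules passed and failed per day:

SELECT year, month, day,
    COUNT(*) AS evaluated_rules,
    SUM(CASE WHEN "outcome" = 'Passed' THEN 1 ELSE 0 END) AS passed_rules,
    SUM(CASE WHEN "outcome" = 'Failed' THEN 1 ELSE 0 END) AS failed_rules
FROM "<my_table_name>"
WHERE "tablename" = 'data_quality_tripdata_table'
GROUP BY year, month, day
ORDER BY year, month, day;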

Set up alerts and notifications using EventBridge and Amazon SNS

Alerts and notifications are important for data quality to enable timely and effective responses to data quality issues that arise in the dataset. By setting up alerts and notifications, you can proactively monitor the data quality and be alerted as soon as any data quality issues are detected. This reduces the risk of making decisions based on incorrect information.

AWS Glue Data Quality also offers integration with EventBridge for alerting and notification by triggering an AWS Lambda function that sends a customized SNS notification when the AWS Glue Data Quality ruleset evaluation is complete. Now you can receive event-driven alerts and email notifications via Amazon SNS. This integration significantly enhances the accuracy and reliability of data.
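As a minimal sketch, an EventBridge rule for these notifications could be created as follows; the event source and detail-type values shown are assumptions to verify against the AWS Glue Data Quality documentation, and the Lambda function ARN is a placeholder:

import json
import boto3

events = boto3.client("events")

# Rule that matches AWS Glue Data Quality evaluation results (values assumed; verify in the documentation)
events.put_rule(
    Name="glue-dq-results-rule",
    EventPattern=json.dumps({
        "source": ["aws.glue-dataquality"],
        "detail-type": ["Data Quality Evaluation Results Available"],
    }),
)

# Route matching events to a Lambda function that formats and publishes the SNS notification
events.put_targets(
    Rule="glue-dq-results-rule",
    Targets=[{"Id": "notify-lambda", "Arn": "<your Lambda function ARN>"}],  # placeholder ARN
)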

Clean up

To clean up your resources, complete the following steps:

  1. On the Athena console, delete the table created for data quality analysis.
  2. On the CloudWatch console, delete the alarms created.
  3. If you deployed the sample CloudFormation stack, delete the stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  4. If you enabled your AWS Glue Data Quality runs to output to Amazon S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules using AWS Glue Data Quality into your Data Catalog tables. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Athena, and discussed integrations with EventBridge and Amazon SNS for alerts and notifications to get notified for data quality issues.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality API documentation. To learn more about AWS Glue Data Quality, check out AWS Glue Data Quality.


About the authors

Stuti Deshpande is an Analytics Specialist Solutions Architect at AWS. She works with customers around the globe, providing them strategic and architectural guidance on implementing analytics solutions using AWS. She has extensive experience in Big Data, ETL, and Analytics. In her free time, Stuti likes to travel, learn new dance forms, and enjoy quality time with family and friends.

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge watching tv shows.

Jesus Max Hernandez is a Software Development Engineer at AWS Glue. He joined the team in August after graduating from The University of Texas at El Paso. Outside of work, you can find him practicing guitar or playing softball in Central Park.

Divya Gaitonde is a UX designer at AWS Glue. She has over 8 years of experience driving impact through data-driven products and seamless experiences. Outside of work, you can find her catching up on reading or people watching at a museum.

How SumUp made digital analytics more accessible using AWS Glue

Post Syndicated from Mira Daniels original https://aws.amazon.com/blogs/big-data/how-sumup-made-digital-analytics-more-accessible-using-aws-glue/

This is a guest blog post by Mira Daniels and Sean Whitfield from SumUp.

SumUp is a leading global financial technology company driven by the purpose of leveling the playing field for small businesses. Founded in 2012, SumUp is the financial partner for more than 4 million small merchants in over 35 markets worldwide, helping them start, run and grow their business. Through its Super App, SumUp provides merchants with a free business account and card, an online store, and an invoicing solution – as well as in-person and remote payments seamlessly integrated with SumUp’s card terminals and point-of-sale registers. For more information, please visit sumup.co.uk.

As most organizations that have turned to Google Analytics (GA) as a digital analytics solution mature, they discover a more pressing need to integrate this data silo with the rest of their organization’s data to enable better analytics and the resulting product development and fraud detection. Unless, of course, the rest of their data also resides in the Google Cloud. In this post we showcase how we used AWS Glue to move siloed digital analytics data, with inconsistent arrival times, to Amazon S3 (our Data Lake) and our central data warehouse (DWH), Snowflake. AWS Glue gave us a cost-efficient option to migrate the data, and we further optimized storage cost by pruning cold data. What was essential to the solution development was a good understanding of the nature of the data, its source (the export from GA), and the form and scope of the data useful to the data consumers.

Business context

At SumUp we use GA and Firebase as our digital analytics solutions and AWS as our main Cloud Provider. In order to mature our data marts, it became clear that we needed to provide Analysts and other data consumers with all tracked digital analytics data in our DWH as they depend on it for analyses, reporting, campaign evaluation, product development and A/B testing. We further use the Digital Analytics data for our reverse ETL pipelines that ingest merchant behavior data back into the Ad tools. As the SumUp merchants user journey only starts onsite (with a Sign up or product purchase), but extends to offline card reader transactions or usage of our products from within our app and web dashboard, it is important to combine Digital Analytics data with other (backend) data sources in our Data Lake or DWH. The Data Science teams also use this data for churn prediction and CLTV modeling.

Given that the only source to access all raw data is by exporting it to BigQuery (first), data accessibility becomes challenging if BigQuery isn’t your DWH solution. What we needed was a data pipeline from BigQuery to our Data Lake and the DWH. The pipeline further needed to run based on the trigger of new data arriving in BigQuery and could not run on a simple schedule as data would not arrive at the same time consistently.

We experimented with some no-code tools that allowed for the direct export of Google Analytics data (not available for Firebase) from BigQuery directly to Snowflake, but due to our growing data volume, this wasn’t financially scalable. Other no-code tools, even if they can move data to S3, did not meet our technical requirements. The solutions we experimented with did not give us the flexibility to monitor and scale resources per pipeline run and optimize the pipeline ourselves.

We had a solid business case to build our own internal digital analytics pipeline to not only reduce spending on our existing outsourced Google Analytics pipeline (that moved from BigQuery to Snowflake directly), but also to make GA and Firebase data available in both the Data Lake and DWH.

Technical challenges

Data source specifics: The data in BigQuery is the export of GA 360 data and Firebase Analytics data. It consists of full-day and intraday tables. BigQuery uses a columnar storage format that can efficiently query semi-structured data, in the case of GA and Firebase data as arrays of structs.

Full-day: Daily tables that do not change retroactively for GA data, but do for Firebase data.

Intraday: Until the daily (full-day) tables are created, the intraday tables are populated with new and corrected records.

Update interval: intraday is refreshed and overwritten at least 3x a day. When the full-day table is created, the intraday table from that day is deleted.

Since we started exporting GA tracking data to BigQuery in 2015, the amount of data tracked and stored has grown 70x (logical bytes) and is >3TB in total. Our solution needs to not only ingest new data but also backfill historical data from the last 7 years. Firebase data has been exported to BigQuery since 2019 and grew 10x in size (logical bytes) over time.

Our major challenge with ingesting Firebase data to the DWH was the combination of its size (historically >35TB) with the arrival of late data upstream. Since BigQuery processes hits with timestamps up to 72 hours earlier for Firebase data, historical data must be updated for each daily run of the pipeline for the last 3 days. Consequently, this greatly increases compute and storage resources required for the pipeline.

The intraday source data usually arrives every 2-4 hours so real-time downstream pipelines are currently not needed with our GA and Firebase data export (into BigQuery) setup.

Querying the dataset (containing nested and repeated fields) in Snowflake presented an accessibility problem for our users, as it required unfriendly verbose query syntax and greatly strained our compute resources. We used AWS Glue to run a UDF to transform this nested data into a Snowflake object (key-value) data structure that is both more user friendly and requires less compute resources to access.
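As an illustration only, a transformation of this kind can be sketched in PySpark as follows; the column names (event_params and its nested value fields) follow the Firebase export schema and are assumptions here, df is the DataFrame read from BigQuery, and our production UDF differs in detail:

from pyspark.sql import functions as F

# Collapse the array<struct<key, value>> column into a simple key-value map
# that lands in Snowflake as an OBJECT and is easier to query
df = df.withColumn(
    "event_params_map",
    F.expr(
        "map_from_entries(transform(event_params, p -> struct("
        "p.key as key, "
        "coalesce(p.value.string_value, cast(p.value.int_value as string)) as value)))"
    ),
)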

Regarding version control of the Glue script, our team wanted to contain all of the pipeline logic in the same repository for simplicity and ease of local development. Since we used MWAA for orchestrating our platform pipelines, we wanted our DAG code to be close to the Glue Script. This required us to add an initial first step in our pipeline to push the script to the respective bucket that AWS Glue is synced with.

Our ELT design pattern required that we overwrite/update data stored in S3 before loading it into Snowflake, which required us to use a Spark DataFrame.

In order to save on Snowflake storage costs, we decided to only keep hot data in a materialised table and have access to colder historical data through external tables.

Solution overview

We chose AWS Glue for the first step in the pipeline (moving data to S3) as it nicely integrates into our serverless AWS infrastructure and Pyspark makes it very flexible to script transformation steps and add partitions to the data storage in S3. It already provided a connector to BigQuery that was easy to configure and the complete Glue job was nicely abstracting the underlying system infrastructure.

We used the existing Google BigQuery Connector for Glue following parts of this blog post.

Ingestion

The code for the ingestion pipeline was set up in a way that it is easily extendable and reusable by splitting jobs/tasks in different methods and classes building a framework for all (future) ingestions from BigQuery to Snowflake.

Initialise Glue Job: First, we push the Glue script to S3, as the boto3 client expects it to be there for job execution. We then check if the Glue job already exists. If it doesn’t exist, it’s created; otherwise, it’s updated with the most recent job parameters. We made sure to add appropriate tags for cost monitoring.
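A simplified sketch of this create-or-update step (the job name, role, script location, and connection name are placeholders, not our production values):

import boto3

glue = boto3.client("glue")

job_name = "bq-to-s3-ingestion"                                  # placeholder
job_settings = {
    "Role": "<glue-job-role>",                                   # placeholder
    "Command": {
        "Name": "glueetl",
        "ScriptLocation": "s3://<scripts-bucket>/bq_to_s3.py",   # the script pushed in the previous step
        "PythonVersion": "3",
    },
    "Connections": {"Connections": ["<bigquery-connection>"]},   # placeholder
    "GlueVersion": "3.0",
    "WorkerType": "G.1X",
    "NumberOfWorkers": 10,
}

try:
    glue.get_job(JobName=job_name)
    glue.update_job(JobName=job_name, JobUpdate=job_settings)    # job exists: refresh its parameters
except glue.exceptions.EntityNotFoundException:
    glue.create_job(Name=job_name, Tags={"team": "data-platform"}, **job_settings)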

Pull From Google Cloud Platform: Based on the data source, GA or Firebase, we pick dynamic (or hard-coded, for backfilling) days to be pulled from the source. We then check if those selected dates exist as sharded tables in BigQuery; if the data is late, we cannot pull it and will retry a few minutes later (using a generous retry schedule). If the dates we chose to ingest can be found in BigQuery, we run the Glue job.
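The availability check can be sketched as follows, assuming the google-cloud-bigquery client and the GA export’s sharded table naming (ga_sessions_YYYYMMDD); the project ID is a placeholder:

from google.cloud import bigquery
from google.api_core.exceptions import NotFound

client = bigquery.Client(project="<gcp-project>")  # placeholder project

def daily_table_exists(dataset: str, date_suffix: str) -> bool:
    """Return True if the sharded daily export table is already present in BigQuery."""
    try:
        client.get_table(f"<gcp-project>.{dataset}.ga_sessions_{date_suffix}")
        return True
    except NotFound:
        return False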

Dump to S3: Some transformation steps are carried out within the Glue job before the data is moved into S3. It is saved in day-partitioned folders and repartitioned into 300-500MB files for better table query performance in Snowflake.
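A sketch of that write step (the bucket, prefix, and partition column name are placeholders, and df is the DataFrame produced by the Glue job):

# Repartition to roughly 300-500 MB output files and write day-partitioned Parquet to S3
estimated_size_mb = 8000                       # illustrative: size of the day's data in MB
num_output_files = max(1, estimated_size_mb // 400)

(
    df.repartition(num_output_files)
      .write.mode("overwrite")
      .partitionBy("event_date")               # placeholder partition column
      .parquet("s3://<data-lake-bucket>/firebase/")
)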

Data Catalog: We also wanted to automate a Glue Crawler to have metadata in a Data Catalog and be able to explore our files in S3 with Athena. With the help of the boto3 library, our pipeline contains a step which runs the crawler at the end of the pipeline. In order to simplify schema evolution, we moved the Data Catalog creation out of the Glue script.
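That step boils down to a call like the following (the crawler name is a placeholder):

import boto3

glue = boto3.client("glue")
# Refresh the Data Catalog once the new partitions have landed in S3
glue.start_crawler(Name="<digital-analytics-crawler>")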

DWH: The solution loading the data to Snowflake consists of tasks that are scheduled to access storage integrations with S3 and materialise the data in Snowflake tables. The jobs are batch jobs; we haven’t tested any streaming solutions, as the data arrives batched in BigQuery. After loading new data into Snowflake, we use a table pruner function that deletes the respective days that fall outside of the number of days of retention we defined for the table. This way we make sure to provide the data timeframe used by our stakeholders and avoid unnecessary storage costs.

In addition to the materialised tables in Snowflake we use External tables to make more historical data available (at slower query speed), these External tables are refreshed at the end of the data pipeline in Airflow using the Snowflake operator.

Development and Testing: Before the Glue job script was integrated into the task orchestration process and code, we tested the functionality of the job within Glue Studio using the Spark script editor and Jupyter notebook. It was great for quick iteration over a new feature. We used boto3 to access AWS infrastructure services and create, update, and run the Glue job.

Data Quality is ensured on the one hand by deduplicating Firebase events and adding a new hash key within the Glue script, and on the other hand by comparing total row counts per day between Snowflake and BigQuery (the comparison is currently done in Tableau). We also get alerted if our Airflow pipeline or the Snowflake tasks fail.

In the future we will add Data Quality checks with DBT.

Conclusion

As SumUp expands its product suite through innovation and acquisition, so does its infrastructure. AWS Glue has proven to be a scalable solution to ingest and transform siloed data in different clouds using marketplace connectors and the flexibility provided by PySpark.

What surprised us was how easily Glue can be customized (from no code to highly customised script and execution setup) and fit our volatile source data (size and shape). In the future we can think of further customization of the Glue script in terms of transformation of the data (more useful unnesting) as well as Glue job resource optimization.

The consumers of Digital Analytics data appreciate being able to make use of the full source data schema in both Data Lake and DWH. The transparency around the process being managed by the central Data Platform team facilitates trust and reliance on the data. Through pipeline abstraction, we’re now able to provide data products in high demand to all our data consumers.

The pipeline framework we built can easily be extended if needed as its components were built separately and are only forming the final pipeline during the last step, the orchestration.

What we plan to test and optimize in the future is loading historical data in automated batch jobs which is dependent on both API limits on the source side and compute resource orchestration on Glue’s side (we did not test automated batching in the past and were manually chunking data into jobs for backfilling). Additionally, we will incorporate these pipeline features into our main ingestion framework, which would allow our stakeholders to define digital analytics pipelines themselves in a self-service manner.

If your organization faces similar challenges with digital analytics data silos, we recommend developing a proof of concept to migrate your data to S3 using the Google BigQuery Connector for Glue and this blog post. Experiment with the Glue job settings and PySpark script options to find the best match for the size and latency requirements of your data. For migrating to a DWH like Snowflake, leverage COPY INTO statements, as they are a cheaper alternative to Snowpipes for this volume of data. Once you have prototyped, develop a proper, well-tested solution with Amazon Managed Workflows for Apache Airflow (MWAA) that includes version control for the infrastructure and the pipeline logic.


About the Authors

Mira Daniels (Data Engineer in Data Platform team), recently moved from Data Analytics to Data Engineering to make quality data more easily accessible for data consumers. She has been focusing on Digital Analytics and marketing data in the past.

Sean Whitfield (Senior Data Engineer in Data Platform team), a data enthusiast with a life science background who pursued his passion for data analysis into the realm of IT. His expertise lies in building robust data engineering and self-service tools. He also has a fervor for sharing his knowledge with others and mentoring aspiring data professionals.

Advanced patterns with AWS SDK for pandas on AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/advanced-patterns-with-aws-sdk-for-pandas-on-aws-glue-for-ray/

AWS SDK for pandas is a popular Python library among data scientists, data engineers, and developers. It simplifies interaction between AWS data and analytics services and pandas DataFrames. It allows easy integration and data movement between 22 types of data stores, including Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon Redshift, and Amazon OpenSearch Service.

In the previous post, we discussed how you can use AWS SDK for pandas to scale your workloads on AWS Glue for Ray. We explained how using both Ray and Modin within the library enabled us to distribute workloads across a compute cluster. To illustrate these capabilities, we explored examples of writing Parquet files to Amazon S3 at scale and querying data in parallel with Athena.

In this post, we show some more advanced ways to use this library on AWS Glue for Ray. We cover features and APIs from AWS services such as S3 Select, Amazon DynamoDB, and Amazon Timestream.

Solution overview

The Ray and Modin frameworks allow scaling of pandas workloads easily. You can write code on your laptop that uses the SDK for pandas to get data from an AWS data or analytics service to a pandas DataFrame, transform it using pandas, and then write it back to the AWS service. By using the distributed version of the SDK for pandas and replacing pandas with Modin, exactly the same code will scale on a Ray runtime—all logic about task coordination and distribution is hidden. Taking advantage of these abstractions, the AWS SDK for pandas team has made considerable use of Ray primitives to distribute some of the existing APIs (for the full list, see Supported APIs).
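For instance, the following minimal sketch (the bucket paths and column name are placeholders) uses ordinary pandas-style code, yet when it runs on AWS Glue for Ray the DataFrames returned by the library are Modin DataFrames and the work is spread across the cluster:

import awswrangler as wr
import modin.pandas as pd  # drop-in replacement for pandas on a Ray runtime

# Read a dataset from S3; on Glue for Ray this returns a distributed Modin DataFrame
df = wr.s3.read_parquet(path="s3://<your-bucket>/<input-prefix>/", dataset=True)

# Ordinary pandas syntax; the operations are distributed across the Ray workers
df = df[df["<some_column>"].notna()]
df["load_ts"] = pd.Timestamp.now()

# Write the result back to S3 as a Parquet dataset
wr.s3.to_parquet(df=df, path="s3://<your-bucket>/<output-prefix>/", dataset=True)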

In this post, we show how to use some of these APIs in an AWS Glue for Ray job, namely querying with S3 Select, writing to and reading from a DynamoDB table, and writing to a Timestream table. Because AWS Glue for Ray is a fully managed environment, it’s by far the easiest way to run jobs because you don’t need to worry about cluster management. If you want to create your own cluster on Amazon Elastic Compute Cloud (Amazon EC2), refer to Distributing Calls on Ray Remote Cluster.

Configure solution resources

We use an AWS CloudFormation stack to provision the solution resources. Complete the following steps:

  1. Choose Launch stack to provision the stack in your AWS account:

Launch CloudFormation Stack

This takes about 2 minutes to complete. On successful deployment, the CloudFormation stack shows the status as CREATE_COMPLETE.

CloudFormation CREATE_COMPLETE

  1. Navigate to AWS Glue Studio to find an AWS Glue job named AdvancedGlueRayJob.

Glue for Ray Job Script

  1. On the Job details tab, scroll down and choose Advanced Properties.

Under Job Parameters, AWS SDK for pandas is specified as an additional Python module to install, along with Modin as an extra dependency.

Glue for Ray Job Details

  1. To run the job, choose Run and navigate to the Runs tab to monitor the job’s progress.

Glue for Ray Job Runs

Import the library

To import the library, use the following code:

import awswrangler as wr

AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a cluster with the default parameters. Advanced users can override this process by starting the Ray runtime before the import command.
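For example, a sketch of that override (the parameters shown are illustrative, not required):

import ray

# Attach to (or configure) the Ray runtime explicitly before importing the library
ray.init(address="auto", ignore_reinit_error=True)

import awswrangler as wr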

Scale S3 Select workflows

S3 Select allows you to use SQL statements to query and filter S3 objects, including compressed files. This can be particularly useful if you have large files of several TBs and want to extract some information. Because the workload is delegated to Amazon S3, you don’t have to download and filter objects on the client side, leading to lower latency, lower cost, and higher performance.

With AWS SDK for pandas, these calls to S3 Select can be distributed across Ray workers in the cluster. In the following example, we query Amazon reviews data in Parquet format, filtering for reviews with 5-star ratings in the Mobile_Electronics partition. star_rating is a column in the Parquet data itself, while the partition is a directory.

# Filter for 5-star reviews with S3 Select within a partition
df_select = wr.s3.select_query(
    sql="SELECT * FROM s3object s where s.\"star_rating\" >= 5",
    path="s3://amazon-reviews-pds/parquet/product_category=Mobile_Electronics/",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=1024*1024*16,
)

scan_range_chunk_size is an important parameter to calibrate when using S3 Select. It specifies the range of bytes to query the S3 object, thereby determining the amount of work delegated to each worker. For this example, it’s set to 16 MB, meaning the work of scanning the object is parallelized into separate S3 Select requests each 16 MB in size. A higher value equates to larger chunks per worker but fewer workers, and vice versa.

The results are returned in a Modin DataFrame, which is a drop-in replacement for pandas. It exposes the same APIs but enables you to use all the workers in the cluster. The data in the Modin DataFrame is distributed along with all the operations among the workers.

Scale DynamoDB workflows

DynamoDB is a scalable NoSQL database service that provides high-performance, low-latency, and managed storage.

AWS SDK for pandas uses Ray to scale DynamoDB workflows, allowing parallel data retrieval and insertion operations. The wr.dynamodb.read_items function retrieves data from DynamoDB in parallel across multiple workers, and the results are returned as a Modin DataFrame. Similarly, data insertion into DynamoDB can be parallelized using the wr.dynamodb.put_df function.

For example, the following code inserts the Amazon Reviews DataFrame obtained from S3 Select into a DynamoDB table and then reads it back:

# Write Modin DataFrame to DynamoDB
wr.dynamodb.put_df(
    df=df_select,
    table_name=dynamodb_table_name,
    use_threads=4,
)
# Read data back from DynamoDB to Modin
df_dynamodb = wr.dynamodb.read_items(
    table_name=dynamodb_table_name,
    allow_full_scan=True,
)

DynamoDB calls are subject to AWS service quotas. The concurrency can be limited using the use_threads parameter.

Scale Timestream workflows

Timestream is a fast, scalable, fully managed, purpose-built time series database that makes it easy to store and analyze trillions of time series data points per day. With AWS SDK for pandas, you can distribute Timestream write operations across multiple workers in your cluster.

Data can be written to Timestream using the wr.timestream.write function, which parallelizes the data insertion process for improved performance.

In this example, we use sample data from Amazon S3 loaded into a Modin DataFrame. Familiar pandas commands such as selecting columns or resetting the index are applied at scale with Modin:

from datetime import datetime

# Select columns
df_timestream = df_timestream.loc[:, ["region", "az", "hostname", "measure_kind", "measure", "time"]]
# Overwrite the time column
df_timestream["time"] = datetime.now()
# Reset the index
df_timestream.reset_index(inplace=True, drop=False)
# Filter a measure
df_timestream = df_timestream[df_timestream.measure_kind == "cpu_utilization"]

The Timestream write operation is parallelized across blocks in your dataset. If the blocks are too big, you can use Ray to repartition the dataset and increase the throughput, because each block will be handled by a separate thread:

import ray  # needed for the Ray Dataset repartition API below

# Repartition the data into 100 blocks
df_timestream = ray.data.from_modin(df_timestream).repartition(100).to_modin()

We are now ready to insert the data into Timestream, and a final query confirms the number of rows in the table:

# Write data to Timestream
rejected_records = wr.timestream.write(
    df=df_timestream,
    database=timestream_database_name,
    table=timestream_table_name,
    time_col="time",
    measure_col="measure",
    dimensions_cols=["index", "region", "az", "hostname"],
)

# Query
df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{timestream_database_name}"."{timestream_table_name}"')

Clean up

To prevent unwanted charges to your AWS account, we recommend deleting the AWS resources that you used in this post:

  1. On the Amazon S3 console, empty the data from the S3 bucket with prefix glue-ray-blog-script.

S3 Bucket

  1. On the AWS CloudFormation console, delete the AdvancedSDKPandasOnGlueRay stack.

All resources will be automatically deleted with it.

Conclusion

In this post, we showcased some more advanced patterns to run your workloads using AWS SDK for pandas. In particular, these examples demonstrated how Ray is used within the library to distribute operations for several other AWS services, not just Amazon S3. When used in combination with AWS Glue for Ray, this gives you access to a fully managed environment to run at scale. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.


About the Authors

Abdel JaidiAbdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton KukushkinAnton Kukushkin is a Data Engineer for AWS Professional Services based in London, UK. In his spare time, he enjoys playing musical instruments.

Leon LuttenbergerLeon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale. In his spare time, he enjoys reading and traveling.

Migrate from Google BigQuery to Amazon Redshift using AWS Glue and Custom Auto Loader Framework

Post Syndicated from Tahir Aziz original https://aws.amazon.com/blogs/big-data/migrate-from-google-bigquery-to-amazon-redshift-using-aws-glue-and-custom-auto-loader-framework/

Amazon Redshift is a widely used, fully managed, petabyte-scale cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytic workloads. Customers are looking for tools that make it easier to migrate from other data warehouses, such as Google BigQuery, to Amazon Redshift to take advantage of the service price-performance, ease of use, security, and reliability.

In this post, we show you how to use AWS native services to accelerate your migration from Google BigQuery to Amazon Redshift. We use AWS Glue, a fully managed, serverless, ETL (extract, transform, and load) service, and the Google BigQuery Connector for AWS Glue (for more information, refer to Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors). We also add automation and flexibility to simplify migration of multiple tables to Amazon Redshift using the Custom Auto Loader Framework.

Solution overview

The solution provides a scalable and managed data migration workflow to migrate data from Google BigQuery to Amazon Simple Storage Service (Amazon S3), and then from Amazon S3 to Amazon Redshift. This pre-built solution scales to load data in parallel using input parameters.

The following architecture diagram shows how the solution works. It starts with setting up the migration configuration to connect to Google BigQuery, then converts the database schemas, and finally migrates the data to Amazon Redshift.

Architecture diagram of the solution

The workflow contains the following steps:

  1. A configuration file is uploaded to an S3 bucket you have chosen for this solution. This JSON file contains the migration metadata, namely the following:
    • A list of Google BigQuery projects and datasets.
    • A list of all tables to be migrated for each project and dataset pair.
  2. An Amazon EventBridge rule triggers an AWS Step Functions state machine to start migrating the tables.
  3. The Step Functions state machine iterates on the tables to be migrated and runs an AWS Glue Python shell job to extract the metadata from Google BigQuery and store it in an Amazon DynamoDB table used for tracking the tables’ migration status.
  4. The state machine iterates on the metadata from this DynamoDB table to run the table migration in parallel, based on the maximum number of migration jobs without incurring limits or quotas on Google BigQuery. It performs the following steps:
    • Runs the AWS Glue migration job for each table in parallel.
    • Tracks the run status in the DynamoDB table.
    • After the tables have been migrated, checks for errors and exits.
  5. The data exported from Google BigQuery is saved to Amazon S3. We use Amazon S3 (even though AWS Glue jobs can write directly to Amazon Redshift tables) for a few specific reasons:
    • We can decouple the data migration and the data load steps.
    • It offers more control on the load steps, with the ability to reload the data or pause the process.
    • It provides fine-grained monitoring of the Amazon Redshift load status.
  6. The Custom Auto Loader Framework automatically creates schemas and tables in the target database and continuously loads data from Amazon S3 to Amazon Redshift.

A few additional points to note:

  • If you have already created the target schema and tables in the Amazon Redshift database, you can configure the Custom Auto Loader Framework to not automatically detect and convert the schema.
  • If you want more control over converting the Google BigQuery schema, you can use the AWS Schema Conversion Tool (AWS SCT). For more information, refer to Migrate Google BigQuery to Amazon Redshift using AWS Schema Conversion tool (SCT).
  • As of this writing, neither the AWS SCT nor the Custom Auto Loader Framework support the conversion of nested data types (record, array, and struct). Amazon Redshift supports semistructured data using the SUPER data type, so if your table uses such complex data types, then you need to create the target tables manually.

To deploy the solution, there are two main steps:

  1. Deploy the solution stack using AWS CloudFormation.
  2. Deploy and configure Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift.

Prerequisites

Before getting started, make sure you have the following:

  1. Create the migration configuration JSON file described in the solution overview. In this example, we named the file bq-mig-config.json.
  2. Configure your Google account.
  3. Create an IAM role for AWS Glue (and note down the name of the IAM role).
  4. Subscribe to and activate the Google BigQuery Connector for AWS Glue.

Deploy the solution using AWS CloudFormation

To deploy the solution stack using AWS CloudFormation, complete the following steps:

  1. Choose Launch Stack:

This template provisions the AWS resources in the us-east-1 Region. If you want to deploy to a different Region, download the template bigquery-cft.yaml and launch it manually: on the AWS CloudFormation console, choose Create stack with new resources and upload the template file you downloaded.

The list of provisioned resources is as follows:

    • An EventBridge rule to start the Step Functions state machine on the upload of the configuration file.
    • A Step Functions state machine that runs the migration logic. The following diagram illustrates the state machine.
      Diagram representing the state machine deployed by the solution stack.
    • An AWS Glue Python shell job used to extract the metadata from Google BigQuery. The metadata will be stored in an DynamoDB table, with a calculated attribute to prioritize the migration job. By default, the connector creates one partition per 400 MB in the table being read (before filtering). As of this writing, the Google BigQuery Storage API has a maximum quota for parallel read streams, so we set the limit for worker nodes for tables larger than 400 GB. We also calculate the max number of jobs that can run in parallel based on those values.
    • An AWS Glue ETL job used to extract the data from each Google BigQuery table and save it in Amazon S3 in Parquet format.
    • A DynamoDB table (bq_to_s3_tracking) used to store the metadata for each table to be migrated (size of the table, S3 path used to store the migrated data, and the number of workers needed to migrate the table).
    • A DynamoDB table (bq_to_s3_maxstreams) used to store the maximum number of streams per state machine run. This helps us minimize job failures due to limits or quotas. Use the CloudFormation template to customize the name of the DynamoDB table. The prefix for the DynamoDB table is bq_to_s3.
    • The IAM roles needed by the state machine and AWS Glue jobs.
  1. Choose Next.

Screen capture showing the AWS CloudFormation Create stack page.

  1. For Stack name, enter a name.
  2. For Parameters, enter the following parameters, then choose Create.
    • InputBucketName (S3 bucket name) – The S3 bucket where the AWS Glue job stores the migrated data. The data is actually stored in a folder named s3-redshift-loader-source, which is used by the Custom Auto Loader Framework.
    • InputConnectionName (AWS Glue connection name; the default is glue-bq-connector-24) – The name of the AWS Glue connection that is created using the Google BigQuery connector.
    • InputDynamoDBTablePrefix (DynamoDB table name prefix; the default is bq_to_s3) – The prefix that will be used when naming the two DynamoDB tables created by the solution.
    • InputGlueETLJob (AWS Glue ETL job name; the default is bq-migration-ETL) – The name you want to give to the AWS Glue ETL job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path.
    • InputGlueMetaJob (AWS Glue Python shell job name; the default is bq-get-metadata) – The name you want to give to the AWS Glue Python shell job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path.
    • InputGlueS3Path (S3 path; the default is s3://aws-glue-scripts-${AWS::Account}-${AWS::Region}/admin/) – This is the S3 path in which the stack will copy the scripts for the AWS Glue jobs. Remember to replace ${AWS::Account} with the actual AWS account ID and ${AWS::Region} with the Region you plan to use, or provide your own bucket and prefix in a complete path.
    • InputMaxParallelism (number of parallel migration jobs to run; the default is 30) – The maximum number of tables you want to migrate concurrently.
    • InputBQSecret (AWS Secrets Manager secret name) – The name of the AWS Secrets Manager secret in which you stored the Google BigQuery credential.
    • InputBQProjectName (Google BigQuery project name) – The name of your project in Google BigQuery in which you want to store temporary tables; you will need write permissions on the project.
    • StateMachineName (Step Functions state machine name; the default is bq-to-s3-migration-state-machine) – The name of the Step Functions state machine.
    • SourceS3BucketName (S3 bucket name; the default is aws-blogs-artifacts-public) – The S3 bucket where the artifacts for this post are stored. Do not change the default.

Deploy and configure the Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift

The Custom Auto Loader Framework utility makes data ingestion to Amazon Redshift simpler and automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to Custom Auto Loader Framework.

To set up the Custom Auto Loader Framework, complete the following steps:

  1. Choose Launch Stack to deploy the CloudFormation stack in the us-east-1 Region:

  1. On the AWS CloudFormation console, choose Next.
  2. Provide the following parameters to help ensure the successful creation of resources. Make sure you have collected these values beforehand.
    • CopyCommandSchedule (for example, cron(0/5 * ? * * *)) – The EventBridge rules KickoffFileProcessingSchedule and QueueRSProcessingSchedule are triggered based on this schedule. The default is 5 minutes.
    • DatabaseName (for example, dev) – The Amazon Redshift database name.
    • DatabaseSchemaName (for example, public) – The Amazon Redshift schema name.
    • DatabaseUserName (for example, demo) – The Amazon Redshift user name who has access to run COPY commands on the Amazon Redshift database and schema.
    • RedshiftClusterIdentifier (for example, democluster) – The Amazon Redshift cluster name.
    • RedshiftIAMRoleARN (for example, arn:aws:iam::7000000000:role/RedshiftDemoRole) – The Amazon Redshift cluster attached role, which has access to the S3 bucket. This role is used in COPY commands.
    • SourceS3Bucket (your bucket name) – The S3 bucket where the data is located. Use the same bucket you used to store the migrated data, as indicated in the previous stack.
    • CopyCommandOptions (for example, delimiter '|' gzip) – The additional COPY command data format parameters, for example: delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'
    • InitiateSchemaDetection (Yes) – The setting to dynamically detect the schema prior to file upload.

The following screenshot shows an example of our parameters.

Screen capture showing the stack details page with the input parameters filled with example values.

  1. Choose Create.
  2. Monitor the progress of the Stack creation and wait until it is complete.
  3. To verify the Custom Auto Loader Framework configuration, log in to the Amazon S3 console and navigate to the S3 bucket you provided as a value to the SourceS3Bucket parameter.

You should see that a new directory called s3-redshift-loader-source has been created.

Screen capture of the Amazon S3 console showing the folder you should be able to see in your S3 bucket.

Test the solution

To test the solution, complete the following steps:

  1. Create the configuration file based on the prerequisites. You can also download the demo file.
  2. To set up the S3 bucket, on the Amazon S3 console, navigate to the folder bq-mig-config in the bucket you provided in the stack.
  3. Upload the config file into it.
  4. To enable EventBridge notifications to the bucket, open the bucket on the console and on the Properties tab, locate Event notifications.
  5. In the Amazon EventBridge section, choose Edit.
  6. Select On, then choose Save changes.

  1. On the AWS Step Functions console, monitor the run of the state machine.
  2. Monitor the status of the loads in Amazon Redshift. For instructions, refer to Viewing Current Loads.
  3. Open the Amazon Redshift Query Editor V2 and query your data.

Pricing considerations

You might have egress charges for migrating data out of Google BigQuery into Amazon S3. Review and calculate the cost for moving your data on your Google cloud billing console. As of this writing, AWS Glue 3.0 or later charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. For more information, see AWS Glue Pricing. With auto scaling enabled, AWS Glue automatically adds and removes workers from the cluster depending on the parallelism at each stage or microbatch of the job run.
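For example, under these assumptions, a migration job that runs on 10 DPUs for 15 minutes would cost approximately 10 DPUs × 0.25 hours × $0.44 per DPU-hour, or about $1.10, excluding any Google egress and Amazon S3 storage charges.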

Clean up

To avoid incurring future charges, clean up your resources:

  1. Delete the CloudFormation solution stack.
  2. Delete the CloudFormation Custom Auto Loader Framework stack.

Conclusion

In this post, we demonstrated how to build a scalable and automated data pipeline to migrate your data from Google BigQuery to Amazon Redshift. We also highlighted how the Custom Auto Loader Framework can automate the schema detection, create tables for your S3 files, and continuously load the files into your Amazon Redshift warehouse. With this approach, you can automate the migration of entire projects (even multiple projects at a time) in Google BigQuery to Amazon Redshift. This helps improve data migration times into Amazon Redshift significantly through the automatic table migration parallelization.

The auto-copy feature in Amazon Redshift simplifies automatic data loading from Amazon S3 with a simple SQL command. Users can easily automate data ingestion from Amazon S3 to Amazon Redshift using the Amazon Redshift auto-copy preview feature.

For more information about the performance of the Google BigQuery Connector for AWS Glue, refer to Migrate terabytes of data quickly from Google Cloud to Amazon S3 with AWS Glue Connector for Google BigQuery and learn how to migrate a large amount of data (1.9 TB) into Amazon S3 quickly (about 8 minutes).

To learn more about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.


About the Authors

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for DB and Analytics. He has worked in the analytics space for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Manjula Nagineni is a Senior Solutions Architect with AWS based in New York. She works with major financial service institutions, architecting and modernizing their large-scale applications while adopting AWS Cloud services. She is passionate about designing big data workloads cloud-natively. She has over 20 years of IT experience in software development, analytics, and architecture across multiple domains such as finance, retail, and telecom.

Sohaib Katariwala is an Analytics Specialist Solutions Architect at AWS. He has over 12 years of experience helping organizations derive insights from their data.

Managing data confidentiality for Scope 3 emissions using AWS Clean Rooms

Post Syndicated from Sundeep Ramachandran original https://aws.amazon.com/blogs/architecture/managing-data-confidentiality-for-scope-3-emissions-using-aws-clean-rooms/

Scope 3 emissions are indirect greenhouse gas emissions that are a result of a company’s activities, but occur outside the company’s direct control or ownership. Measuring these emissions requires collecting data from a wide range of external sources, like raw material suppliers, transportation providers, and other third parties. One of the main challenges with Scope 3 data collection is ensuring data confidentiality when sharing proprietary information between third-party suppliers. Organizations are hesitant to share information that could potentially be used by competitors. This can make it difficult for companies to accurately measure and report on their Scope 3 emissions, which limits their ability to manage climate-related impacts and risks.

In this blog, we show how to use AWS Clean Rooms to share Scope 3 emissions data between a reporting company and two of their value chain partners (a raw material purchased goods supplier and a transportation provider). Data confidentiality requirements are specified by each organization before participating in the AWS Clean Rooms data collaboration (see Figure 1).

Data confidentiality requirements of reporting company and value chain partners

Figure 1. Data confidentiality requirements of reporting company and value chain partners

Each account has confidential data described as follows:

  • Column 1 lists the raw material Region of origin. This is business confidential information for the supplier.
  • Column 2 lists the emission factors at the raw material level. This is sensitive information for the supplier.
  • Column 3 lists the mode of transportation. This is business confidential information for the transportation provider.
  • Column 4 lists the emissions in transporting individual items. This is sensitive information for the transportation provider.
  • Rows in column 5 list the product recipe at the ingredient level. This is trade secret information for the reporting company.

Overview of solution

In this architecture, AWS Clean Rooms is used to analyze and collaborate on emission datasets without sharing, moving, or revealing underlying data to collaborators (shown in Figure 2).

Architecture for AWS Clean Rooms Scope 3 collaboration

Figure 2. Architecture for AWS Clean Rooms Scope 3 collaboration

Three AWS accounts are used to demonstrate this approach. The Reporting Account creates a collaboration in AWS Clean Rooms and invites the Purchased Goods Account and Transportation Account to join as members. All accounts can protect their underlying data with privacy-enhancing controls to contribute data directly from Amazon Simple Storage Service (S3) using AWS Glue tables.

The Purchased Goods Account includes users who can update the purchased goods bucket. Similarly, the Transportation Account has users who can update the transportation bucket. The Reporting Account can run SQL queries on the configured tables. AWS Clean Rooms only returns results complying with the analysis rules set by all participating accounts.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Although Amazon S3 and AWS Clean Rooms are free-tier eligible, a low fee applies to AWS Glue. Clean-up actions are provided later in this blog post to minimize costs.

Configuration

We configured the S3 buckets for each AWS account as follows:

  • Reporting Account: reportingcompany.csv
  • Purchased Goods Account: purchasedgoods.csv
  • Transportation Account: transportation.csv

Create an AWS Glue Data Catalog for each S3 data source following the method in the Glue Data Catalog Developer Guide. The AWS Glue tables should match the schema detailed previously in Figure 1, for each respective account (see Figure 3).

Configured AWS Glue table for ‘Purchased Goods’

Figure 3. Configured AWS Glue table for ‘Purchased Goods’

Data consumers can be configured to ingest, analyze, and visualize queries (refer back to Figure 2). We will tag the Reporting Account Glue Database as “reporting-db” and the Glue Table as “reporting.” Likewise, the Purchased Goods Account will have “purchase-db” and “purchase” tags.

Security

Additional actions are recommended to secure each account in a production environment, such as configuring encryption, AWS Identity and Access Management (IAM) roles, and Amazon CloudWatch monitoring. Review the Further Reading section at the end of this post.

Walkthrough

This walkthrough consists of four steps:

  1. The Reporting Account creates the AWS Clean Rooms collaboration and invites the Purchased Goods Account and Transportation Account to share data.
  2. The Purchased Goods Account and Transportation Account accepts this invitation.
  3. Rules are applied for each collaboration account restricting how data is shared between AWS Clean Rooms collaboration accounts.
  4. The SQL query is created and run in the Reporting Account.

1. Create the AWS Clean Rooms collaboration in the Reporting Account

(The steps covered in this section require you to be logged into the Reporting Account.)

  • Navigate to the AWS Clean Rooms console and click Create collaboration.
  • In the Details section, type “Scope 3 Clean Room Collaboration” in the Name field.
  • Scroll to the Member 1 section. Enter “Reporting Account” in the Member display name field.
  • In Member 2 section, enter “Purchased Goods Account” for your first collaboration member name, with their account number in the Member AWS account ID box.
  • Click Add another member and add “Transportation Account” as the third collaborator with their AWS account number.
  • Choose the “Reporting Account” as the Member who can query and receive results in the Member abilities section. Click Next.
  • Select Yes, join by creating membership now. Click Next.
  • Verify the collaboration settings on the Review and Create page, then select Create and join collaboration and create membership.

Both accounts will then receive an invitation to accept the collaboration (see Figure 4). The console reveals each member status as “Invited” until accepted. Next, we will show how the invited members apply query restrictions on their data.

New collaboration created in AWS Clean Rooms

Figure 4. New collaboration created in AWS Clean Rooms

2. Accept invitations and configure table collaboration rules

Steps in this section are applied to the Purchased Goods Account and Transportation Account following collaboration environment setup. For brevity, we will demonstrate steps using the Purchased Goods Account. Differences for the Transportation Account are noted.

  • Log in to the AWS account owning the Purchased Goods Account and accept the collaboration invitation.
  • Open the AWS Clean Rooms console and select Collaborations on the left-hand navigation pane, then click Available to join.
  • You will see an invitation from the Scope 3 Clean Room Collaboration. Click on Scope 3 Clean Room Collaboration and then Create membership.
  • Select Tables, then Associate table. Click Configure new table.

The next action is to associate the Glue table created from the purchasedgoods.csv file. This sequence restricts access to the origin_region column (transportation_mode for the Transportation Account table) in the collaboration.

  • In the Scope 3 Clean Room Collaboration, select Configured tables in the left-hand pane, then Configure new table. Select the AWS Glue table associated with purchasedgoods.csv (shown in Figure 5).
  • Select the AWS Glue Database (purchase-db) and AWS Glue Table (purchase).
  • Verify the correct table section by toggling View schema from the AWS Glue slider bar.
  • In the Columns allowed in collaboration section, select all fields except for origin_region. This action prevents the origin_region column from being accessed and viewed in the collaboration.
  • Complete this step by selecting Configure new table.
Purchased Goods account table configuration

Figure 5. Purchased Goods account table configuration

  • Select Configure analysis rule (see Figure 6).
  • Select Aggregation type then Next.
  • Select SUM as the Aggregate function and s3_upstream_purchased_good for the column.
  • Under Join controls, select Specify Join column. Select “item” from the list of options. This permits SQL join queries to execute on the “item” column. Click Next.
Table rules for the Purchased Goods account

Figure 6. Table rules for the Purchased Goods account

  • The next page specifies the minimum number of unique rows to aggregate for the “join” command. Select “item” for Column name and “2” for the Minimum number of distinct values. Click Next.
  • To confirm the table configuration query rules, click Configure analysis rule.
  • The final step is to click Associate to collaboration and select Scope 3 Clean Room Collaboration in the pulldown menu. Select Associate table after page refresh.

The procedure in this section is repeated for the Transportation Account, with the following exceptions:

  1. The columns shared in this collaboration are item, s3_upstream_transportation, and unit.
  2. The Aggregation function is a SUM applied on the s3_upstream_transportation column.
  3. The item column has an Aggregation constraint minimum of two distinct values.

3. Configure table collaboration rules inside the Reporting Account

At this stage, member account tables are created and shared in the collaboration. The next step is to configure the Reporting Account tables in the Reporting Account’s AWS account.

  • Navigate to AWS Clean Rooms. Select Configured tables, then Configure new table.
  • Select the Glue database and table associated with the file reportingcompany.csv.
  • Under Columns allowed in collaboration, select All columns, then Configure new table.
  • Configure collaboration rules by clicking Configure analysis rule using the Guided workflow.
  • Select Aggregation type, then Next.
  • Select SUM as the Aggregate function and ingredient for the column (see Figure 7).
  • Under Specify join columns, select ingredient. This permits SQL join queries to run only on the ingredient column.
  • In the Dimension controls, select product. This option permits grouping by product name in the SQL query. Select Next.
  • Select None in the Scalar functions section. Click Next. Read more about scalar functions in the AWS Clean Rooms User Guide.
Table rules for the Reporting account

Figure 7. Table rules for the Reporting account

  • On the next page, select ingredient for Column name and 2 for the Minimum number of distinct values. Click Next. To confirm query control submission, select Configure analysis rule on the next page.
  • Validate the setting in the Review and Configure window, then select Next.
  • Inside the Configured tables tab, select Associate to collaboration. Assign the table to the Scope 3 Clean Rooms Collaboration.
  • Select the Scope 3 Clean Room Collaboration in the dropdown menu. Select Choose collaboration.
    On the Scope 3 Clean Room Collaboration page, select reporting, then Associate table.

4. Create and run the SQL query

Queries can now be run inside the Reporting Account (shown in Figure 8).

Query results in the Clean Rooms Reporting Account

Figure 8. Query results in the Clean Rooms Reporting Account

  • Select an S3 destination to output the query results. Select Action, then Set results settings.
  • Enter the S3 bucket name, then click Save changes.
  • Paste this SQL snippet inside the query text editor (see Figure 8):

SELECT
  r.product AS "Product",
  SUM(p.s3_upstream_purchased_good) AS "Scope_3_Purchased_Goods_Emissions",
  SUM(t.s3_upstream_transportation) AS "Scope_3_Transportation_Emissions"
FROM
  reporting r
  INNER JOIN purchase p ON r.ingredient = p.item
  INNER JOIN transportation t ON p.item = t.item
GROUP BY
  r.product

  • Click Run query. The initial query can take a few minutes to return results; subsequent queries run faster.

Conclusion

This example shows how AWS Clean Rooms can aggregate data across collaborators to produce total Scope 3 emissions for each product from purchased goods and transportation. This query was performed between three organizations without revealing underlying emission factors or proprietary product recipes to one another. This alleviates data confidentiality concerns and improves sustainability reporting transparency.

Clean Up

The following steps are taken to clean up all resources created in this walkthrough:

  • Member and Collaboration Accounts:
    1. AWS Clean Rooms: Disassociate and delete collaboration tables
    2. AWS Clean Rooms: Remove member account in the collaboration
    3. AWS Glue: Delete the crawler, database, and tables
    4. AWS IAM: Delete the AWS Clean Rooms service policy
    5. Amazon S3: Delete the CSV file storage buckets
  • Collaboration Account only:
    1. Amazon S3: Delete the SQL query bucket
    2. AWS Clean Rooms: Delete the Scope 3 Clean Room Collaboration

Further Reading:

Security Practices

Get custom data into Amazon Security Lake through ingesting Azure activity logs

Post Syndicated from Adam Plotzker original https://aws.amazon.com/blogs/security/get-custom-data-into-amazon-security-lake-through-ingesting-azure-activity-logs/

Amazon Security Lake automatically centralizes security data from both cloud and on-premises sources into a purpose-built data lake stored in the AWS delegated administrator account for Amazon Security Lake.

In this blog post, I will show you how to configure your Amazon Security Lake solution with cloud activity data from Microsoft Azure Monitor activity log, which you can query alongside your existing AWS CloudTrail data. I will walk you through the required steps — from configuring the required AWS Identity and Access Management (IAM) permissions, AWS Glue jobs, and Amazon Kinesis Data Streams required on the AWS side to forwarding that data from within Azure.

When you turn on Amazon Security Lake, it begins to collect actionable security data from various AWS sources. However, many enterprises today have complex environments that include a mix of different cloud resources in addition to on-premises data centers.

Although the AWS data sources in Amazon Security Lake encompass a large amount of the necessary security data needed for analysis, you may miss the full picture if your infrastructure operates across multiple cloud vendors (for example, AWS, Azure, and Google Cloud Platform) and on premises at the same time. By querying data from across your entire infrastructure, you can increase the number of indicators of compromise (IOC) that you identify, and thus increase the likelihood that those indicators will lead to actionable outputs.

Solution architecture

Figure 1 shows how to configure data to travel from an Azure event hub to Amazon Security Lake.

Figure 1: Solution architecture

As shown in Figure 1, the solution involves the following steps:

  1. An AWS user instantiates the required AWS services and features that enable the process to function, including AWS Identity and Access Management (IAM) permissions, Kinesis data streams, AWS Glue jobs, and Amazon Simple Storage Service (Amazon S3) buckets, either manually or through an AWS CloudFormation template, such as the one we will use in this post.
  2. In response to the custom source created from the CloudFormation template, a Security Lake table is generated in AWS Glue.
  3. From this point on, Azure activity logs in their native format are stored within an Azure cloud event hub within an Azure account. An Azure function is deployed to respond to new events within the Azure event hub and forward these logs over the internet to the Kinesis data stream that was created in the preceding step.
  4. The Kinesis data stream forwards the data to an AWS Glue streaming job that is fronted by the stream.
  5. The AWS Glue job then performs the extract, transform, and load (ETL) mapping to the appropriate Open Cybersecurity Schema Framework (OCSF) schema (specified for API Activity events at OCSF API Activity Mappings); a simplified sketch of this mapping follows the list.
  6. The Azure events are partitioned according to the partitioning requirements of Amazon Security Lake tables and stored in Amazon S3.
  7. The user can query these tables by using Amazon Athena alongside the rest of their data inside Amazon Security Lake.
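
To make step 5 more concrete, the following is a minimal sketch of the kind of record-level mapping the streaming job performs. It is not the PySpark code from the provided assets; the Azure input keys (operationName, caller, callerIpAddress, eventTimestamp, level) and the constant values are assumptions for illustration, and the output fields mirror the OCSF API Activity columns queried later in this post.

def map_azure_activity_to_ocsf(record: dict) -> dict:
    """Map one Azure activity log record to a flattened OCSF API Activity shape.

    Both the input keys and the constant values below are illustrative
    placeholders; consult the OCSF API Activity mappings for the real schema.
    """
    return {
        "api": {"operation": record.get("operationName")},
        "actor": {
            "user": {
                "uid": record.get("caller"),
                "name": record.get("caller"),
            }
        },
        "src_endpoint": {"ip": record.get("callerIpAddress")},
        "time": record.get("eventTimestamp"),
        "severity": record.get("level"),
        "metadata": {
            "version": "1.0.0",  # placeholder OCSF schema version
            "product": {"name": "Azure Activity Log", "vendor_name": "Microsoft"},
        },
        "category_name": "Application Activity",  # placeholder category
        "activity_name": "API Activity",          # placeholder activity
        "type_uid": 0,                             # placeholder type UID
    }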

Prerequisites

Before you implement the solution, complete the following prerequisites:

  • Verify that you have enabled Amazon Security Lake in the AWS Regions that correspond to the Azure Activity logs that you will forward. For more information, see What is Amazon Security Lake?
  • Preconfigure the custom source logging for the source AZURE_ACTIVITY in your Region. To configure this custom source in Amazon Security Lake, open the Amazon Security Lake console, navigate to Create custom data source, and do the following, as shown in Figure 2:
    • For Data source name, enter AZURE_ACTIVITY.
    • For Event class, select API_ACTIVITY.
    • For Account Id, enter the ID of the account that is authorized to write data to your data lake.
    • For External Id, enter AZURE_ACTIVITY-<YYYYMMDD>.
    Figure 2: Configure custom data source

For more information on how to configure custom sources for Amazon Security Lake, see Collecting data from custom sources.

Step 1: Configure AWS services for Azure activity logging

The first step is to configure the AWS services for Azure activity logging.

  1. To configure Azure activity logging in Amazon Security Lake, first prepare the assets required in the target AWS account. You can automate this process by using the provided CloudFormation template — Security Lake CloudFormation — which will do the heavy lifting for this portion of the setup.

    Note: I have predefined these scripts to create the AWS assets required to ingest Azure activity logs, but you can generalize this process for other external log sources, as well.

    The CloudFormation template has the following components:

    • securitylakeGlueStreamingRole — includes the following managed policies:
      • AWSLambdaKinesisExecutionRole
      • AWSGlueServiceRole
    • securitylakeGlueStreamingPolicy — includes the following attributes:
      • “s3:GetObject”
      • “s3:PutObject”
    • securitylakeAzureActivityStream — This Kinesis data stream is the endpoint that acts as the connection point between Azure and AWS and the frontend of the AWS Glue stream that feeds Azure activity logs to Amazon Security Lake.
    • securitylakeAzureActivityJob — This is an AWS Glue streaming job that is used to take in feeds from the Kinesis data stream and map the Azure activity logs within that stream to OCSF.
    • securitylake-glue-assets S3 bucket — This is the S3 bucket that is used to store the ETL scripts used in the AWS Glue job to map Azure activity logs.

    Running the CloudFormation template will instantiate the aforementioned assets in your AWS delegated administrator account for Amazon Security Lake.

  2. The CloudFormation template creates a new S3 bucket with the following syntax: securityLake-glue-assets-<ACCOUNT-ID><REGION>. After the CloudFormation run is complete, navigate to this bucket within the S3 console.
  3. Within the S3 bucket, create a scripts folder and a temporary folder, as shown in Figure 4.
    Figure 4: Glue assets bucket

  4. Update the Azure AWS Glue PySpark script by replacing the following values in the file. You will attach this script to your AWS Glue job and use it to generate the AWS assets required for the implementation.
    • Replace <AWS_REGION_NAME> with the Region that you are operating in — for example, us-east-2.
    • Replace <AWS_ACCOUNT_ID> with the account ID of your delegated administrator account for Amazon Security Lake — for example, 111122223333.
    • Replace <SECURITYLAKE-AZURE-STREAM-ARN> with the Kinesis stream name created through the CloudFormation template. To find the stream name, open the Kinesis console, navigate to the Kinesis stream with the name securityLakeAzureActivityStream<STREAM-UID>, and copy the Amazon Resource Name (ARN), as shown in the following figure.

      Figure 5: Kinesis stream ARN

    • Replace <SECURITYLAKE-BUCKET-NAME> with the root name of your data lake S3 bucket — for example, s3://aws-security-data-lake-DOC-EXAMPLE-BUCKET.

    After you replace these values, navigate within the scripts folder and upload the AWS Glue PySpark Python script named azure-activity-pyspark.py, as shown in Figure 6.

    Figure 6: AWS Glue script

  5. Within your AWS Glue job, choose Job details and configure the job as follows:
    • For Type, select Spark Streaming.
    • For Language, select Python 3.
    • For Script path, select the S3 path that you created in the preceding step.
    • For Temporary path, select the S3 path that you created in the preceding step.
  6. Save the changes, and run the AWS Glue job by selecting Save and then Run.
  7. Choose the Runs tab, and make sure that the Run status of the job is Running.
    Figure 7: AWS Glue job status

At this point, you have finished the configurations from AWS.

Step 2: Configure Azure services for Azure activity log forwarding

You will complete the next steps in the Azure Cloud console. You need to configure Azure to export activity logs to an Azure cloud event hub within your desired Azure account or organization. Additionally, you need to create an Azure function to respond to new events within the Azure event hub and forward those logs over the internet to the Kinesis data stream that the CloudFormation template created in the initial steps of this post.

For information about how to set up and configure Azure Functions to respond to event hubs, see Azure Event Hubs Trigger for Azure Functions in the Azure documentation.

Configure the following Python script — Azure Event Hub Function — in an Azure function app. This function is designed to respond to event hub events, create a connection to AWS, and forward those events to Kinesis as deserialized JSON blobs.

In the script, replace the following variables with your own information:

  • For <SECURITYLAKE-AZURE-STREAM-ARN>, enter the Kinesis data stream ARN.
  • For <SECURITYLAKE-AZURE-STREAM-NAME>, enter the Kinesis data stream name.
  • For <SECURITYLAKE-AZURE-STREAM-KEYID>, enter the AWS Key Management Service (AWS KMS) key ID created through the CloudFormation template.

The <SECURITYLAKE-AZURE-STREAM-ARN> and securityLakeAzureActivityStream<STREAM-UID> are the same variables that you obtained earlier in this post (see Figure 5).
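
The full function body is in the linked Azure Event Hub Function script. As a rough illustration of the forwarding pattern only, the following sketch shows an Azure Functions handler in Python that pushes each event hub record to Kinesis with boto3. The environment variable names, the correlationId partition key, and the batch (cardinality "many") trigger are assumptions, and credential handling for boto3 is intentionally omitted; see the key management guidance that follows.

import json
import os
from typing import List

import azure.functions as func
import boto3

# Assumed environment variable names; the linked script may use different ones.
STREAM_NAME = os.environ["SECURITYLAKE_AZURE_STREAM_NAME"]
AWS_REGION = os.environ["SECURITYLAKE_AWS_REGION"]

kinesis = boto3.client("kinesis", region_name=AWS_REGION)


def main(events: List[func.EventHubEvent]) -> None:
    # Requires cardinality set to "many" in function.json so events arrive in batches.
    for event in events:
        body = json.loads(event.get_body().decode("utf-8"))
        # Forward each Azure activity log record as a deserialized JSON blob.
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(body).encode("utf-8"),
            # correlationId is a typical Azure activity log field; an assumption here.
            PartitionKey=str(body.get("correlationId", "azure-activity")),
        )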

You can find the AWS KMS key ID within the AWS KMS managed key policy associated with securityLakeAzureActivityStream. For example, in the key policy shown in Figure 8, the <SECURITYLAKE-AZURE-STREAM-KEYID> is shown in line 3.

Figure 8: Kinesis data stream inputs

Important: When you are working with KMS keys retrieved from the AWS console or AWS API keys within Azure, you should be extremely mindful of how you approach key management. Improper or poor handling of keys could result in the interception of data from the Kinesis stream or Azure function.

It’s a best security practice to use a trusted key management architecture that uses sufficient encryption and security protocols when working with keys that safeguard sensitive security information. Within Azure, consider using services such as the Azure AD integration with AWS for seamless and ephemeral credential usage inside the Azure function. See Azure AD Integration for more information on how the integration works to safeguard and manage stored security keys and to help make sure that no keys are accessible to unauthorized parties or stored as unencrypted text outside the AWS console.

Step 3: Validate the workflow and query Athena

After you complete the preceding steps, your logs should be flowing. To make sure that the process is working correctly, complete the following steps.

  1. In the Kinesis Data Streams console, verify that the logs are flowing to your data stream. Open the Kinesis stream that you created previously, choose the Data viewer tab, and then choose Get records, as shown in Figure 9.
    Figure 9: Kinesis data stream inputs

  2. Verify that the logs are partitioned and stored within the correct Security Lake bucket associated with the configured Region. The log partitions within the Security Lake bucket should follow the syntax region=<region>/account_id=<account_id>/eventDay=<YYYYMMDD>/, and they should be stored with the expected Parquet compression.
    Figure 10: S3 bucket with object

  3. Assuming that CloudTrail logs exist within your Amazon Security Lake instance as well, you can now create a query in Athena that pulls data from the newly created Azure activity table and examine it alongside your existing CloudTrail logs by running queries such as the following:
    SELECT 
        api.operation,
        actor.user.uid,
        actor.user.name,
        src_endpoint.ip,
        time,
        severity,
        metadata.version,
        metadata.product.name,
        metadata.product.vendor_name,
        category_name,
        activity_name,
        type_uid
    FROM {SECURITY-LAKE-DB}.{SECURITY-LAKE-AZURE-TABLE}
    UNION ALL
    SELECT 
        api.operation,
        actor.user.uid,
        actor.user.name,
        src_endpoint.ip,
        time,
        severity,
        metadata.version,
        metadata.product.name,
        metadata.product.vendor_name,
        category_name,
        activity_name,
        type_uid
    FROM {SECURITY-LAKE-DB}.{SECURITY-LAKE-CLOUDTRAIL-TABLE}

    Figure 11: Query Azure activity and CloudTrail together in Athena

For additional guidance on how to configure access and query Amazon Security Lake in Athena, see the following resources:

Conclusion

In this blog post, you learned how to create and deploy the AWS and Microsoft Azure assets needed to bring your own data to Amazon Security Lake. By creating an AWS Glue streaming job that can transform Azure activity data streams and by fronting that AWS Glue job with a Kinesis stream, you can open Amazon Security Lake to intake from external Azure activity data streams.

You also learned how to configure Azure assets so that your Azure activity logs can stream to your Kinesis endpoint. The combination of these two creates a working, custom source solution for Azure activity logging.

To get started with Amazon Security Lake, see the Getting Started page, or if you already use Amazon Security Lake and want to read additional blog posts and articles about this service, see Blog posts and articles.

If you have feedback about this blog post, submit comments in the Comments section below. If you have questions about this blog post, start a new thread on Amazon Security Lake re:Post or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Adam Plotzker

Adam is currently a Security Engineer at AWS, working primarily on the Amazon Security Lake solution. One of the things he enjoys most about his work at AWS is his ability to be creative when exploring customer needs and coming up with unique solutions that meet those needs.

Join a streaming data source with CDC data for real-time serverless data analytics using AWS Glue, AWS DMS, and Amazon DynamoDB

Post Syndicated from Manish Kola original https://aws.amazon.com/blogs/big-data/join-streaming-source-cdc-glue/

Customers have been using data warehousing solutions to perform their traditional analytics tasks. Recently, data lakes have gained a lot of traction to become the foundation for analytical solutions, because they come with benefits such as scalability, fault tolerance, and support for structured, semi-structured, and unstructured datasets.

Data lakes are not transactional by default; however, there are multiple open-source frameworks that enhance data lakes with ACID properties, providing a best-of-both-worlds solution between transactional and non-transactional storage mechanisms.

Traditional batch ingestion and processing pipelines that involve operations such as data cleaning and joining with reference data are straightforward to create and cost-efficient to maintain. However, it is challenging to ingest datasets, such as Internet of Things (IoT) and clickstreams, at a fast rate with near-real-time delivery SLAs. You will also want to apply incremental updates with change data capture (CDC) from the source system to the destination. To make data-driven decisions in a timely manner, you need to account for missed records and backpressure, and maintain event ordering and integrity, especially if the reference data also changes rapidly.

In this post, we aim to address these challenges. We provide a step-by-step guide to join streaming data to a reference table changing in real time using AWS Glue, Amazon DynamoDB, and AWS Database Migration Service (AWS DMS). We also demonstrate how to ingest streaming data to a transactional data lake using Apache Hudi to achieve incremental updates with ACID transactions.

Solution overview

For our example use case, streaming data is coming through Amazon Kinesis Data Streams, and reference data is managed in MySQL. The reference data is continuously replicated from MySQL to DynamoDB through AWS DMS. The requirement here is to enrich the real-time stream data by joining with the reference data in near-real time, and to make it queryable from a query engine such as Amazon Athena while keeping consistency. In this use case, reference data in MySQL can be updated when requirements change, and queries then need to return results that reflect the updates to the reference data.

This solution addresses the issue of users wanting to join streams with changing reference datasets when the size of the reference dataset is small. The reference data is maintained in DynamoDB tables, and the streaming job loads the full table into memory for each micro-batch, joining a high-throughput stream to a small reference dataset.

The following diagram illustrates the solution architecture.

Architecture

Prerequisites

For this walkthrough, you should have the following prerequisites:

Create IAM roles and S3 bucket

In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and two AWS Identity and Access Management (IAM) roles: one for the AWS Glue job, and one for AWS DMS. We do this using an AWS CloudFormation template. Complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For Stack name, enter a name for your stack.
  5. For DynamoDBTableName, enter tgt_country_lookup_table. This is the name of your new DynamoDB table.
  6. For S3BucketNamePrefix, enter the prefix of your new S3 bucket.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Stack creation can take about 1 minute.

Create a Kinesis data stream

In this section, you create a Kinesis data stream:

  1. On the Kinesis console, choose Data streams in the navigation pane.
  2. Choose Create data stream.
  3. For Data stream name, enter your stream name.
  4. Leave the remaining settings as default and choose Create data stream.

A Kinesis data stream is created with on-demand mode.

Create and configure an Aurora MySQL cluster

In this section, you create and configure an Aurora MySQL cluster as the source database. First, configure your source Aurora MySQL database cluster to enable CDC through AWS DMS to DynamoDB.

Create a parameter group

Complete the following steps to create a new parameter group:

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Choose Create parameter group.
  3. For Parameter group family, select aurora-mysql5.7.
  4. For Type, choose DB Cluster Parameter Group.
  5. For Group name, enter my-mysql-dynamodb-cdc.
  6. For Description, enter Parameter group for demo Aurora MySQL database.
  7. Choose Create.
  8. Select my-mysql-dynamodb-cdc, and choose Edit under Parameter group actions.
  9. Edit the parameter group as follows:
Name               Value
binlog_row_image   full
binlog_format      ROW
binlog_checksum    NONE
log_slave_updates  1
  10. Choose Save changes.

RDS parameter group

Create the Aurora MySQL cluster

Complete following steps to create the Aurora MySQL cluster:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose Create database.
  3. For Choose a database creation method, choose Standard create.
  4. Under Engine options, for Engine type, choose Aurora (MySQL Compatible).
  5. For Engine version, choose Aurora (MySQL 5.7) 2.11.2.
  6. For Templates, choose Production.
  7. Under Settings, for DB cluster identifier, enter a name for your database.
  8. For Master username, enter your primary user name.
  9. For Master password and Confirm master password, enter your primary password.
  10. Under Instance configuration, for DB instance class, choose Burstable classes (includes t classes) and choose db.t3.small.
  11. Under Availability & durability, for Multi-AZ deployment, choose Don’t create an Aurora Replica.
  12. Under Connectivity, for Compute resource, choose Don’t connect to an EC2 compute resource.
  13. For Network type, choose IPv4.
  14. For Virtual private cloud (VPC), choose your VPC.
  15. For DB subnet group, choose your public subnet.
  16. For Public access, choose Yes.
  17. For VPC security group (firewall), choose the security group for your public subnet.
  18. Under Database authentication, for Database authentication options, choose Password authentication.
  19. Under Additional configuration, for DB cluster parameter group, choose the cluster parameter group you created earlier.
  20. Choose Create database.

Grant permissions to the source database

The next step is to grant the required permissions on the source Aurora MySQL database. Now you can connect to the DB cluster using the MySQL utility. You can run queries to complete the following tasks:

  • Create a demo database and table and run queries on the data
  • Grant permission for a user used by the AWS DMS endpoint

Complete the following steps:

  1. Log in to the EC2 instance that you’re using to connect to your DB cluster.
  2. Enter the following command at the command prompt to connect to the primary DB instance of your DB cluster:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p
  3. Run the following SQL command to create a database:
> CREATE DATABASE mydev;
  4. Run the following SQL command to create a table:
> use mydev; 
> CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);
  5. Run the following SQL command to populate the table with data:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');
  6. Run the following SQL command to create a user for the AWS DMS endpoint and grant permissions for CDC tasks (replace the placeholder with your preferred password):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Create and configure AWS DMS resources to load data into the DynamoDB reference table

In this section, you create and configure AWS DMS to replicate data into the DynamoDB reference table.

Create an AWS DMS replication instance

First, create an AWS DMS replication instance by completing the following steps:

  1. On the AWS DMS console, choose Replication instances in the navigation pane.
  2. Choose Create replication instance.
  3. Under Settings, for Name, enter a name for your instance.
  4. Under Instance configuration, for High Availability, choose Dev or test workload (Single-AZ).
  5. Under Connectivity and security, for VPC security groups, choose default.
  6. Choose Create replication instance.

Create Amazon VPC endpoints

Optionally, you can create Amazon VPC endpoints for DynamoDB when you need to connect to your DynamoDB table from the AWS DMS instance in a private network. Also make sure that you enable Publicly accessible when you need to connect to a database outside of your VPC.

Create an AWS DMS source endpoint

Create an AWS DMS source endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Source endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Source engine, choose Amazon Aurora MySQL.
  6. For Access to endpoint database, choose Provide access information manually.
  7. For Server Name, enter the endpoint name of your Aurora writer instance (for example, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. For Port, enter 3306.
  9. For User name, enter a user name for your AWS DMS task.
  10. For Password, enter a password.
  11. Choose Create endpoint.

Create an AWS DMS target endpoint

Create an AWS DMS target endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Target endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Target engine, choose Amazon DynamoDB.
  6. For Service access role ARN, enter the IAM role for your AWS DMS task.
  7. Choose Create endpoint.

Create AWS DMS migration tasks

Create AWS DMS database migration tasks by completing the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. Under Task configuration, for Task identifier, enter a name for your task.
  4. For Replication instance, choose your replication instance.
  5. For Source database endpoint, choose your source endpoint.
  6. For Target database endpoint, choose your target endpoint.
  7. For Migration type, choose Migrate existing data and replicate ongoing changes.
  8. Under Task settings, for Target table preparation mode, choose Do nothing.
  9. For Stop task after full load completes, choose Don’t stop.
  10. For LOB column settings, choose Limited LOB mode.
  11. For Task logs, enable Turn on CloudWatch logs and Turn on batch-optimized apply.
  12. Under Table mappings, choose JSON Editor and enter the following rules.

The attribute-mappings section controls how source column values map to DynamoDB attributes. With the following rules, the AWS DMS CDC task will first create a new DynamoDB table with the specified name in target-table-name. Then it will replicate all the records, mapping the columns in the DB table to the attributes in the DynamoDB table.

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "target-table-name": "tgt_country_lookup_table",
            "mapping-parameters": {
                "partition-key-name": "code",
                "sort-key-name": "countryname",
                "exclude-columns": [
                    "code",
                    "countryname"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "code",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${code}"
                    },
                    {
                        "target-attribute-name": "countryname",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${countryname}"
                    }
                ],
                "apply-during-cdc": true
            }
        }
    ]
}

DMS table mapping

  13. Choose Create task.

Now the AWS DMS replication task has been started.

  14. Wait for the Status to show as Load complete.

DMS task

  15. On the DynamoDB console, choose Tables in the navigation pane.
  16. Select the DynamoDB reference table, and choose Explore table items to review the replicated records.

DynamoDB reference table initial

Create an AWS Glue Data Catalog table and an AWS Glue streaming ETL job

In this section, you create an AWS Glue Data Catalog table and an AWS Glue streaming extract, transform, and load (ETL) job.

Create a Data Catalog table

Create an AWS Glue Data Catalog table for the source Kinesis data stream with the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose Add database.
  3. For Name, enter my_kinesis_db.
  4. Choose Create database.
  5. Choose Tables under Databases, then choose Add table.
  6. For Name, enter my_stream_src_table.
  7. For Database, choose my_kinesis_db.
  8. For Select the type of source, choose Kinesis.
  9. For Kinesis data stream is located in, choose my account.
  10. For Kinesis stream name, enter a name for your data stream.
  11. For Classification, select JSON.
  12. Choose Next.
  13. Choose Edit schema as JSON, enter the following JSON, then choose Save.
[
  {
    "Name": "uuid",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "country",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "itemtype",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "saleschannel",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderpriority",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "region",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "shipdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitssold",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitprice",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalrevenue",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalprofit",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "impressiontime",
    "Type": "string",
    "Comment": ""
  }
]

Glue Catalog table schema

  14. Choose Next, then choose Create.

Create an AWS Glue streaming ETL job

Next, you create an AWS Glue streaming job. AWS Glue 3.0 and later supports Apache Hudi natively, so we use this native integration to ingest into a Hudi table. Complete the following steps to create the AWS Glue streaming job:

  1. On the AWS Glue Studio console, choose Spark script editor and choose Create.
  2. Under Job details tab, for Name, enter a name for your job.
  3. For IAM Role, choose the IAM role for your AWS Glue job.
  4. For Type, select Spark Streaming.
  5. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
  6. For Requested number of workers, enter 3.
  7. Under Advanced properties, for Job parameters, choose Add new parameter.
  8. For Key, enter --conf.
  9. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Choose Add new parameter.
  11. For Key, enter --datalake-formats.
  12. For Value, enter hudi.
  13. For Script path, enter s3://<S3BucketName>/scripts/.
  14. For Temporary path, enter s3://<S3BucketName>/temporary/.
  15. Optionally, for Spark UI logs path, enter s3://<S3BucketName>/sparkHistoryLogs/.

Glue job parameter

  16. On the Script tab, enter the following script into the AWS Glue Studio editor and choose Create.

The near-real-time streaming job enriches data by joining a Kinesis data stream with a DynamoDB table that contains frequently updated reference data. The enriched dataset is loaded into the target Hudi table in the data lake. Replace <S3BucketName> with your bucket that you created via AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv,["JOB_NAME"])

# Initialize spark session and Glue context
sc = SparkContext() 
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# job parameters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" 
kin_src_table_name = "my_stream_src_table" 
hudi_write_operation = "upsert" 
hudi_record_key = "uuid" 
hudi_precomb_key = "orderdate" 
checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" 
s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db"

# hudi options 
additional_options={
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.recordkey.field": hudi_record_key,
    "hoodie.datasource.hive_sync.database": hudi_database,
    "hoodie.table.name": hudi_table,
    "hoodie.consistency.check.enabled": "true",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "false",
    "hoodie.datasource.write.precombine.field": hudi_precomb_key,
    "hoodie.bulkinsert.shuffle.parallelism": "4",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.write.operation": hudi_write_operation,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
}

# Scan the reference data table in DynamoDB with the boto3 API and load it into a Spark DataFrame.
def readDynamoDb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(dydb_lookup_table)
    response = table.scan()
    items = response["Items"]
    jsondata = sc.parallelize(items)
    lookupDf = glueContext.read.json(jsondata)
    return lookupDf


# Load the Amazon Kinesis data stream from the AWS Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog(
    database=kin_src_database_name,
    table_name=kin_src_table_name,
    transformation_ctx="source_df",
    additional_options={"startingPosition": "TRIM_HORIZON"},
)

# As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:

        # Refresh the DynamoDB table to pull the latest snapshot for each micro-batch
        country_lookup_df = readDynamoDb() 
                
        final_frame = data_frame.join(
            country_lookup_df, 
            data_frame["country"] == country_lookup_df["countryname"], 
            'left'
        ).drop(
            "countryname",
            "country",
            "unitprice", 
            "unitcost",
            "totalrevenue",
            "totalcost",
            "totalprofit"
        )

        # Script generated for node my-lab-hudi-connector
        final_frame.write.format("hudi") \
            .options(**additional_options) \
            .mode("append") \
            .save(s3_output_folder)
        
try:
    glueContext.forEachBatch(
        frame=source_df,
        batch_function=processBatch,
        options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path},
    )
except Exception as e:
    print(f"Error is @@@ ....{e}")
  17. Choose Run to start the streaming job.

The following screenshot shows examples of the DataFrames data_frame, country_lookup_df, and final_frame.

Glue job log output initial

The AWS Glue job successfully joined records coming from the Kinesis data stream and the reference table in DynamoDB, and then ingested the joined records into Amazon S3 in Hudi format.

Create and run a Python script to generate sample data and load it into the Kinesis data stream

In this section, you create and run a Python script to generate sample data and load it into the source Kinesis data stream. Complete the following steps:

  1. Log in to AWS Cloud9, your EC2 instance, or any other computing host that puts records in your data stream.
  2. Create a Python file called generate-data-for-kds.py.
  3. Open the Python file and enter the following script:
import json
import random
import boto3
import time

STREAM_NAME = "<mystreamname>"

def get_data():
    return {
        "uuid": random.randrange(0, 1000001, 1),
        "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ),
        "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "orderpriority": random.choice(["H", "L", "M", "C"]),
        "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12",
                                      "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", 
                                      "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", 
                                      "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ),
        "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ),
        "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", 
                                    "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14",
                                      "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", 
                                      "2/25/17", "3/10/17", "4/1/17", ] ),
        "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", 
                                     "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", 
                                     "8072", "65", "7864", "9778", ] ),
        "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                     "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                     "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                    "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                    "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", 
                                        "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", 
                                        "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", 
                                        "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", 
                                        "5255275.28", "463966.1", ] ),
        "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", 
                                     "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06",
                                       "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", 
                                       "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", 
                                       "3951974.56", "310842.62", ] ),
        "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", 
                                       "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", 
                                       "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7",
                                         "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ),
        "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", 
                                          "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", 
                                          "2022-15-24T02:27:41Z", ] ),
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )
        time.sleep(2)

if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))

  4. Run the script:

$ python3 generate-data-for-kds.py

This script puts a record into the Kinesis data stream every 2 seconds.
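
If you want to confirm from the same host that records are arriving before moving on, the following is a minimal sketch using boto3. The stream name placeholder matches the generator script, and reading a few records from the first shard with TRIM_HORIZON is an assumption that works for the on-demand stream created earlier.

import boto3

STREAM_NAME = "<mystreamname>"  # same placeholder as in generate-data-for-kds.py

kinesis = boto3.client("kinesis")

# Read a handful of records from the beginning of the first shard.
shard_id = kinesis.list_shards(StreamName=STREAM_NAME)["Shards"][0]["ShardId"]
shard_iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

response = kinesis.get_records(ShardIterator=shard_iterator, Limit=5)
for record in response["Records"]:
    print(record["Data"].decode("utf-8"))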

Simulate updating the reference table in the Aurora MySQL cluster

Now all the resources and configurations are ready. For this example, we want to add a 3-digit country code to the reference table. Let’s update records in the Aurora MySQL table to simulate changes. Complete the following steps:

  1. Make sure that the AWS Glue streaming job is already running.
  2. Connect to the primary DB instance again, as described earlier.
  3. Enter your SQL commands to update records:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Now the reference table in the Aurora MySQL source database has been updated. Then the changes are automatically replicated to the reference table in DynamoDB.

DynamoDB reference table updated

The following tables show records in data_frame, country_lookup_df, and final_frame. In country_lookup_df and final_frame, the combinedname column has values formatted as <2-digit-country-code>-<3-digit-country-code>-<country-name>, which shows that the changed records in the reference table are reflected in final_frame without restarting the AWS Glue streaming job. It means that the AWS Glue job successfully joins the incoming records from the Kinesis data stream with the reference table even while the reference table is changing.
Glue job log output updated

Query the Hudi table using Athena

Let’s query the Hudi table using Athena to see the records in the destination table. Complete the following steps:

  1. Make sure that the script and the AWS Glue streaming job are still working:
    1. The Python script (generate-data-for-kds.py) is still running.
    2. The generated data is being sent to the data stream.
    3. The AWS Glue streaming job is still running.
  2. On the Athena console, run the following SQL in the query editor:
select shipdate, unitssold, impressiontime, code, combinedname from <database>.<table>
where combinedname is not null
limit 10;

The following query result shows the records that are processed before the referenced table was changed. Records in the combinedname column are similar to <2-digit-country-code>-<country-name>.

Athena query result initial

The following query result shows the records that are processed after the referenced table was changed. Records in the combinedname column are similar to <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena query result updated

Now you can see that the changed reference data is successfully reflected in the target Hudi table, which joins records from the Kinesis data stream with the reference data in DynamoDB.

Clean up

As the final step, clean up the resources:

  1. Delete the Kinesis data stream.
  2. Delete the AWS DMS migration task, endpoint, and replication instance.
  3. Stop and delete the AWS Glue streaming job.
  4. Delete the AWS Cloud9 environment.
  5. Delete the CloudFormation template.

Conclusion

Building and maintaining a transactional data lake that involves real-time data ingestion and processing has multiple variable components and decisions to be made, such as what ingestion service to use, how to store your reference data, and what transactional data lake framework to use. In this post, we provided the implementation details of such a pipeline, using AWS native components as the building blocks and Apache Hudi as the open-source framework for a transactional data lake.

We believe that this solution can be a starting point for organizations looking to implement a new data lake with such requirements. Additionally, the different components are fully pluggable and can be mixed and matched to existing data lakes to target new requirements or migrate existing ones, addressing their pain points.


About the authors

Manish Kola is a Data Lab Solutions Architect at AWS, where he works closely with customers across various industries to architect cloud-native solutions for their data analytics and AI needs. He partners with customers on their AWS journey to solve their business problems and build scalable prototypes. Before joining AWS, Manish’s experience includes helping customers implement data warehouse, BI, data integration, and data lake projects.

Santosh Kotagiri is a Solutions Architect at AWS with experience in data analytics and cloud solutions leading to tangible business results. His expertise lies in designing and implementing scalable data analytics solutions for clients across industries, with a focus on cloud-native and open-source services. He is passionate about leveraging technology to drive business growth and solve complex problems.

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Automate alerting and reporting for AWS Glue job resource usage

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automate-alerting-and-reporting-for-aws-glue-job-resource-usage/

Data transformation plays a pivotal role in providing the necessary data insights for businesses in any organization, small and large. To gain these insights, customers often perform ETL (extract, transform, and load) jobs from their source systems and output an enriched dataset. Many organizations today are using AWS Glue to build ETL pipelines that bring data from disparate sources and store the data in repositories like a data lake, database, or data warehouse for further consumption. These organizations are looking for ways they can reduce cost across their IT environments and still be operationally performant and efficient.

Picture a scenario where you, the VP of Data and Analytics, are in charge of your data and analytics environments and workloads running on AWS where you manage a team of data engineers and analysts. This team is allowed to create AWS Glue for Spark jobs in development, test, and production environments. During testing, one of the jobs wasn’t configured to automatically scale its compute resources, resulting in jobs timing out, costing the organization more than anticipated. The next steps usually include completing an analysis of the jobs, looking at cost reports to see which account generated the spike in usage, going through logs to see what happened with the job, and so on. After the ETL job has been corrected, you may want to implement monitoring and set standard alert thresholds for your AWS Glue environment.

This post will help organizations proactively monitor and cost optimize their AWS Glue environments by providing an easier path for teams to measure the efficiency of their ETL jobs and align configuration details according to organizational requirements. Included is a solution you will be able to deploy that will notify your team via email about any Glue job that has been configured incorrectly. Additionally, a weekly report is generated and sent via email that aggregates resource usage and provides cost estimates per job.

AWS Glue cost considerations

AWS Glue for Apache Spark jobs are provisioned with a number of workers and a worker type. These jobs can use the G.1X, G.2X, G.4X, G.8X, or Z.2X (Ray) worker types, which map to data processing units (DPUs). DPUs include a certain amount of CPU, memory, and disk space. The following table contains more details.

Worker Type  DPUs  vCPUs  Memory (GB)  Disk (GB)
G.1X         1     4      16           64
G.2X         2     8      32           128
G.4X         4     16     64           256
G.8X         8     32     128          512
Z.2X         2     8      32           128

For example, if a job is provisioned with 10 workers as G.1X worker type, the job will have access to 40 vCPUs and 160 GB of RAM to process data, and double that with G.2X. Over-provisioning workers can lead to increased cost because not all workers are utilized efficiently.
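
As a quick illustration of that arithmetic, the following snippet multiplies the per-worker figures from the preceding table by the worker count; the table values are the only inputs, and the helper name is ours.

# Per-worker capacity taken from the table above: (vCPUs, memory in GB).
WORKER_SPECS = {
    "G.1X": (4, 16),
    "G.2X": (8, 32),
    "G.4X": (16, 64),
    "G.8X": (32, 128),
    "Z.2X": (8, 32),
}

def job_capacity(worker_type: str, number_of_workers: int) -> tuple:
    """Return the total (vCPUs, memory in GB) available to a job run."""
    vcpus, memory_gb = WORKER_SPECS[worker_type]
    return vcpus * number_of_workers, memory_gb * number_of_workers

print(job_capacity("G.1X", 10))  # (40, 160) -> 40 vCPUs, 160 GB of RAM
print(job_capacity("G.2X", 10))  # (80, 320) -> double the G.1X figures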

In April 2022, Auto Scaling for AWS Glue was released for AWS Glue version 3.0 and later, which includes AWS Glue for Apache Spark and streaming jobs. Enabling auto scaling on your Glue for Apache Spark jobs will allow you to only allocate workers as needed, up to the worker maximum you specify. We recommend enabling auto scaling for your AWS Glue 3.0 & 4.0 jobs because this feature will help reduce cost and optimize your ETL jobs.
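
If you prefer to enable auto scaling programmatically rather than through the console, the following is a minimal sketch with boto3. It relies on the documented --enable-auto-scaling job parameter; the job name is hypothetical, and only a subset of updatable fields is copied from get_job into update_job, so adapt it to your job definition.

import boto3

glue = boto3.client("glue")
job_name = "my-etl-job"  # hypothetical job name

# update_job replaces the job definition, so start from the current one.
job = glue.get_job(JobName=job_name)["Job"]

default_args = dict(job.get("DefaultArguments", {}))
default_args["--enable-auto-scaling"] = "true"

glue.update_job(
    JobName=job_name,
    JobUpdate={
        "Role": job["Role"],
        "Command": job["Command"],
        "DefaultArguments": default_args,
        "GlueVersion": job.get("GlueVersion", "4.0"),
        "WorkerType": job.get("WorkerType", "G.1X"),
        # With auto scaling enabled, this acts as the maximum number of workers.
        "NumberOfWorkers": job.get("NumberOfWorkers", 10),
    },
)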

Amazon CloudWatch metrics are also a great way to monitor your AWS Glue environment by creating alarms for certain metrics like average CPU or memory usage. To learn more about how to use CloudWatch metrics with AWS Glue, refer to Monitoring AWS Glue using Amazon CloudWatch metrics.
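
For example, you could alarm on driver heap usage with a sketch like the following. The metric name, dimensions, threshold, and SNS topic ARN here are assumptions; verify them against the CloudWatch metrics documentation linked above before relying on the alarm.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Assumed Glue profiling metric and dimensions for a job named "my-etl-job".
cloudwatch.put_metric_alarm(
    AlarmName="glue-my-etl-job-high-driver-memory",
    Namespace="Glue",
    MetricName="glue.driver.jvm.heap.usage",
    Dimensions=[
        {"Name": "JobName", "Value": "my-etl-job"},
        {"Name": "JobRunId", "Value": "ALL"},
        {"Name": "Type", "Value": "gauge"},
    ],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=3,
    Threshold=0.9,  # 90 percent of driver heap usage
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:111122223333:glue-alerts"],  # hypothetical topic
)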

The following solution provides a simple way to set AWS Glue worker and job duration thresholds, configure monitoring, and receive emails for notifications on how your AWS Glue environment is performing. If a job run completes, fails, or times out and the worker or job duration thresholds were exceeded, the solution notifies you after the run ends.

Solution overview

The following diagram illustrates the solution architecture.

Solution Architecture

When you deploy this application via AWS Serverless Application Model (AWS SAM), it will ask what AWS Glue worker and job duration thresholds you would like to set to monitor the AWS Glue for Apache Spark and AWS Glue for Ray jobs running in that account. The solution will use these values as the decision criteria when invoked. The following is a breakdown of each step in the architecture:

  1. Any AWS Glue for Apache Spark job that succeeds, fails, stops, or times out is sent to Amazon EventBridge.
  2. EventBridge picks up the event from AWS Glue and triggers an AWS Lambda function.
  3. The Lambda function processes the event and determines if the data and analytics team should be notified about the particular job run (a simplified sketch of this check follows the list). The function performs the following tasks:
    1. The function sends an email using Amazon Simple Notification Service (Amazon SNS) if needed.
      • If the AWS Glue job succeeded or was stopped without going over the worker or job duration thresholds, or is tagged to not be monitored, no alerts or notifications are sent.
      • If the job succeeded but ran with a worker count or job duration higher than the allowed threshold, or the job either failed or timed out, Amazon SNS sends a notification to the designated email with information about the AWS Glue job, run ID, and reason for alerting, along with a link to the specific run ID on the AWS Glue console.
    2. The function logs the job run information to Amazon DynamoDB for a weekly aggregated report delivered by email. The DynamoDB table has Time to Live (TTL) enabled with a 7-day expiry, which keeps storage to a minimum.
  4. Once a week, the data within DynamoDB is aggregated by a separate Lambda function with meaningful information like longest-running jobs, number of retries, failures, timeouts, cost analysis, and more.
  5. Amazon Simple Email Service (Amazon SES) is used to deliver the report because the email can be formatted more richly than with Amazon SNS. The email is formatted as HTML output that provides tables for the aggregated job run data.
  6. The data and analytics team is notified about the ongoing job runs through Amazon SNS, and they receive the weekly aggregation report through Amazon SES.
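The following is a simplified, hypothetical sketch of the kind of check the Lambda function in step 3 performs. It is not the solution's actual code: it assumes a Glue Job State Change event delivered by EventBridge, looks up the run details with get_job_run, and publishes to an SNS topic whose ARN and thresholds are read from assumed environment variables.

import os
import boto3

glue = boto3.client("glue")
sns = boto3.client("sns")

# Hypothetical configuration; the deployed solution supplies these values differently.
WORKER_THRESHOLD = int(os.environ.get("GLUE_JOB_WORKER_THRESHOLD", "10"))
DURATION_THRESHOLD_MIN = int(os.environ.get("GLUE_JOB_DURATION_THRESHOLD", "480"))
SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]

def handler(event, context):
    # EventBridge delivers the Glue Job State Change event in event["detail"].
    detail = event["detail"]
    job_name, run_id, state = detail["jobName"], detail["jobRunId"], detail["state"]

    # Look up the worker count and execution time for this run.
    run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
    workers = run.get("NumberOfWorkers", 0)
    duration_min = run.get("ExecutionTime", 0) / 60

    reasons = []
    if state in ("FAILED", "TIMEOUT"):
        reasons.append(f"job run ended with state {state}")
    if workers > WORKER_THRESHOLD:
        reasons.append(f"{workers} workers exceeds the threshold of {WORKER_THRESHOLD}")
    if duration_min > DURATION_THRESHOLD_MIN:
        reasons.append(f"{duration_min:.0f} minutes exceeds the threshold of {DURATION_THRESHOLD_MIN} minutes")

    if reasons:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=f"AWS Glue job alert: {job_name}",
            Message=f"Run {run_id}: " + "; ".join(reasons),
        )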

Note that AWS Glue Python shell and streaming ETL jobs are out of scope for this solution and therefore are not monitored.

Prerequisites

You must have the following prerequisites:

  • An AWS account to deploy the solution to
  • Proper AWS Identity and Access Management (IAM) privileges to create the resources
  • The AWS SAM CLI installed, to build and deploy the solution in your AWS environment

Deploy the solution

This AWS SAM application provisions the following resources:

  • Two EventBridge rules
  • Two Lambda functions
  • An SNS topic and subscription
  • A DynamoDB table
  • An SES subscription
  • The required IAM roles and policies

To deploy the AWS SAM application, complete the following steps:

Clone the aws-samples GitHub repository:

git clone https://github.com/aws-samples/aws-glue-job-tracker.git

Deploy the AWS SAM application:

cd aws-glue-job-tracker
sam deploy --guided

sam deploy configuration

Provide the following parameters:

  • GlueJobWorkerThreshold – Enter the maximum number of workers you want an AWS Glue job to be able to run with before sending a threshold alert. The default is 10. An alert will be sent if a Glue job runs with more workers than specified.
  • GlueJobDurationThreshold – Enter the maximum duration in minutes you want an AWS Glue job to run before sending a threshold alert. The default is 480 minutes (8 hours). An alert will be sent if a Glue job runs longer than the specified duration.
  • GlueJobNotifications – Enter an email or distribution list of those who need to be notified through Amazon SNS and Amazon SES. You can go to the SNS topic after the deployment is complete and add emails as needed.

To receive emails from Amazon SNS and Amazon SES, you must confirm your subscriptions. After the stack is deployed, check the email address that you specified in the template and confirm each subscription by choosing the link in the message. When the application is successfully provisioned, it will begin monitoring your AWS Glue for Apache Spark job environment. The next time a job fails, times out, or exceeds a specified threshold, you will receive an email via Amazon SNS. For example, the following screenshot shows an SNS message about a job that succeeded but violated the job duration threshold.

You might have jobs that need to run at a higher worker or job duration threshold, and you don't want the solution to evaluate them. You can simply tag that job with the key remediate and the value false. The step function will still be invoked, but will use the PASS state when it recognizes the tag. For more information on job tagging, refer to AWS tags in AWS Glue.
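For example, you could add this tag with Boto3 as shown in the following sketch. The Region, account ID, and job name in the ARN are placeholders.

import boto3

glue = boto3.client("glue")

# Exclude a job from threshold evaluation by tagging it with remediate=false.
# The ARN below uses a placeholder Region, account ID, and job name.
glue.tag_resource(
    ResourceArn="arn:aws:glue:us-east-1:123456789012:job/my-large-backfill-job",
    TagsToAdd={"remediate": "false"},
)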

Adding tags to glue job configuration

Configure weekly reporting

As mentioned previously, when an AWS Glue for Apache Spark job succeeds, fails, times out, or is stopped, EventBridge forwards this event to Lambda, where it logs specific information about each job run. Once a week, a separate Lambda function queries DynamoDB and aggregates your job runs to provide meaningful insights and recommendations about your AWS Glue for Apache Spark environment. This report is sent via email in a tabular format, as shown in the following screenshot. It's meant for top-level visibility so you're able to see your longest job runs over time, jobs that have had many retries, failures, and more. It also provides an overall cost calculation as an estimate of what each AWS Glue job will cost for that week. It should not be treated as a guaranteed cost. If you would like to see the exact cost per job, the AWS Cost and Usage Report is the best resource to use. The following screenshot shows one table (of five total) from the AWS Glue report function.
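Conceptually, the weekly report function scans the DynamoDB table and aggregates the logged runs per job. The following is a hypothetical sketch of that idea, not the solution's actual code; the table name and attribute names are assumptions for illustration only.

import boto3
import pandas as pd

# Hypothetical weekly aggregation: scan the job-run table and summarize per job.
# Table and attribute names are illustrative, not the solution's actual schema.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("glue-job-runs")

items = []
response = table.scan()
items.extend(response["Items"])
while "LastEvaluatedKey" in response:  # paginate through the full table
    response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
    items.extend(response["Items"])

df = pd.DataFrame(items)
report = (
    df.groupby("job_name")
      .agg(runs=("run_id", "count"),
           failures=("state", lambda s: (s == "FAILED").sum()),
           total_minutes=("execution_minutes", "sum"),
           estimated_cost_usd=("estimated_cost", "sum"))
      .sort_values("estimated_cost_usd", ascending=False)
)
print(report.to_html())  # the solution renders similar tables into the SES email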

weekly report

Clean up

If you don’t want to run the solution anymore, delete the AWS SAM application for each account that it was provisioned in. To delete your AWS SAM stack, run the following command from your project directory:

sam delete

Conclusion

In this post, we discussed how you can monitor and cost-optimize your AWS Glue job configurations to comply with organizational standards and policy. This method can provide cost controls over AWS Glue jobs across your organization. Some other ways to help control the costs of your AWS Glue for Apache Spark jobs include the newly released AWS Glue Flex jobs and Auto Scaling. We also provided an AWS SAM application as a solution to deploy into your accounts. We encourage you to review the resources provided in this post to continue learning about AWS Glue. To learn more about monitoring and optimizing for cost using AWS Glue, please visit this recent blog. It goes in depth on all of the cost optimization options and includes a template that builds a CloudWatch dashboard for you with metrics about all of your Glue job runs.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

Simplify AWS Glue job orchestration and monitoring with Amazon MWAA

Post Syndicated from Rushabh Lokhande original https://aws.amazon.com/blogs/big-data/simplify-aws-glue-job-orchestration-and-monitoring-with-amazon-mwaa/

Organizations across all industries have complex data processing requirements for their analytical use cases across different analytics systems, such as data lakes on AWS, data warehouses (Amazon Redshift), search (Amazon OpenSearch Service), NoSQL (Amazon DynamoDB), machine learning (Amazon SageMaker), and more. Analytics professionals are tasked with deriving value from data stored in these distributed systems to create better, secure, and cost-optimized experiences for their customers. For example, digital media companies seek to combine and process datasets in internal and external databases to build unified views of their customer profiles, spur ideas for innovative features, and increase platform engagement.

In these scenarios, customers looking for a serverless data integration offering use AWS Glue as a core component for processing and cataloging data. AWS Glue is well integrated with AWS services and partner products, and provides low-code/no-code extract, transform, and load (ETL) options to enable analytics, machine learning (ML), or application development workflows. AWS Glue ETL jobs may be one component in a more complex pipeline. Orchestrating the run of and managing dependencies between these components is a key capability in a data strategy. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) orchestrates data pipelines using distributed technologies including on-premises resources, AWS services, and third-party components.

In this post, we show how to simplify monitoring an AWS Glue job orchestrated by Airflow using the latest features of Amazon MWAA.

Overview of solution

This post discusses the following:

  • How to upgrade an Amazon MWAA environment to version 2.4.3.
  • How to orchestrate an AWS Glue job from an Airflow Directed Acyclic Graph (DAG).
  • The Airflow Amazon provider package’s observability enhancements in Amazon MWAA. You can now consolidate run logs of AWS Glue jobs on the Airflow console to simplify troubleshooting data pipelines. The Amazon MWAA console becomes a single reference to monitor and analyze AWS Glue job runs. Previously, support teams needed to access the AWS Management Console and take manual steps for this visibility. This feature is available by default from Amazon MWAA version 2.4.3.

The following diagram illustrates our solution architecture.

Prerequisites

You need the following prerequisites:

Set up the Amazon MWAA environment

For instructions on creating your environment, refer to Create an Amazon MWAA environment. For existing users, we recommend upgrading to version 2.4.3 to take advantage of the observability enhancements featured in this post.

The steps to upgrade Amazon MWAA to version 2.4.3 differ depending on whether the current version is 1.10.12 or 2.x (2.0.2 or 2.2.2). We discuss both options in this post.

Prerequisites for setting up an Amazon MWAA environment

You must meet the following prerequisites:

Upgrade from version 1.10.12 to 2.4.3

If you’re using Amazon MWAA version 1.10.12, refer to Migrating to a new Amazon MWAA environment to upgrade to 2.4.3.

Upgrade from version 2.0.2 or 2.2.2 to 2.4.3

If you’re using Amazon MWAA environment version 2.2.2 or lower, complete the following steps:

  1. Create a requirements.txt for any custom dependencies with specific versions required for your DAGs.
  2. Upload the file to Amazon S3 in the appropriate location where the Amazon MWAA environment points to the requirements.txt for installing dependencies.
  3. Follow the steps in Migrating to a new Amazon MWAA environment and select version 2.4.3.

Update your DAGs

Customers who upgraded from an older Amazon MWAA environment may need to make updates to existing DAGs. In Airflow version 2.4.3, the Airflow environment will use the Amazon provider package version 6.0.0 by default. This package may include some potentially breaking changes, such as changes to operator names. For example, the AWSGlueJobOperator has been deprecated and replaced with the GlueJobOperator. To maintain compatibility, update your Airflow DAGs by replacing any deprecated or unsupported operators from previous versions with the new ones. Complete the following steps:

  1. Navigate to Amazon AWS Operators.
  2. Select the appropriate version installed in your Amazon MWAA instance (6.0.0 by default) to find a list of supported Airflow operators.
  3. Make the necessary changes in the existing DAG code and upload the modified files to the DAG location in Amazon S3.
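For example, a DAG that previously used the deprecated operator typically needs only its import and class name updated; the task arguments can usually stay the same. The following is a minimal illustration with a placeholder job name, not a complete pipeline.

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
# Older DAGs imported the deprecated Glue operator from the same module path;
# remove that import and switch to GlueJobOperator as shown here.

with DAG(
    dag_id="example_glue_operator_update",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    run_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="my-job",  # placeholder AWS Glue job name
    )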

Orchestrate the AWS Glue job from Airflow

This section covers the details of orchestrating an AWS Glue job within Airflow DAGs. Airflow eases the development of data pipelines with dependencies between heterogeneous systems such as on-premises processes, external dependencies, other AWS services, and more.

Orchestrate CloudTrail log aggregation with AWS Glue and Amazon MWAA

In this example, we go through a use case of using Amazon MWAA to orchestrate an AWS Glue Python Shell job that persists aggregated metrics based on CloudTrail logs.

CloudTrail enables visibility into AWS API calls that are being made in your AWS account. A common use case with this data would be to gather usage metrics on principals acting on your account’s resources for auditing and regulatory needs.

As CloudTrail events are being logged, they are delivered as JSON files in Amazon S3, which aren’t ideal for analytical queries. We want to aggregate this data and persist it as Parquet files to allow for optimal query performance. As an initial step, we can use Athena to do the initial querying of the data before doing additional aggregations in our AWS Glue job. For more information about creating an AWS Glue Data Catalog table, refer to Creating the table for CloudTrail logs in Athena using partition projection data. After we’ve explored the data via Athena and decided what metrics we want to retain in aggregate tables, we can create an AWS Glue job.

Create a CloudTrail table in Athena

First, we need to create a table in our Data Catalog that allows CloudTrail data to be queried via Athena. The following sample query creates a table with two partitions on the Region and date (called snapshot_date). Be sure to replace the placeholders for your CloudTrail bucket, AWS account ID, and CloudTrail table name:

create external table if not exists `<<<CLOUDTRAIL_TABLE_NAME>>>`(
  `eventversion` string comment 'from deserializer', 
  `useridentity` struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> comment 'from deserializer', 
  `eventtime` string comment 'from deserializer', 
  `eventsource` string comment 'from deserializer', 
  `eventname` string comment 'from deserializer', 
  `awsregion` string comment 'from deserializer', 
  `sourceipaddress` string comment 'from deserializer', 
  `useragent` string comment 'from deserializer', 
  `errorcode` string comment 'from deserializer', 
  `errormessage` string comment 'from deserializer', 
  `requestparameters` string comment 'from deserializer', 
  `responseelements` string comment 'from deserializer', 
  `additionaleventdata` string comment 'from deserializer', 
  `requestid` string comment 'from deserializer', 
  `eventid` string comment 'from deserializer', 
  `resources` array<struct<arn:string,accountid:string,type:string>> comment 'from deserializer', 
  `eventtype` string comment 'from deserializer', 
  `apiversion` string comment 'from deserializer', 
  `readonly` string comment 'from deserializer', 
  `recipientaccountid` string comment 'from deserializer', 
  `serviceeventdetails` string comment 'from deserializer', 
  `sharedeventid` string comment 'from deserializer', 
  `vpcendpointid` string comment 'from deserializer')
PARTITIONED BY ( 
  `region` string,
  `snapshot_date` string)
ROW FORMAT SERDE 
  'com.amazon.emr.hive.serde.CloudTrailSerde' 
STORED AS INPUTFORMAT 
  'com.amazon.emr.cloudtrail.CloudTrailInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/'
TBLPROPERTIES (
  'projection.enabled'='true', 
  'projection.region.type'='enum',
  'projection.region.values'='us-east-2,us-east-1,us-west-1,us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-3,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1',
  'projection.snapshot_date.format'='yyyy/MM/dd', 
  'projection.snapshot_date.interval'='1', 
  'projection.snapshot_date.interval.unit'='days', 
  'projection.snapshot_date.range'='2020/10/01,now', 
  'projection.snapshot_date.type'='date',
  'storage.location.template'='s3://<<<CLOUDTRAIL_BUCKET>>>/AWSLogs/<<<ACCOUNT_ID>>>/CloudTrail/${region}/${snapshot_date}')

Run the preceding query on the Athena console, and note the table name and AWS Glue Data Catalog database where it was created. We use these values later in the Airflow DAG code.
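Optionally, you can sanity-check the new table before wiring it into the pipeline. The following sketch uses the AWS SDK for pandas (the same library the AWS Glue job uses later); the database, table, and snapshot_date values are placeholders.

import awswrangler as wr

# Quick check that partition projection resolves and the CloudTrail table is queryable.
# Database, table, and snapshot_date values are placeholders.
df = wr.athena.read_sql_query(
    sql="""
        select region, count(*) as events
        from "my_cloudtrail_db"."cloudtrail_logs"
        where snapshot_date = '2023/05/01' and region = 'us-east-1'
        group by region
    """,
    database="my_cloudtrail_db",
    ctas_approach=False,
)
print(df)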

Sample AWS Glue job code

The following code is a sample AWS Glue Python Shell job that does the following:

  • Takes arguments (which we pass from our Amazon MWAA DAG) on what day’s data to process
  • Uses the AWS SDK for Pandas to run an Athena query to do the initial filtering of the CloudTrail JSON data outside AWS Glue
  • Uses Pandas to do simple aggregations on the filtered data
  • Outputs the aggregated data to the AWS Glue Data Catalog in a table
  • Uses logging during processing, which will be visible in Amazon MWAA
import awswrangler as wr
import pandas as pd
import sys
import logging
from awsglue.utils import getResolvedOptions
from datetime import datetime, timedelta

# Logging setup, redirects all logs to stdout
LOGGER = logging.getLogger()
formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
LOGGER.addHandler(streamHandler)
LOGGER.setLevel(logging.INFO)

LOGGER.info(f"Passed Args :: {sys.argv}")

sql_query_template = """
select
region,
useridentity.arn,
eventsource,
eventname,
useragent

from "{cloudtrail_glue_db}"."{cloudtrail_table}"
where snapshot_date='{process_date}'
and region in ('us-east-1','us-east-2')
"""

required_args = ['CLOUDTRAIL_GLUE_DB',
                'CLOUDTRAIL_TABLE',
                'TARGET_BUCKET',
                'TARGET_DB',
                'TARGET_TABLE',
                'ACCOUNT_ID']
arg_keys = [*required_args, 'PROCESS_DATE'] if '--PROCESS_DATE' in sys.argv else required_args
JOB_ARGS = getResolvedOptions(sys.argv, arg_keys)

LOGGER.info(f"Parsed Args :: {JOB_ARGS}")

# if process date was not passed as an argument, process yesterday's data
process_date = (
    JOB_ARGS['PROCESS_DATE']
    if JOB_ARGS.get('PROCESS_DATE','NONE') != "NONE" 
    else (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d") 
)

LOGGER.info(f"Taking snapshot for :: {process_date}")

RAW_CLOUDTRAIL_DB = JOB_ARGS['CLOUDTRAIL_GLUE_DB']
RAW_CLOUDTRAIL_TABLE = JOB_ARGS['CLOUDTRAIL_TABLE']
TARGET_BUCKET = JOB_ARGS['TARGET_BUCKET']
TARGET_DB = JOB_ARGS['TARGET_DB']
TARGET_TABLE = JOB_ARGS['TARGET_TABLE']
ACCOUNT_ID = JOB_ARGS['ACCOUNT_ID']

final_query = sql_query_template.format(
    process_date=process_date.replace("-","/"),
    cloudtrail_glue_db=RAW_CLOUDTRAIL_DB,
    cloudtrail_table=RAW_CLOUDTRAIL_TABLE
)

LOGGER.info(f"Running Query :: {final_query}")

raw_cloudtrail_df = wr.athena.read_sql_query(
    sql=final_query,
    database=RAW_CLOUDTRAIL_DB,
    ctas_approach=False,
    s3_output=f"s3://{TARGET_BUCKET}/athena-results",
)

raw_cloudtrail_df['ct']=1

agg_df = raw_cloudtrail_df.groupby(['arn','region','eventsource','eventname','useragent'],as_index=False).agg({'ct':'sum'})
agg_df['snapshot_date']=process_date

LOGGER.info(agg_df.info(verbose=True))

upload_path = f"s3://{TARGET_BUCKET}/{TARGET_DB}/{TARGET_TABLE}"

if not agg_df.empty:
    LOGGER.info(f"Upload to {upload_path}")
    try:
        response = wr.s3.to_parquet(
            df=agg_df,
            path=upload_path,
            dataset=True,
            database=TARGET_DB,
            table=TARGET_TABLE,
            mode="overwrite_partitions",
            schema_evolution=True,
            partition_cols=["snapshot_date"],
            compression="snappy",
            index=False
        )
        LOGGER.info(response)
    except Exception as exc:
        LOGGER.error("Uploading to S3 failed")
        LOGGER.exception(exc)
        raise exc
else:
    LOGGER.info(f"Dataframe was empty, nothing to upload to {upload_path}")

The following are some key advantages of this AWS Glue job:

  • We use an Athena query to ensure initial filtering is done outside of our AWS Glue job. As such, a Python Shell job with minimal compute is still sufficient for aggregating a large CloudTrail dataset.
  • We turn on the common analytics libraries option (the library-set job parameter set to analytics) when creating the AWS Glue job so that it can use the AWS SDK for pandas library.

Create an AWS Glue job

Complete the following steps to create your AWS Glue job:

  1. Copy the script in the preceding section and save it in a local file. For this post, the file is called script.py.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Create a new job and select Python Shell script editor.
  4. Select Upload and edit an existing script and upload the file you saved locally.
  5. Choose Create.

  6. On the Job details tab, enter a name for your AWS Glue job.
  7. For IAM role, choose an existing role or create a new role that has the required permissions for Amazon S3, AWS Glue, and Athena. The role needs to query the CloudTrail table you created earlier and write to an output location.

You can use the following sample policy code. Replace the placeholders with your CloudTrail logs bucket, output table name, output AWS Glue database, output S3 bucket, CloudTrail table name, AWS Glue database containing the CloudTrail table, and your AWS account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:List*",
                "s3:Get*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>/*",
                "arn:aws:s3:::<<<CLOUDTRAIL_LOGS_BUCKET>>>*"
            ],
            "Effect": "Allow",
            "Sid": "GetS3CloudtrailData"
        },
        {
            "Action": [
                "glue:Get*",
                "glue:BatchGet*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>/<<<CLOUDTRAIL_TABLE>>>*"
            ],
            "Effect": "Allow",
            "Sid": "GetGlueCatalogCloudtrailData"
        },
        {
            "Action": [
                "s3:PutObject*",
                "s3:Abort*",
                "s3:DeleteObject*",
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:Head*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>",
                "arn:aws:s3:::<<<OUTPUT_S3_BUCKET>>>/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>/*"
            ],
            "Effect": "Allow",
            "Sid": "WriteOutputToS3"
        },
        {
            "Action": [
                "glue:CreateTable",
                "glue:CreatePartition",
                "glue:UpdatePartition",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:DeletePartition",
                "glue:BatchCreatePartition",
                "glue:BatchDeletePartition",
                "glue:Get*",
                "glue:BatchGet*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:database/<<<OUTPUT_GLUE_DB>>>",
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:table/<<<OUTPUT_GLUE_DB>>>/<<<OUTPUT_TABLE_NAME>>>*"
            ],
            "Effect": "Allow",
            "Sid": "AllowOutputToGlue"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:/aws-glue/*",
            "Effect": "Allow",
            "Sid": "LogsAccess"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:DeleteObject*",
                "s3:PutObject",
                "s3:PutObjectLegalHold",
                "s3:PutObjectRetention",
                "s3:PutObjectTagging",
                "s3:PutObjectVersionTagging",
                "s3:Abort*"
            ],
            "Resource": [
                "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>",
                "arn:aws:s3:::<<<ATHENA_RESULTS_BUCKET>>>/*"
            ],
            "Effect": "Allow",
            "Sid": "AccessToAthenaResults"
        },
        {
            "Action": [
                "athena:StartQueryExecution",
                "athena:StopQueryExecution",
                "athena:GetDataCatalog",
                "athena:GetQueryResults",
                "athena:GetQueryExecution"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:catalog",
                "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:datacatalog/AwsDataCatalog",
                "arn:aws:athena:us-east-1:<<<YOUR_AWS_ACCT_ID>>>:workgroup/primary"
            ],
            "Effect": "Allow",
            "Sid": "AllowAthenaQuerying"
        }
    ]
}

  8. For Python version, choose Python 3.9.

  9. Select Load common analytics libraries.
  10. For Data processing units, choose 1 DPU.
  11. Leave the other options as default or adjust as needed.

  12. Choose Save to save your job configuration.
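If you prefer to create the job programmatically, the following Boto3 sketch is roughly equivalent to the console steps above. The job name, IAM role, and script location are placeholders; adjust them for your account.

import boto3

glue = boto3.client("glue")

# Sketch of the console configuration above expressed as a Boto3 call.
# Name, Role, and ScriptLocation are placeholders.
glue.create_job(
    Name="cloudtrail-aggregation-job",
    Role="arn:aws:iam::123456789012:role/MyGlueCloudTrailRole",
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://my-bucket/scripts/script.py",
        "PythonVersion": "3.9",
    },
    MaxCapacity=1.0,  # 1 DPU, matching the console choice
    DefaultArguments={"--library-set": "analytics"},  # loads the common analytics libraries
)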

Configure an Amazon MWAA DAG to orchestrate the AWS Glue job

The following code is for a DAG that can orchestrate the AWS Glue job that we created. The DAG uses the GlueJobOperator from the Amazon provider package, passes a PROCESS_DATE argument resolved from dag_run.conf so you can run backfills for specific dates, and sets verbose=True so that the AWS Glue job logs are relayed to the Airflow task logs:

"""Sample DAG"""
import airflow.utils
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow import DAG
from datetime import timedelta
import airflow.utils

# allow backfills via DAG run parameters
process_date = '{{ dag_run.conf.get("process_date") if dag_run.conf.get("process_date") else "NONE" }}'

dag = DAG(
    dag_id = "CLOUDTRAIL_LOGS_PROCESSING",
    default_args = {
        'depends_on_past':False, 
        'start_date':airflow.utils.dates.days_ago(0),
        'retries':1,
        'retry_delay':timedelta(minutes=5),
        'catchup': False
    },
    schedule_interval = None, # None for unscheduled or a cron expression - E.G. "00 12 * * 2" - at 12noon Tuesday
    dagrun_timeout = timedelta(minutes=30),
    max_active_runs = 1,
    max_active_tasks = 1 # since there is only one task in our DAG
)

## Log ingest. Assumes Glue Job is already created
glue_ingestion_job = GlueJobOperator(
    task_id="<<<some-task-id>>>",
    job_name="<<<GLUE_JOB_NAME>>>",
    script_args={
        "--ACCOUNT_ID":"<<<YOUR_AWS_ACCT_ID>>>",
        "--CLOUDTRAIL_GLUE_DB":"<<<GLUE_DB_WITH_CLOUDTRAIL_TABLE>>>",
        "--CLOUDTRAIL_TABLE":"<<<CLOUDTRAIL_TABLE>>>",
        "--TARGET_BUCKET": "<<<OUTPUT_S3_BUCKET>>>",
        "--TARGET_DB": "<<<OUTPUT_GLUE_DB>>>", # should already exist
        "--TARGET_TABLE": "<<<OUTPUT_TABLE_NAME>>>",
        "--PROCESS_DATE": process_date
    },
    region_name="us-east-1",
    dag=dag,
    verbose=True
)

glue_ingestion_job

Increase observability of AWS Glue jobs in Amazon MWAA

The AWS Glue jobs write logs to Amazon CloudWatch. With the recent observability enhancements to Airflow’s Amazon provider package, these logs are now integrated with Airflow task logs. This consolidation provides Airflow users with end-to-end visibility directly in the Airflow UI, eliminating the need to search in CloudWatch or the AWS Glue console.

To use this feature, ensure the IAM role attached to the Amazon MWAA environment has the following permissions to retrieve and write the necessary logs:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:DescribeLogStreams",
        "logs:FilterLogEvents",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults",
        
      ],
      "Resource": [
        "arn:aws:logs:*:*:log-group:airflow-243-<<<Your environment name>>>-*"--Your Amazon MWAA Log Stream Name
      ]
    }
  ]
}

If verbose=True, the AWS Glue job run logs appear in the Airflow task logs. The default is False. For more information, refer to Parameters.

When enabled, the DAG reads from the AWS Glue job's CloudWatch log streams and relays the log events to the Airflow task logs for the AWS Glue job step. This provides detailed insight into an AWS Glue job's run in real time via the DAG logs. Note that AWS Glue jobs generate an output and an error CloudWatch log group based on the job's STDOUT and STDERR, respectively. All logs in the output log group and exception or error logs from the error log group are relayed into Amazon MWAA.

AWS admins can now limit a support team's access to only Airflow, making Amazon MWAA the single pane of glass for job orchestration and job health management. Previously, users needed to check the AWS Glue job run status in the Airflow DAG steps and retrieve the job run identifier. They then needed to access the AWS Glue console to find the job run history, search for the job of interest using the identifier, and finally navigate to the job's CloudWatch logs to troubleshoot.

Create the DAG

To create the DAG, complete the following steps:

  1. Save the preceding DAG code to a local .py file, replacing the indicated placeholders.

The values for your AWS account ID, AWS Glue job name, AWS Glue database with CloudTrail table, and CloudTrail table name should already be known. You can adjust the output S3 bucket, output AWS Glue database, and output table name as needed, but make sure the AWS Glue job’s IAM role that you used earlier is configured accordingly.

  2. On the Amazon MWAA console, navigate to your environment to see where the DAG code is stored.

The DAGs folder is the prefix within the S3 bucket where your DAG file should be placed.

  3. Upload your edited file there.

  4. Open the Amazon MWAA console to confirm that the DAG appears in the table.

Run the DAG

To run the DAG, complete the following steps:

  1. Choose from the following options:
    • Trigger DAG – This causes yesterday’s data to be used as the data to process
    • Trigger DAG w/ config – With this option, you can pass in a different date, potentially for backfills, which is retrieved using dag_run.conf in the DAG code and then passed into the AWS Glue job as a parameter

The following screenshot shows the additional configuration options if you choose Trigger DAG w/ config.
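For reference, the configuration passed with Trigger DAG w/ config is a small JSON document whose process_date key flows through dag_run.conf into the AWS Glue job's --PROCESS_DATE argument. The date below is a placeholder; the script expects the YYYY-MM-DD format and converts it to the table's YYYY/MM/DD snapshot_date partition format.

# Example DAG run configuration for a backfill, supplied as JSON in the
# "Trigger DAG w/ config" dialog (the date value is a placeholder).
dag_run_config = {"process_date": "2023-05-01"}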

  2. Monitor the DAG as it runs.
  3. When the DAG is complete, open the run's details.

On the right pane, you can view the logs, or choose Task Instance Details for a full view.

  4. View the AWS Glue job output logs in Amazon MWAA without using the AWS Glue console, thanks to the GlueJobOperator verbose flag.

The AWS Glue job will have written results to the output table you specified.

  5. Query this table via Athena to confirm it was successful.

Summary

Amazon MWAA now provides a single place to track AWS Glue job status and enables you to use the Airflow console as the single pane of glass for job orchestration and health management. In this post, we walked through the steps to orchestrate AWS Glue jobs via Airflow using GlueJobOperator. With the new observability enhancements, you can seamlessly troubleshoot AWS Glue jobs in a unified experience. We also demonstrated how to upgrade your Amazon MWAA environment to a compatible version, update dependencies, and change the IAM role policy accordingly.

For more information about common troubleshooting steps, refer to Troubleshooting: Creating and updating an Amazon MWAA environment. For in-depth details of migrating to an Amazon MWAA environment, refer to Upgrading from 1.10 to 2. To learn about the open-source code changes that increase observability of AWS Glue jobs in the Airflow Amazon provider package, refer to Relay logs from AWS Glue jobs.

Finally, we recommend visiting the AWS Big Data Blog for other material on analytics, ML, and data governance on AWS.


About the Authors

Rushabh Lokhande is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He helps customers implement big data, machine learning, and analytics solutions. Outside of work, he enjoys spending time with family, reading, running, and golf.

Ryan Gomes is a Data & ML Engineer with the AWS Professional Services Analytics Practice. He is passionate about helping customers achieve better outcomes through analytics and machine learning solutions in the cloud. Outside of work, he enjoys fitness, cooking, and spending quality time with friends and family.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.