Tag Archives: AWS Glue

New Amazon DynamoDB zero-ETL integration with Amazon SageMaker Lakehouse

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-amazon-dynamodb-zero-etl-integration-with-amazon-sagemaker-lakehouse/

Amazon DynamoDB, a serverless NoSQL database, has been a go-to solution for over one million customers to build low-latency and high-scale applications. As data grows, organizations are constantly seeking ways to extract valuable insights from operational data, which is often stored in DynamoDB. However, to make the most of this data in Amazon DynamoDB for analytics and machine learning (ML) use cases, customers often build custom data pipelines—a time-consuming infrastructure task that adds little unique value to their core business.

Starting today, you can use Amazon DynamoDB zero-ETL integration with Amazon SageMaker Lakehouse to run analytics and ML workloads in just a few clicks without consuming your DynamoDB table capacity. Amazon SageMaker Lakehouse unifies all your data across Amazon S3 data lakes and Amazon Redshift data warehouses, helping you build powerful analytics and AI/ML applications on a single copy of data.

Zero-ETL is a set of integrations that eliminates or minimizes the need to build ETL data pipelines. This zero-ETL integration reduces the complexity of engineering efforts required to build and maintain data pipelines, benefiting users running analytics and ML workloads on operational data in Amazon DynamoDB without impacting production workflows.

Let’s get started
For the following demo, I need to set up zero-ETL integration for my data in Amazon DynamoDB with an Amazon Simple Storage Service (Amazon S3) data lake managed by Amazon SageMaker Lakehouse. Before setting up the zero-ETL integration, there are prerequisites to complete. To learn more about how to set them up, refer to the Amazon DynamoDB documentation page.

With all the prerequisites completed, I can get started with this integration. I navigate to the AWS Glue console and select Zero-ETL integrations under Data Integration and ETL. Then, I choose Create zero-ETL integration.

Here, I have options to select my data source. I choose Amazon DynamoDB and choose Next.

Next, I need to configure the source and target details. In the Source details section, I select my Amazon DynamoDB table. In the Target details section, I specify the S3 bucket that I’ve set up in the AWS Glue Data Catalog.

To set up this integration, I need an IAM role that grants AWS Glue the necessary permissions. For guidance on configuring IAM permissions, visit the Amazon DynamoDB documentation page. Also, if I haven’t configured a resource policy for my AWS Glue Data Catalog, I can select Fix it for me to automatically add the required resource policies.

Here, I have options to configure the output. Under Data partitioning, I can either use DynamoDB table keys for partitioning or specify custom partition keys. After completing the configuration, I choose Next.

Because I selected the Fix it for me checkbox, I need to review the required changes and choose Continue before I can proceed to the next step.

On the next page, I have the flexibility to configure data encryption. I can use AWS Key Management Service (AWS KMS) or a custom encryption key. Then, I assign a name to the integration and choose Next.

On the last step, I need to review the configurations. When I’m happy, I choose Next to create the zero-ETL integration.

After the initial data ingestion completes, my zero-ETL integration will be ready for use. The completion time varies depending on the size of my source DynamoDB table.

If I navigate to Tables under Data Catalog in the left navigation panel, I can observe more details, including the schema. Under the hood, this zero-ETL integration uses Apache Iceberg to transform the format and structure of my DynamoDB data as it is written to Amazon S3.

Lastly, I can see that all my data is available in my S3 bucket.

This zero-ETL integration significantly reduces the complexity and operational burden of data movement, and I can therefore focus on extracting insights rather than managing pipelines.

Available now
This new zero-ETL capability is available in the following AWS Regions: US East (N. Virginia, Ohio), US West (Oregon), Asia Pacific (Hong Kong, Singapore, Sydney, Tokyo), Europe (Frankfurt, Ireland, Stockholm).
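
As a quick pre-flight check, the Region list above can be captured in a small lookup. This is only a sketch: the Region codes are the standard codes for the Regions named above, the function name is illustrative, and the list reflects availability as stated in this post, so it may have grown since.

```python
# Regions named in this post, keyed by their standard AWS Region codes.
# This is a snapshot of the launch list, not a live availability check.
ZERO_ETL_REGIONS = {
    "us-east-1": "US East (N. Virginia)",
    "us-east-2": "US East (Ohio)",
    "us-west-2": "US West (Oregon)",
    "ap-east-1": "Asia Pacific (Hong Kong)",
    "ap-southeast-1": "Asia Pacific (Singapore)",
    "ap-southeast-2": "Asia Pacific (Sydney)",
    "ap-northeast-1": "Asia Pacific (Tokyo)",
    "eu-central-1": "Europe (Frankfurt)",
    "eu-west-1": "Europe (Ireland)",
    "eu-north-1": "Europe (Stockholm)",
}


def supports_zero_etl(region_code: str) -> bool:
    """Return True if the Region code appears in the launch list above."""
    return region_code in ZERO_ETL_REGIONS


if __name__ == "__main__":
    for code in ("us-east-1", "sa-east-1"):
        print(code, supports_zero_etl(code))
```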

Explore how to streamline your data analytics workflows using Amazon DynamoDB zero-ETL integration with Amazon SageMaker Lakehouse. Learn more about how to get started on the Amazon DynamoDB documentation page.

Happy building!
Donnie

Introducing the HubSpot connector for AWS Glue

Post Syndicated from Eric Bomarsi original https://aws.amazon.com/blogs/big-data/introducing-the-hubspot-connector-for-aws-glue/

Most companies have adopted a diverse set of software as a service (SaaS) platforms to support various applications. The rapid adoption has enabled them to quickly streamline operations, enhance collaboration, and gain more accessible, scalable solutions for managing their critical data and workflows.

More companies have realized there is an opportunity to integrate, enhance, and present this SaaS data to improve internal operations and gain valuable insights on their data. Using AWS Glue, a serverless data integration service, companies can streamline this process, integrating data from internal and external sources into a centralized AWS data lake. From there, they can perform meaningful analytics, gain valuable insights, and optionally push enriched data back to external SaaS platforms.

This post introduces the new HubSpot managed connector for AWS Glue, and demonstrates how you can integrate HubSpot data into your existing data lake on AWS. By consolidating HubSpot data with data from your AWS accounts and from other SaaS services, you can enhance, analyze, and optionally write the data back to HubSpot, creating a seamless and integrated data experience.

Solution overview

In this example, we use AWS Glue to extract, transform, and load (ETL) data from your HubSpot account into a transactional data lake on Amazon Simple Storage Service (Amazon S3), using Apache Iceberg format. We register the schema in the AWS Glue Data Catalog to make your data discoverable. Subsequently, we use Amazon Athena to validate that the HubSpot data has been successfully loaded to Amazon S3. The following diagram illustrates the solution architecture.

[Figure: solution architecture diagram]

The following are key components and steps in the integration:

  1. Configure your HubSpot account and app to enable access to your HubSpot data.
  2. Prepare for data movement by securely storing your HubSpot OAuth credentials in AWS Secrets Manager, creating an S3 bucket to store your ingested data, and creating an AWS Identity and Access Management (IAM) role for AWS Glue.
  3. Create an AWS Glue job to extract and load data from HubSpot to Amazon S3. AWS Glue establishes a secure connection to HubSpot using OAuth for authorization and TLS for data encryption in transit. AWS Glue also supports the ability to apply complex data transformations, enabling efficient data integration and preparation to meet your needs.
  4. Schema and other metadata will be registered in the AWS Glue Data Catalog, a centralized metadata repository for all your data assets. This helps simplify schema management, and also makes the data discoverable by other services.
  5. Run the AWS Glue job to extract data from HubSpot and write it to Amazon S3 using Iceberg format. Apache Iceberg is an open source, high-performance open table format designed for large-scale analytics, providing transactional consistency and seamless schema evolution. Although we use Iceberg in this example, AWS Glue offers robust support for various data formats, including other transactional formats such as Apache Hudi and Delta Lake.
  6. The data loaded to Amazon S3 will be organized into partitioned folders to optimize for query performance and management. Amazon S3 will also store the AWS Glue scripts, logs, and other temporary data required during the ETL process.
  7. Finally, Amazon Athena will be used to query the data loaded from HubSpot to Amazon S3, validating that all changes in the source system have been captured successfully.
  8. Optionally, you can schedule the AWS Glue job to regularly synchronize HubSpot data to Amazon S3 and analyze data updates over time.

Set up your HubSpot account

This example requires you to create a HubSpot public app for AWS Glue in a HubSpot Developer account, and connect it to an associated HubSpot account. A HubSpot public app is a type of integration that can be installed in your HubSpot accounts or listed in the HubSpot Marketplace. In this example, you create a HubSpot app for the AWS Glue integration, and install it in a new test account. Although HubSpot calls it a public app, it will not be listed in their Marketplace and will only have access to your test account.

  1. If you don’t already have one, sign up for a free HubSpot developer account.
  2. Log in to your HubSpot developer account, where you’ll see options to create apps and test accounts.
  3. Choose Create a test account and follow the instructions.

HubSpot test accounts have Enterprise versions of the HubSpot Marketing, Sales, and Service Hubs along with sample data, so you can test most HubSpot tools, create CRM data, and access it through APIs with Glue. For more information about creating a test account, refer to Create a developer test account.

Create a HubSpot app

Complete the following steps to create a HubSpot app:

  1. Switch back to your HubSpot developer account, and choose Create an app.
  2. Fill in the App Info section with the name AWS Glue and a brief description.
  3. Choose the Auth tab.
  4. For Redirect URLs, enter the redirect URL for AWS Glue in the form: https://<region>.console.aws.amazon.com/gluestudio/oauth.

Be sure to replace <region> with your AWS Glue operating AWS Region. For instance, the code for the US East (N. Virginia) Region is us-east-1, so the AWS Glue redirect URL is https://us-east-1.console.aws.amazon.com/gluestudio/oauth.

  5. In the Scopes section, choose Add new scope and select the following permissions:
    • automation
    • content
    • crm.lists.read
    • crm.lists.write
    • crm.objects.companies.read
    • crm.objects.companies.write
    • crm.objects.contacts.read
    • crm.objects.contacts.write
    • crm.objects.custom.read
    • crm.objects.custom.write
    • crm.objects.deals.read
    • crm.objects.deals.write
    • crm.objects.owners.read
    • crm.schemas.custom.read
    • e-commerce
    • forms
    • oauth
    • sales-email-read
    • tickets
  6. Review the Scopes and Redirect URL settings, then choose Create app.
  7. Navigate back to your app Auth tab.
  8. Take note of the values for Client ID, Client secret, and Install URL (OAuth). You will need these later to connect your AWS Glue instance.
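
The redirect URL entered on the Auth tab follows a fixed pattern, so it can be derived from the Region code rather than typed by hand. The helper below is a minimal sketch; the function name is illustrative and not part of any AWS or HubSpot SDK.

```python
def glue_oauth_redirect_url(region: str) -> str:
    """Build the AWS Glue OAuth redirect URL for a Region code such as us-east-1."""
    region = region.strip()
    if not region:
        raise ValueError("region must be a non-empty AWS Region code")
    return f"https://{region}.console.aws.amazon.com/gluestudio/oauth"


if __name__ == "__main__":
    # US East (N. Virginia), the example used in this post
    print(glue_oauth_redirect_url("us-east-1"))
```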

Select or create an Amazon S3 bucket where your HubSpot data will reside

Select an existing Amazon S3 bucket in your account, or create a new bucket to store your HubSpot data, as well as scripts, logs, and so on. For this example, the bucket name will follow the format aws-glue-hubspot-<account>-<region>, where <account> is the AWS account number and <region> is the operating Region. The bucket will be configured with all defaults: public access disabled, versioning disabled, and server-side encryption with Amazon S3 managed keys (SSE-S3).

If you use AWSGlueServiceRole in your IAM role as shown in this example, it will provide access to S3 buckets with names starting with aws-glue-.
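
The naming convention and the aws-glue- prefix rule can be checked before the bucket is created. The following is a small sketch with illustrative function names; the account number in the usage line is a placeholder, not a real account.

```python
GLUE_MANAGED_PREFIX = "aws-glue-"  # prefix covered by AWSGlueServiceRole, per this post


def hubspot_bucket_name(account_id: str, region: str) -> str:
    """Compose the bucket name in the aws-glue-hubspot-<account>-<region> format."""
    return f"aws-glue-hubspot-{account_id}-{region}"


def covered_by_glue_service_role(bucket_name: str) -> bool:
    """Return True if the name starts with the prefix the managed policy covers."""
    return bucket_name.startswith(GLUE_MANAGED_PREFIX)


if __name__ == "__main__":
    name = hubspot_bucket_name("123456789012", "us-east-1")  # placeholder account
    print(name, covered_by_glue_service_role(name))
```

A bucket with a different prefix would need its own S3 permissions added to the role.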

Create an IAM role for AWS Glue

Create an IAM role with permissions for the AWS Glue job. AWS Glue will assume this role when calling other services on your behalf.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type, choose AWS service.
  4. For Use case, choose Glue.
  5. Add the following AWS managed policies to the role:
    1. AWSGlueServiceRole for accessing related services such as Amazon S3, Amazon Elastic Compute Cloud, Amazon CloudWatch, and IAM. This policy enables access to S3 buckets with names starting with aws-glue-.
    2. SecretsManagerReadWrite for read/write access to AWS Secrets Manager.
  6. Give the role a name, for instance AWSGlueServiceRole_blog.

For more information, see Getting started with AWS Glue and Create an IAM role for AWS Glue.

Create an AWS Secrets Manager secret

AWS Secrets Manager is used to securely store your HubSpot OAuth credentials. Complete the following steps to create a secret:

  1. On the AWS Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. Under Key/value pairs, enter the HubSpot client secret with the key USER_MANAGED_CLIENT_APPLICATION_CLIENT_SECRET.
  5. Choose Next.

  6. Enter the secret name, such as HubSpot-Blog, a description, and continue.
  7. Leave the secret rotation as default, and choose Next.
  8. Review the secret configuration, and choose Store.
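
If you prefer to prepare the secret value outside the console, the key/value pair from the steps above corresponds to a one-entry JSON document. The sketch below only builds that string; the function name is illustrative, and how you submit it (for example as the secret string in a Secrets Manager API call) is left to your tooling.

```python
import json

# Key name required by the HubSpot connection, as given in the steps above.
CLIENT_SECRET_KEY = "USER_MANAGED_CLIENT_APPLICATION_CLIENT_SECRET"


def hubspot_secret_string(client_secret: str) -> str:
    """Serialize the HubSpot client secret as the JSON key/value pair to store."""
    if not client_secret:
        raise ValueError("client_secret must not be empty")
    return json.dumps({CLIENT_SECRET_KEY: client_secret})


if __name__ == "__main__":
    # Never hard-code a real secret; this value is a placeholder.
    print(hubspot_secret_string("example-client-secret"))
```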

Create an AWS Glue connection

Complete the following steps to create an AWS Glue connection to your HubSpot account:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. For Data sources, search for and select HubSpot.
  4. Choose Next.

  5. On the Configure connection page, fill in the required information:
    1. For IAM service role, choose the service role created previously. In this example, we use the role AWSGlueServiceRole_blog.
    2. For Authentication URL, leave as default.
    3. For User Managed Client Application ClientId, enter the OAuth client ID from HubSpot.
    4. For AWS Secret, choose the OAuth client secret name configured previously in AWS Secrets Manager.
    5. Choose Next.
  6. Choose Test Connection to validate the connection to HubSpot.
  7. This will bring up a new HubSpot connection window. Be sure to select your HubSpot test account (not your developer account) to test the connection.
  8. If this is your first connection attempt, you will be redirected to another page where you are asked to confirm the access level granted to AWS Glue. Choose Connect App.

If successful, the HubSpot window will close and your AWS connection window will say Connection test successful.

  9. Under Set properties, for Name, enter a name (for example, HubSpot_Connection_blog).
  10. Choose Next.
  11. Under Review and create, review your settings and then create the connection.

Create a database in AWS Glue Data Catalog

Complete the following steps to create a database in AWS Glue Data Catalog to organize your HubSpot data:

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Create a new database.
  3. Enter a name (for example, hubspot).
  4. You can leave the location field blank.
  5. Choose Create database.

Create an AWS Glue ETL job

Now that you have an AWS Glue data connection to your HubSpot account, you can create an AWS Glue ETL job to ingest HubSpot data into your AWS data lake. AWS Glue provides both visual and code-based interfaces to simplify data integration, depending on your expertise. In this example, we use the Script interface to ingest HubSpot data into the Amazon S3 location. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose the Script editor.
  3. Choose Spark as the engine, and upload the following script.

The AWS Glue Spark job reads the HubSpot data and merges it into the S3 bucket in Iceberg format.

  4. On the Job details tab, provide the following information:
    1. For Name, enter a name, such as HubSpot_to_S3_blog.
    2. For Description, enter a meaningful description of the job.
    3. For IAM Role, choose the IAM role you created previously (for this post, AWSGlueServiceRole_blog).
  5. Expand Advanced properties.
  6. Under Connections, enter your HubSpot connection from the previous section (for this post, HubSpot_Connection_blog).
  7. Under Job parameters, enter the following parameters:
    • For --conf, enter spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse
    • For --datalake-formats, enter iceberg
    • For --db_name, enter the AWS Glue database to store your data lake (for this post, hubspot)
    • For --table_name, enter the HubSpot table to be ingested (for this post, company)
    • For --s3_bucket_name, enter where the ingested Iceberg table is stored, in this case aws-glue-hubspot-<account>-<region>
    • For --connection_name, enter the AWS Glue connection name created, in this case HubSpot_Connection_blog
  8. Choose Save to save the job, then choose Run.

Depending on the amount of data in your HubSpot account, the job can take a few minutes to complete. After a successful job run, you can choose Run details to see the job specifications and logs.
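
The --conf job parameter above packs several Spark settings into one value by chaining them with " --conf ". Because that string is easy to mistype, the sketch below assembles it from key/value pairs and collects the other job parameters in one dictionary. The helper names are illustrative, and the bucket name in the usage line is a placeholder.

```python
# Spark settings from the --conf job parameter, as ordered (key, value) pairs.
ICEBERG_SPARK_SETTINGS = [
    ("spark.sql.extensions",
     "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    ("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.glue_catalog.catalog-impl",
     "org.apache.iceberg.aws.glue.GlueCatalog"),
    ("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
    ("spark.sql.catalog.glue_catalog.warehouse", "file:///tmp/spark-warehouse"),
]


def chained_conf_value(settings):
    """Join settings into one value; every setting after the first gets ' --conf '."""
    return " --conf ".join(f"{key}={value}" for key, value in settings)


def job_parameters(db_name, table_name, s3_bucket_name, connection_name):
    """Collect the job parameters listed in the steps above into one mapping."""
    return {
        "--conf": chained_conf_value(ICEBERG_SPARK_SETTINGS),
        "--datalake-formats": "iceberg",
        "--db_name": db_name,
        "--table_name": table_name,
        "--s3_bucket_name": s3_bucket_name,
        "--connection_name": connection_name,
    }


if __name__ == "__main__":
    params = job_parameters(
        "hubspot", "company",
        "aws-glue-hubspot-123456789012-us-east-1",  # placeholder bucket name
        "HubSpot_Connection_blog",
    )
    for name, value in params.items():
        print(name, "=", value)
```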

Use Athena to query data

Athena is an interactive and serverless query service that makes it straightforward to analyze data directly in Amazon S3 using standard SQL. In this example, we query the results of the HubSpot data ingested into Amazon S3.

  1. On the Athena console, choose Query editor.
  2. For Database, choose hubspot, and you should see your company table.
  3. Select entries from the hubspot.company table to view the data captured from HubSpot.

You can try various queries on the HubSpot data, such as:

-- get sample of dataset
SELECT * FROM "hubspot"."company" limit 10;

-- get companies revenue
SELECT * FROM "hubspot"."company" A
WHERE A.annualrevenue IS NOT NULL;

-- get number of companies with revenue
SELECT COUNT(*) AS companies_count FROM "hubspot"."company" A
WHERE A.annualrevenue IS NOT NULL;


Over time, your HubSpot data may change. You can rerun your ETL job periodically, and the Iceberg data lake table will effectively capture your changes. You can verify by adding, removing, and changing companies in your HubSpot database, and then rerun the ETL job. Your data lake should match your latest HubSpot data. With this capability, you can schedule the ETL job to run as often as you need.

Extending the HubSpot connector with AWS services

The HubSpot connector for AWS Glue provides a powerful foundation for building comprehensive data pipelines and analytics workflows. By integrating HubSpot data into your AWS environment, you can use additional services like Amazon Redshift, Amazon QuickSight, and Amazon SageMaker to further process, transform, and analyze the data. This allows you to construct sophisticated, end-to-end data architectures that unlock the full value of your HubSpot data, without the need to manage complex infrastructure. The seamless integration between these AWS services makes it straightforward to build scalable analytics pipelines tailored to your specific requirements.

Considerations

You can set up AWS Glue job triggers to run the ETL jobs on a schedule, so that the data is regularly synchronized between HubSpot and Amazon S3. You can also integrate the ETL jobs with other AWS services, including AWS Step Functions, Amazon MWAA (Amazon Managed Workflows for Apache Airflow), AWS Lambda, Amazon EventBridge, and Amazon Bedrock to create a more advanced data processing pipeline.

By default, the HubSpot connector doesn’t import deleted records. However, you can set the IMPORT_DELETED_RECORDS option to true to import all records, including the deleted ones.
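
As a rough illustration, the option could be carried in the dictionary of connection options that a job script passes when reading from HubSpot. Only IMPORT_DELETED_RECORDS comes from this post; the other key names (connectionName, ENTITY_NAME, API_VERSION), the "v3" default, and the string form "true" are assumptions to verify against the connector documentation.

```python
def hubspot_connection_options(connection_name, entity_name,
                               import_deleted=False, api_version="v3"):
    """Build a connection-options mapping for a HubSpot read.

    Key names other than IMPORT_DELETED_RECORDS are assumptions, not taken
    from this post; check them against the connector documentation.
    """
    options = {
        "connectionName": connection_name,  # assumed key name
        "ENTITY_NAME": entity_name,         # assumed key name
        "API_VERSION": api_version,         # assumed key name and default
    }
    if import_deleted:
        # Option named in this post; the value format is an assumption.
        options["IMPORT_DELETED_RECORDS"] = "true"
    return options


if __name__ == "__main__":
    print(hubspot_connection_options("HubSpot_Connection_blog", "company",
                                     import_deleted=True))
```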

Clean up

To avoid incurring charges, clean up the resources used in this post from your AWS account, including the AWS Glue jobs, HubSpot connection, AWS Secrets Manager secret, IAM role, and Amazon S3 bucket.

Conclusion

With the introduction of the AWS Glue connector for HubSpot, integrating HubSpot data with information from other data sources has become more streamlined than ever. This feature enables you to set up ongoing data integration from HubSpot to AWS, providing a unified view of data from across platforms and enabling more comprehensive analytics. The serverless nature of AWS Glue means there is no infrastructure management required, and you only pay for the resources consumed. By following the steps outlined in this post, you can make sure that up-to-date data from HubSpot is captured in your data lake, allowing teams to make faster data-driven decisions and uncover complex insights from across data sources.

To learn more about the AWS Glue connector for HubSpot, refer to Connecting to HubSpot in AWS Glue. This guide walks through the entire process, from setting up the connection to running the data transfer flow. For more information on AWS Glue, visit AWS Glue.


About the Authors

Eric Bomarsi is a Senior Solutions Architect in the ISV group at AWS, where he focuses on building scalable solutions for large customers. As a member of the AWS analytics community, he helps customers get strategic insights from their data. Outside of work, he enjoys playing ice hockey and traveling with his family.

Annie Nelson is a Senior Solutions Architect at AWS. She is a data enthusiast who enjoys problem solving and tackling complex architectural challenges with customers.

Kartikay Khator is a Solutions Architect within Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed multiple marathons, he is currently preparing for his next marathon challenge.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, Amazon MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest Amazon MWAA and AWS Glue features and news!

Scaling RISE with SAP data and AWS Glue

Post Syndicated from Allison Quinn original https://aws.amazon.com/blogs/big-data/scaling-rise-with-sap-data-and-aws-glue/

Customers often want to augment and enrich SAP source data with other non-SAP source data. Such analytic use cases can be enabled by building a data warehouse or data lake. Customers can now use the AWS Glue SAP OData connector to extract data from SAP. The SAP OData connector supports both on-premises and cloud-hosted (native and SAP RISE) deployments. By using the AWS Glue OData connector for SAP, you can work seamlessly with your data on AWS Glue and Apache Spark in a distributed fashion for efficient processing. AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue OData connector for SAP uses the SAP ODP framework and OData protocol for data extraction. This framework acts in a provider-subscriber model to enable data transfers between SAP systems and non-SAP data targets. The ODP framework supports full data extraction and change data capture through the Operational Delta Queues (ODQ) mechanism. As a source for data extraction for SAP, you can use SAP data extractors, ABAP CDS views, SAP BW, or BW/4 HANA sources, HANA information views in SAP ABAP sources, or any ODP-enabled data sources.

SAP source systems can hold historical data, and can receive constant updates. For this reason, it’s important to enable incremental processing of source changes. This blog post details how you can extract data from SAP and implement incremental data transfer from your SAP source using the SAP ODP OData framework with source delta tokens.

Solution overview

Example Corp wants to analyze the product data stored in their SAP source system. They want to understand their current product offering, in particular the number of products that they have in each of their material groups. This will include joining data from the SAP material master and material group data sources from their SAP system. The material master data is available on incremental extraction, while the material group is only available on a full load. These data sources should be combined and available to query for analysis.

Prerequisites

To complete the solution presented in the post, start by completing the following prerequisite steps:

  1. Configure operational data provisioning (ODP) data sources for extraction in the SAP Gateway of your SAP system.
  2. Create an Amazon Simple Storage Service (Amazon S3) bucket to store your SAP data.
  3. In an AWS Glue Data Catalog, create a database called sapgluedatabase.
  4. Create an AWS Identity and Access Management (IAM) role for the AWS Glue extract, transform, and load (ETL) job to use. The role must grant access to all resources used by the job, including Amazon S3 and AWS Secrets Manager. For the solution in this post, name the role GlueServiceRoleforSAP. Use the following policies:
    • AWS managed policies:
    • Inline policy:
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObjectAcl",
              "s3:GetObject",
              "s3:GetObjectAttributes",
              "s3:ListBucket",
              "s3:DeleteObject",
              "s3:PutObjectAcl"
            ],
            "Resource": [
              "arn:aws:s3:::<S3-BUCKET-NAME>",
              "arn:aws:s3:::<S3-BUCKET-NAME>/*"
            ]
          }
        ]
      }
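
To avoid editing the <S3-BUCKET-NAME> placeholder by hand, the inline policy can be generated for a given bucket. The following is a minimal sketch that reproduces the policy above; the function name and the bucket name in the usage line are illustrative.

```python
import json

# S3 actions granted by the inline policy shown above.
S3_ACTIONS = [
    "s3:PutObject",
    "s3:GetObjectAcl",
    "s3:GetObject",
    "s3:GetObjectAttributes",
    "s3:ListBucket",
    "s3:DeleteObject",
    "s3:PutObjectAcl",
]


def sap_bucket_policy(bucket_name: str) -> dict:
    """Return the inline policy document scoped to one bucket and its objects."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": list(S3_ACTIONS),
                # Bucket ARN for ListBucket, object ARN pattern for object actions
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(sap_bucket_policy("example-sap-data-bucket"), indent=2))
```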
      

Create the AWS Glue connection for SAP

The SAP connector supports both CUSTOM (this is SAP BASIC authentication) and OAUTH authentication methods. For this example, you will be connecting with BASIC authentication.

  1. Use the AWS Management Console for AWS Secrets Manager to create a secret called ODataGlueSecret for your SAP source. Details in AWS Secrets Manager should include the elements in the following code. You will need to enter your SAP system username in place of <your SAP username> and its password in place of <your SAP username password>.
    {
       "basicAuthUsername": "<your SAP username>",
       "basicAuthPassword": "<your SAP username password>",
       "basicAuthDisableSSO": "True",
       "customAuthenticationType": "CustomBasicAuth"
    }
    

  2. Create the AWS Glue connection GlueSAPOData for your SAP system by selecting the new SAP OData data source.
  3. Configure the connection with the appropriate values for your SAP source.
    1. Application host URL: The host must have the SSL certificates for the authentication and validation of your SAP host name.
    2. Application service path: /sap/opu/odata/iwfnd/catalogservice;v=2;
    3. Port number: Port number of your SAP source system.
    4. Client number: Client number of your SAP source system.
    5. Logon language: Logon language of your SAP source system.
  4. In the Authentication section, select CUSTOM as the Authentication Type.
  5. Select the AWS Secret created in the preceding steps: ODataGlueSecret.
  6. In the Network Options section enter the VPC, subnet and security group used for the connection to your SAP system. For more information on connecting to your SAP system, see Configure a VPC for your ETL job.

Create an ETL job to ingest data from SAP

In the AWS Glue console, create a new Visual Editor AWS Glue job.

  1. Go to the AWS Glue console.
  2. In the navigation pane under ETL Jobs choose Visual ETL.
  3. Choose Visual ETL to create a job in the Visual Editor.
  4. For this post, edit the default name to be Material Master Job and choose Save.

On your Visual Editor canvas, select your SAP sources.

  1. Choose the Visual tab, then choose the plus sign to open the Add nodes menu. Search for SAP and add the SAP OData Source.
  2. Choose the node you just added and name it Material Master Attributes.
    1. For SAP OData connection, select the GlueSAPOData connection.
    2. Select the material attributes, service and entity set from your SAP source.
    3. For Entity Name and Sub Entity Name, select SAP OData entity from your SAP source.
    4. From the Fields, select Material, Created on, Material Group, Material Type, Old Matl number, GLUE_FETCH_SQ, DELTA_TOKEN and DML_STATUS.
    5. Enter limit 100 in the filter section, to limit the data for design time.

Note that this service supports delta extraction, so Incremental transfer is the default selected option.

After the AWS Glue service role details have been chosen, the data preview is available. You can adjust the preview to include the three new available fields, which are:

  • glue_fetch_sq: This is a sequence field, generated from the epoch timestamp in the order the record was received, and is unique for each record. This can be used if you need to know or establish the order of changes in the source system.
  • delta_token: All records will have this field value blank, except for the last passed record, which will contain the value for the ODQ token to capture any changed records (CDC). This record is not a transactional record from the source and is only there for the purpose of passing the delta token value.
  • dml_status: This will show UPDATED for all newly inserted and updated records from the source and DELETED for records that have been deleted from source.

For delta enabled extraction, the last record passed will contain the value DELTA_TOKEN and the delta_token field will be filled as mentioned above.
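
To make the role of these fields concrete, the sketch below separates the trailing token record from the transactional rows and then applies dml_status to a keyed copy of the target. It uses plain Python dictionaries instead of Spark DataFrames; the material key, the sample values, and the token string are made up for illustration.

```python
def split_delta_token(records):
    """Separate transactional rows from the record that carries the delta token."""
    data_rows = [row for row in records if not row.get("delta_token")]
    tokens = [row["delta_token"] for row in records if row.get("delta_token")]
    # Per the description above, only the last passed record carries a token.
    return data_rows, (tokens[-1] if tokens else None)


def apply_changes(target, rows, key):
    """Upsert UPDATED rows and drop DELETED rows in a dict keyed by `key`."""
    for row in rows:
        if row["dml_status"] == "DELETED":
            target.pop(row[key], None)
        else:  # UPDATED covers both inserts and updates
            target[row[key]] = row
    return target


if __name__ == "__main__":
    extracted = [
        {"material": "M-001", "matkl": "01", "dml_status": "UPDATED", "delta_token": ""},
        {"material": "M-002", "matkl": "02", "dml_status": "DELETED", "delta_token": ""},
        # Non-transactional record that only carries the token (value made up)
        {"material": None, "matkl": None, "dml_status": None, "delta_token": "D20240101"},
    ]
    rows, token = split_delta_token(extracted)
    current = {"M-002": {"material": "M-002", "matkl": "02"}}
    print(sorted(apply_changes(current, rows, "material")), token)
```

Filtering out the token record before writing keeps a non-transactional row from landing in the target table.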

  1. Add another SAP ODATA source connection to your canvas, and name this node Material Group Text.
    1. Select the material group service and entity set from your SAP source
    2. For Entity Name and Sub Entity Name, select the SAP OData entity from your SAP source

Note that this service supports full extraction, so Full transfer is the default selected option. You can also preview this dataset.

  1. When previewing the data, notice the language key. SAP passes all languages, so add a filter of SPRAS = ‘E’ to only extract English. Note this uses the SAP internal value of the field.
  2. Add a transform node to the canvas Change Schema transform after the Material Group Text.
    • Rename the material group field in target key to matkl2, so it is different than your first source.
    • Under Drop, select ;spras, odq_changemode, odq_entitycntr, dml_status, delta_token and glue_fetch_sq.

  3. Add a join transform to your canvas, bringing together both source datasets.
    1. Ensure the node parents of both Material Master Attributes and Change Schema have been chosen
    2. Select the Join type of Left join
    3. Select the join conditions as the key fields from each source
      • Under Material Master Attributes, select matkl
      • Under Change Schema, select matkl2

You can preview the output to ensure the correct data is being returned. Now, you are ready to store the result.

  1. Add the S3 bucket target to your canvas.
    1. Ensure the node parent is Join.
    2. For format, select Parquet.
    3. For S3 Target Location, browse to the S3 bucket you created in the prerequisites and add materialmaster/ to the S3 target location.
    4. For the Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
    5. For Database, select the name of the AWS Glue database created earlier, sapgluedatabase.
    6. For Table name, enter materialmaster.
  2. Choose Save to save your job. Your job should look like the following figure.

Clone your ETL job and make it incremental

After your ETL job has been created, you can clone it and add incremental data handling using delta tokens.

To do this, you need to modify the job script directly. You will add a statement that retrieves the last delta token (stored on the job tag) and adds the delta token value to the request (the execution of the job), which enables the delta-enabled SAP OData service when retrieving the data on the next job run.

The first execution of the job will not have a delta token value on the tag; therefore, the call will be an initial run and the delta token will subsequently be stored in the tags for future executions.
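The initial-versus-delta behavior described here can be sketched as follows. This is not the code of the sap_odata_state_management library; it is a simplified illustration that uses an in-memory dictionary in place of the job tags, and the class, function, and option names are hypothetical.

```python
class TagStore:
    """Hypothetical in-memory stand-in for AWS Glue job tags."""

    def __init__(self):
        self.tags = {}


def plan_run(store, key):
    """Decide whether the next run is an initial load or a delta load."""
    token = store.tags.get(key)
    if token is None:
        # No token stored yet: request a full (initial) extraction.
        return "initial", {}
    # A token exists: pass it along so only changes are requested.
    return "delta", {"delta_token": token}  # option name is illustrative


def finish_run(store, key, new_token):
    """Persist the token returned by the source for the next run."""
    if new_token:
        store.tags[key] = new_token


store = TagStore()
first = plan_run(store, "MaterialMasterAttributes")    # no token yet
finish_run(store, "MaterialMasterAttributes", "token-001")
second = plan_run(store, "MaterialMasterAttributes")   # token available
```

The first call plans an initial load because nothing is stored, and the second call plans a delta load using the token persisted at the end of the first run.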

  1. Go to the AWS Glue console.
  2. In the navigation pane under ETL Jobs choose Visual ETL.
  3. Select the Material Master Job, choose Actions and select Clone job.
  4. Change the name of the job to Material Master Job Delta, then choose the Script tab.
  5. You need to add an additional python library that will take care of storing and retrieving the Delta Tokens for each job execution. To do this, navigate to the Job Details tab, scroll down and expand the Advanced Properties section. In the Python library path add the following path:
    s3://aws-blogs-artifacts-public/artifacts/BDB-4789/sap_odata_state_management.zip

  1. Now choose the Script tab and choose Edit script on the top right corner. Choose Confirm to confirm that your job will be script-only.

Apply the following changes to the script to enable the delta token.

  1. Import the SAP OData state management library classes you added in step 5 above by adding the following code to row 8.
    from sap_odata_state_management.state_manager import StateManagerFactory, StateManagerType, StateType

  2. The next few steps retrieve and persist the delta token in the job tags so that it can be accessed by the subsequent job execution. The delta token is added to the request back to the SAP source, so that the incremental changes are extracted. If no token is passed, the load runs as an initial load and the token is persisted for the next run, which will then be a delta load. To initialize the sap_odata_state_management library, extract the connection options into a variable and update them using the state manager. Do this by adding the following code to line 16 (after the job.init statement).

You can find the <key of MaterialMasterAttributes node> and the <entityName for Material Attribute> in the existing generated script under # Script generated for node Material Master Attributes. Be sure to replace with the appropriate values.

key = "<key of MaterialMasterAttributes node>"
state_manager = StateManagerFactory.create_manager(
    manager_type=StateManagerType.JOB_TAG, state_type=StateType.DELTA_TOKEN, options={"job_name": args['JOB_NAME'], "logger": glueContext.get_logger()}
)
options = {
    "connectionName": "GlueSAPOData",
    "entityName": "<entityName for Material Attribute>",
    "ENABLE_CDC": "true"
}
connector_options = state_manager.get_connector_options(key)
options.update(connector_options)
  1. Comment out the existing script generated for node Material Master Attributes by adding a #, and add the following replacement snippet.
    <key of MaterialMasterAttributes node> = glueContext.create_dynamic_frame.from_options(connection_type="sapodata", connection_options=options, transformation_ctx="<key of MaterialMasterAttributes node>")

  2. To extract the delta token from the dynamic frame and persist it in the job tags, add the following code snippet just above the last line in your script (before job.commit())
    state_manager.update_state(key, <key of MaterialMasterAttributes node>.toDF())

This is what your final script should look like:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from sap_odata_state_management.state_manager import StateManagerFactory, StateManagerType, StateType

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

key = "MaterialMasterAttributes_node1730873953236"
state_manager = StateManagerFactory.create_manager(
    manager_type=StateManagerType.JOB_TAG, state_type=StateType.DELTA_TOKEN, options={"job_name": args['JOB_NAME'], "logger": glueContext.get_logger()}
)
options = {
    "connectionName": "GlueSAPOData",
    "entityName": "/sap/opu/odata/sap/ZMATERIAL_ATTR_SRV/EntityOf0MATERIAL_ATTR",
    "ENABLE_CDC": "true"
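}
# Merge in the stored delta token (if any) so this run requests only changes
connector_options = state_manager.get_connector_options(key)
options.update(connector_options)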

# Script generated for node Material Group Text
MaterialGroupText_node1730874412841 = glueContext.create_dynamic_frame.from_options(connection_type="sapodata", connection_options={"ENABLE_CDC": "false", "connectionName": "GlueSAPOData", "FILTER_PREDICATE": "SPRAS = 'E'", "ENTITY_NAME": "/sap/opu/odata/sap/ZMATL_GROUP_SRV/EntityOf0MATL_GROUP_TEXT"}, transformation_ctx="MaterialGroupText_node1730874412841")

# Script generated for node Material Master Attributes
#MaterialMasterAttributes_node1730873953236 = glueContext.create_dynamic_frame.from_options(connection_type="sapodata", connection_options={"ENABLE_CDC": "true", "connectionName": "GlueSAPOdata", "FILTER_PREDICATE": "limit 100", "SELECTED_FIELDS": "MATNR,MTART,MATKL,BISMT,ERSDA,DML_STATUS,DELTA_TOKEN,GLUE_FETCH_SQ", "ENTITY_NAME": "/sap/opu/odata/sap/ZMATERIAL_ATTR_SRV/EntityOf0MATERIAL_ATTR"}, transformation_ctx="MaterialMasterAttributes_node1732755261264")
MaterialMasterAttributes_node1730873953236 = glueContext.create_dynamic_frame.from_options(connection_type="sapodata", connection_options=options, transformation_ctx="MaterialMasterAttributes_node1730873953236")

# Script generated for node Change Schema
ChangeSchema_node1730875214894 = ApplyMapping.apply(frame=MaterialGroupText_node1730874412841, mappings=[("matkl", "string", "matkl2", "string"), ("txtsh", "string", "txtsh", "string")], transformation_ctx="ChangeSchema_node1730875214894")

# Script generated for node Join
MaterialMasterAttributes_node1730873953236DF = MaterialMasterAttributes_node1730873953236.toDF()
ChangeSchema_node1730875214894DF = ChangeSchema_node1730875214894.toDF()
Join_node1730874996674 = DynamicFrame.fromDF(MaterialMasterAttributes_node1730873953236DF.join(ChangeSchema_node1730875214894DF, (MaterialMasterAttributes_node1730873953236DF['matkl'] == ChangeSchema_node1730875214894DF['matkl2']), "left"), glueContext, "Join_node1730874996674")

# Script generated for node Amazon S3
AmazonS3_node1730875848117 = glueContext.write_dynamic_frame.from_options(frame=Join_node1730874996674, connection_type="s3", format="json", connection_options={"path": "s3://sapglueodatabucket", "compression": "snappy", "partitionKeys": []}, transformation_ctx="AmazonS3_node1730875848117")
state_manager.update_state(key, MaterialMasterAttributes_node1730873953236.toDF())
job.commit()
  1. Choose Save to save your changes.
  2. Choose Run to run your job. Note that there are currently no tags in your job details.
  3. Wait for your job run to be successfully completed. You can see the status on the Runs tab.
  4. After your job run is complete, you will notice on the Job Details tab that a tag has been added. The next job run will read this token and run a delta load.
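If you prefer to check the tag programmatically rather than on the console, the following sketch reads the tags of a job through the AWS Glue get_tags API. It assumes you pass in a boto3 Glue client (for example, boto3.client("glue")); the helper names are hypothetical, and the tag key that the state management library writes is not shown in this post, so the sketch returns all tags.

```python
def job_arn(region, account_id, job_name):
    """Build the ARN of an AWS Glue job."""
    return f"arn:aws:glue:{region}:{account_id}:job/{job_name}"


def read_job_tags(glue_client, region, account_id, job_name):
    """Return the tags on a Glue job as a dictionary.

    glue_client is expected to behave like boto3.client("glue").
    """
    response = glue_client.get_tags(
        ResourceArn=job_arn(region, account_id, job_name)
    )
    return response.get("Tags", {})


# Example call (requires AWS credentials; region and account ID are placeholders):
# import boto3
# tags = read_job_tags(boto3.client("glue"), "<region>", "<account_id>",
#                      "Material Master Job Delta")
```

An empty dictionary before the first run and a populated one afterward matches the behavior described in the steps above.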

Query your SAP data source data

The AWS Glue job run has created an entry in the Data Catalog enabling you to query the data immediately.

  1. Go to the Amazon Athena console.
  2. Choose Launch Query Editor.
  3. Make sure you have an appropriate workgroup assigned, or create a workgroup if required.
  4. Select the sapgluedatabase and run a query (such as the following) to start analyzing your data.
    select matkl, txtsh, count(*)
    from materialmaster
    group by 1, 2
    order by 1, 2;

Clean up

To avoid incurring charges, clean up the resources used in this post from your AWS account, including the AWS Glue jobs, SAP OData connection, Glue Data Catalog entry, Secrets Manager secret, IAM role, the contents of the S3 bucket, and the S3 bucket.

Conclusion

In this post, we showed you how to create a serverless incremental data load process for multiple SAP data sources. The approach used AWS Glue to incrementally load the data from an SAP source using SAP ODP delta tokens and then load the data into Amazon S3.

The serverless nature of AWS Glue means that there is no infrastructure management, and you pay only for the resources consumed while your jobs are running (plus storage cost for outputs). As organizations become increasingly data driven, this SAP connector can provide an efficient, cost-effective, performant, and secure way to include SAP source data in your big data and analytic outcomes. For more information, see AWS Glue.


About the authors

Allison Quinn is a Sr. ANZ Analytics Specialist Solutions Architect for Data and AI based in Melbourne, Australia working closely with Financial Service customers in the region. Allison worked over 15 years with SAP products before concentrating her Analytics technical specialty on AWS native services. She’s very passionate about all things data, and about democratizing it so that customers of all types can drive business benefit.

Pavol is an Innovation Solution Architect at AWS, specializing in SAP cloud adoption across EMEA. With over 20 years of experience, he helps global customers migrate and optimize SAP systems on AWS. Pavol develops tailored strategies to transition SAP environments to the cloud, leveraging AWS’s agility, resiliency, and performance. He assists clients in modernizing their SAP landscapes using AWS’s AI/ML, data analytics, and application services to enhance intelligence, automation, and performance.

Partha Pratim Sanyal is a Software Development Engineer with AWS Glue in Vancouver, Canada, specializing in Data Integration, Analytics, and Connectivity. With extensive backend development expertise, he is dedicated to crafting impactful, customer-centric solutions. His work focuses on building features that empower users to effortlessly analyze and understand their data. Partha’s commitment to addressing complex user needs drives him to create intuitive and value-driven experiences that elevate data accessibility and insights for customers.

Diego is an experienced Enterprise Solutions Architect with over 20 years’ experience across SAP technologies, specializing in SAP innovation and data and analytics. He has worked both as partner and as a customer, giving him a complete perspective on what it takes to sell, implement, and run systems and organizations. He is passionate about technology and innovation, focusing on customer outcomes and delivering business value.

Luis Alberto Herrera Gomez is a Software Development Engineer with AWS Glue in Vancouver, specializing in backend engineering, microservices, and cloud computing. With 7-8 years of experience, including roles as a backend and full-stack developer for multiple startups before joining Amazon and AWS, Luis focuses on developing scalable and efficient cloud-based applications. His expertise in AWS technologies enables him to design high-performance systems that handle complex data processing tasks. Luis is passionate about leveraging cloud computing to solve challenging business problems.

Run Apache XTable in AWS Lambda for background conversion of open table formats

Post Syndicated from Matthias Rudolph original https://aws.amazon.com/blogs/big-data/run-apache-xtable-in-aws-lambda-for-background-conversion-of-open-table-formats/

This post was co-written with Dipankar Mazumdar, Staff Data Engineering Advocate with AWS Partner OneHouse.

Data architecture has evolved significantly to handle growing data volumes and diverse workloads. Initially, data warehouses were the go-to solution for structured data and analytical workloads but were limited by proprietary storage formats and their inability to handle unstructured data. This led to the rise of data lakes based on columnar formats like Apache Parquet, which came with different challenges like the lack of ACID capabilities.

Eventually, transactional data lakes emerged to add transactional consistency and performance of a data warehouse to the data lake. Central to a transactional data lake are open table formats (OTFs) such as Apache Hudi, Apache Iceberg, and Delta Lake, which act as a metadata layer over columnar formats. These formats provide essential features like schema evolution, partitioning, ACID transactions, and time-travel capabilities, that address traditional problems in data lakes.

In practice, OTFs are used in a broad range of analytical workloads, from business intelligence to machine learning. Moreover, they can be combined to benefit from individual strengths. For instance, a streaming data pipeline can write tables using Hudi because of its strength in low-latency, write-heavy workloads. In later pipeline stages, data is converted to Iceberg, to benefit from its read performance. Traditionally, this conversion required time-consuming rewrites of data files, resulting in data duplication, higher storage, and increased compute costs. In response, the industry is shifting toward interoperability between OTFs, with tools that allow conversions without data duplication. Apache XTable (incubating), an emerging open source project, facilitates seamless conversions between OTFs, eliminating many of the challenges associated with table format conversion.

In this post, we explore how Apache XTable, combined with the AWS Glue Data Catalog, enables background conversions between OTFs residing on Amazon Simple Storage Service (Amazon S3) based data lakes, with minimal to no changes to existing pipelines in a scalable and cost-effective way, as shown in the following diagram.

This post is one of multiple posts about XTable on AWS. For more examples and references to other posts, refer to the following GitHub repository.

Apache XTable

Apache XTable (incubating) is an open source project designed to enable interoperability among various data lake table formats, allowing omnidirectional conversions between formats without the need to copy or rewrite data. Originally open sourced in November 2023 under the name OneTable, with contributions from OneHouse among others, it was licensed under Apache 2.0. In March 2024, the project was donated to the Apache Software Foundation (ASF) and rebranded as Apache XTable, where it is now incubating. XTable isn’t a new table format but provides abstractions and tools to translate the metadata associated with existing formats. The primary objective of XTable is to allow users to start with any table format and have the flexibility to switch to another as needed.

Inner workings and features

At a fundamental level, Hudi, Iceberg, and Delta Lake share similarities in their structure. When data is written to a distributed file system, these formats consist of a data layer, typically Parquet files, and a metadata layer that provides the necessary abstraction (see the following diagram). XTable uses these commonalities to enable interoperability between formats.

The synchronization process in XTable works by translating table metadata using the existing APIs of these table formats. It reads the current metadata from the source table and generates the corresponding metadata for one or more target formats. This metadata is then stored in a designated directory within the base path of your table, such as _delta_log for Delta Lake, metadata for Iceberg, and .hoodie for Hudi. This allows the existing data to be interpreted as if it were originally written in any of these formats.

XTable provides two metadata translation methods: Full Sync, which translates all commits, and Incremental Sync, which only translates new, unsynced commits for greater efficiency with large tables. If issues arise with Incremental Sync, XTable automatically falls back to Full Sync to provide uninterrupted translation.
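The difference between the two modes, and the fallback between them, can be illustrated with a small sketch. This is a conceptual model only and not XTable's implementation: commits are represented as an ordered list of identifiers, and a last-synced commit that is missing from the source timeline stands in for the kind of issue that triggers the fallback.

```python
def commits_to_translate(source_commits, last_synced):
    """Pick the sync mode and the commits whose metadata must be translated.

    source_commits: ordered list of commit identifiers on the source table.
    last_synced: identifier of the last commit already translated, or None.
    """
    if last_synced is None or last_synced not in source_commits:
        # Nothing synced yet, or the sync point cannot be located:
        # fall back to translating every commit.
        return "full", list(source_commits)
    start = source_commits.index(last_synced) + 1
    # Only commits after the last synced one still need translation.
    return "incremental", source_commits[start:]


timeline = ["c1", "c2", "c3"]
incremental = commits_to_translate(timeline, "c2")
fallback = commits_to_translate(timeline, "missing")
```

With large tables, the incremental path touches only the new commits, which is where the efficiency gain comes from.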

Community and future

In terms of future plans, XTable is focused on achieving feature parity with OTFs’ built-in features, including adding critical capabilities like support for Merge-on-Read (MoR) tables. The project also plans to facilitate synchronization of table formats across multiple catalogs, such as AWS Glue, Hive, and Unity catalog.

Run XTable as a continuous background conversion mechanism

In this post, we describe a background conversion mechanism for OTFs that doesn’t require changes to data pipelines. The mechanism periodically scans a data catalog like the AWS Glue Data Catalog for tables to convert with XTable.

On a data platform, a data catalog stores table metadata and typically contains the data model and physical storage location of the datasets. It serves as the central integration with analytical services. To maximize ease of use, compatibility, and scalability on AWS, the conversion mechanism described in this post is built around the AWS Glue Data Catalog.

The following diagram illustrates the solution at a glance. We design this conversion mechanism based on Lambda, AWS Glue, and XTable.

For the Lambda function to detect the tables inside the Data Catalog, the following information needs to be associated with a table: source format and target formats. For each detected table, the Lambda function invokes the XTable application, which is packaged into the function’s environment. Then XTable translates between source and target formats and writes the new metadata on the same data store.

Solution overview

We implement the solution with the AWS Cloud Development Kit (AWS CDK), an open source software development framework for defining cloud infrastructure in code, and provide it on GitHub. The AWS CDK solution deploys the following components:

  • A converter Lambda function that contains the XTable application and starts the conversion job for the detected tables
  • A detector Lambda function that scans the Data Catalog for tables that are to be converted and invokes the converter Lambda function
  • An Amazon EventBridge schedule that invokes the detector Lambda function on an hourly basis

Currently, the XTable application needs to be built from source. We therefore provide a Dockerfile that implements the required build steps and use the resulting Docker image as the Lambda function runtime environment.

In case you don’t have sample data available for testing, we provide scripts for generating sample datasets on GitHub. Data and metadata are shown in blue in the following detail diagram.

Converter Lambda function: Run XTable

The converter Lambda function invokes the XTable JAR, wrapped with the third-party library jpype, and converts the metadata layer of the respective data lake tables.

The function is defined in the AWS CDK through the DockerImageFunction, which uses a Dockerfile and builds a Docker container as part of the deploy step. With this mechanism, we can bundle the XTable application inside our Lambda function.

First, we download the XTable GitHub repository and build the JAR with the Maven CLI. This is done as part of the Docker container build process:

# Dockerfile
# clone sources
RUN git clone --depth 1 --branch <xtable_branch> https://github.com/apache/incubator-xtable.git

# build xtable jar
WORKDIR /incubator-xtable
RUN /apache-maven-<maven_version>/bin/mvn package -DskipTests=true
WORKDIR /

To automatically build and upload the Docker image, we create a DockerImageFunction in the AWS CDK and reference the Dockerfile in its definition. To successfully run Spark, and therefore XTable, in a Lambda function, we need to set Spark's SPARK_LOCAL_IP environment variable to localhost, that is, 127.0.0.1:

# cdk_stack.py
converter = _lambda.DockerImageFunction(
    scope=self,
    id="Converter",
    # Dockerfile in ./src directory
    code=_lambda.DockerImageCode.from_image_asset(
        directory="src", cmd=["converter.handler"]
    ),
    environment={"SPARK_LOCAL_IP": "127.0.0.1"}
    ...
)

To call the XTable JAR, we use a third-party Python library called jpype, which handles the communication with the Java virtual machine. In our Python code, the XTable call is as follows:

# call java class with configuration files
run_sync = jpype.JPackage("org").apache.xtable.utilities.RunSync.main
run_sync(
    [
        "--datasetConfig",
        "<path_to_dataset_config>",
        "--icebergCatalogConfig",
        "<path_to_catalog_config>",
    ]
)

For more information on XTable application parameters, see Creating your first interoperable table.
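As a rough illustration of what the dataset configuration file passed with --datasetConfig might contain, the following sketch renders one from a source format, target formats, and a table location. The field names (sourceFormat, targetFormats, datasets, tableBasePath, tableName) follow the XTable documentation as we understand it and should be verified against the version you build; the function name, bucket, and table name are hypothetical.

```python
def build_dataset_config(source_format, target_formats, base_path, table_name):
    """Render a minimal XTable dataset config as YAML text.

    Field names are assumed from the XTable documentation; verify them
    against the XTable version you are running.
    """
    lines = [f"sourceFormat: {source_format}", "targetFormats:"]
    lines += [f"  - {fmt}" for fmt in target_formats]
    lines += [
        "datasets:",
        f"  - tableBasePath: {base_path}",
        f"    tableName: {table_name}",
    ]
    return "\n".join(lines) + "\n"


config_text = build_dataset_config(
    "HUDI", ["ICEBERG"], "s3://example-bucket/people", "people"
)
```

Writing this text to a temporary file inside the Lambda function would give a path that can be supplied in place of <path_to_dataset_config>.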

Detector Lambda function: Identify tables to convert in the Data Catalog

The detector Lambda function scans the tables in the Data Catalog. For a table that will be converted, it invokes the converter Lambda function through an event. This decouples the scanning and conversion parts and makes our solution more resilient to potential failures.

The detection mechanism searches in the table parameters for the parameters xtable_table_type and xtable_target_formats. If they exist, the conversion is invoked. See the following code:

# detector.py
# create paginator to loop through AWS Glue tables
tables = glue_client.get_paginator("get_tables").paginate(
    DatabaseName=database["Name"]
)
for table_list in tables:
    table_list = table_list["TableList"]
…
# loop through all tables and check for required custom glue parameters
for table in table_list:
    required_parameters={"xtable_table_type", "xtable_target_formats"}
    # if required table parameters exist pass on table for conversion
    if required_parameters <= table["Parameters"].keys():
        yield table
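Because the excerpt above elides part of the function, the following self-contained sketch shows the same filtering idea in a form you can run locally. It assumes the input is an iterable of get_tables response pages, each with a TableList entry; the function name is hypothetical, and tables without a Parameters entry are skipped.

```python
REQUIRED_PARAMETERS = {"xtable_table_type", "xtable_target_formats"}


def tables_to_convert(table_pages):
    """Yield tables that carry both XTable parameters.

    table_pages: iterable of dicts shaped like AWS Glue get_tables pages.
    """
    for page in table_pages:
        for table in page.get("TableList", []):
            parameters = table.get("Parameters", {})
            # Pass the table on only if every required parameter is present.
            if REQUIRED_PARAMETERS.issubset(parameters):
                yield table


pages = [
    {
        "TableList": [
            {"Name": "a", "Parameters": {"xtable_table_type": "HUDI",
                                         "xtable_target_formats": "ICEBERG"}},
            {"Name": "b", "Parameters": {"xtable_table_type": "HUDI"}},
            {"Name": "c"},
        ]
    }
]
selected = [table["Name"] for table in tables_to_convert(pages)]
```

Keeping the filter free of AWS calls makes it straightforward to unit test with hand-written pages like the ones shown.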

EventBridge Scheduler rule

In the AWS CDK, you define an EventBridge Scheduler rule as follows. Based on the rule, EventBridge will then call the Lambda detector function every hour:

# cdk_stack.py
event = events.Rule(
    scope=self,
    id="DetectorSchedule",
    schedule=events.Schedule.rate(Duration.hours(1)),
)
event.add_target(targets.LambdaFunction(detector))

Prerequisites

Let’s dive deeper into how to deploy the provided AWS CDK stack. You need one of the following container runtimes:

  • Finch (an open source client for container development)
  • Docker

You also need the AWS CDK configured. For more details, see Getting started with the AWS CDK.

Build and deploy the solution

Complete the following steps:

  1. To deploy the stack, clone the GitHub repo, change into the folder for this post (xtable_lambda), and deploy the AWS CDK stack:
    git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
    cd xtable_lambda
    cdk deploy

This deploys the described Lambda functions and the EventBridge Scheduler rule.

  1. When using Finch, you need to set the CDK_DOCKER environment variable before deployment:
    export CDK_DOCKER=finch

After successful deployment, the conversion mechanism starts to run every hour.

  1. The following parameters need to exist on the AWS Glue table that will be converted:
    1. "xtable_table_type": "<source_format>"
    2. "xtable_target_formats": "<target_format>, <target_format>"

On the AWS Glue console, the parameters look like the following screenshot and can be set under Table properties when editing an AWS Glue table.
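The same parameters can also be set through the AWS Glue API instead of the console. The following is a sketch under a few assumptions: you pass in a boto3 Glue client, the function name is hypothetical, and the set of keys copied into TableInput is only a subset of what update_table accepts (get_table returns additional read-only fields that update_table rejects), so check the boto3 documentation before relying on it.

```python
# Subset of keys accepted in TableInput by glue.update_table (assumption: extend as needed)
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "Retention", "StorageDescriptor",
    "PartitionKeys", "TableType", "Parameters",
}


def set_xtable_parameters(glue_client, database, table_name,
                          source_format, target_formats):
    """Add the XTable parameters to an existing Glue table definition."""
    table = glue_client.get_table(DatabaseName=database, Name=table_name)["Table"]
    # Drop read-only fields such as CreateTime before sending the table back.
    table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
    parameters = dict(table_input.get("Parameters", {}))
    parameters["xtable_table_type"] = source_format
    parameters["xtable_target_formats"] = ", ".join(target_formats)
    table_input["Parameters"] = parameters
    glue_client.update_table(DatabaseName=database, TableInput=table_input)
    return table_input
```

Existing table parameters are preserved, so the call only adds or overwrites the two XTable entries.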

  1. Optionally, if you don’t have sample data, the following scripts can help you set up a test environment either with your local machine or in an AWS Glue for Spark job:
    # local: create hudi dataset on S3
    cd scripts
    pip install -r requirements.txt
    python ./create_hudi_s3.py

Convert a streaming table (Hudi to Iceberg)

Let’s assume we have a Hudi table on Amazon S3, which is registered in the Data Catalog, and want to periodically translate it to Iceberg format. Data is streaming in continuously. We have deployed the provided AWS CDK stack and set the required AWS Glue table properties to translate the dataset to the Iceberg format. In the following steps, we run the background job, see the results in AWS Glue and Amazon S3, and query it with Amazon Athena, a serverless and interactive analytics service that provides a simplified and flexible way to analyze petabytes of data.

In Amazon S3 and AWS Glue, we can see our Hudi dataset and table along with the metadata folder .hoodie. On the AWS Glue console, we set the following table properties:

  • "xtable_table_type": "HUDI"
  • "xtable_target_formats": "ICEBERG"

Our Lambda function is invoked every hour. After the run, we can find the Iceberg-specific metadata folder in our S3 bucket, which was generated by XTable.

If we look at the Data Catalog, we can see the new table <table_name>_converted was registered as an Iceberg table.

With the Iceberg format, we can now take advantage of the time travel feature by querying the dataset with a downstream analytical service like Athena. In the following screenshot, you can see at Name: that the table is in Iceberg format.

Querying all snapshots, we can see that we created three snapshots with overwrites after the initial one.

We then take the current time and query the dataset representation of 180 minutes ago, resulting in the data from the first snapshot committed.
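The queries behind these screenshots are not included in the text, so the following sketch shows how comparable Athena statements could be composed. It assumes Athena's Iceberg syntax for time travel (FOR TIMESTAMP AS OF) and for the snapshots metadata table (the $snapshots suffix); the helper names and the table name people_converted are hypothetical.

```python
def snapshots_query(table):
    """Build a query that lists the snapshots of an Iceberg table."""
    return f'SELECT * FROM "{table}$snapshots"'


def time_travel_query(table, minutes_ago):
    """Build a query that reads the table as it was some minutes ago."""
    minutes = int(minutes_ago)
    if minutes < 0:
        raise ValueError("minutes_ago must be non-negative")
    return (
        f'SELECT * FROM "{table}" '
        f"FOR TIMESTAMP AS OF (current_timestamp - interval '{minutes}' minute)"
    )


list_snapshots = snapshots_query("people_converted")
three_hours_back = time_travel_query("people_converted", 180)
```

You can paste the resulting strings into the Athena query editor after selecting the database that holds the converted table.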

Summary

In this post, we demonstrated how to build a background conversion job for OTFs, using XTable and the Data Catalog, which is independent from data pipelines and transformation jobs. With XTable, the job translates efficiently between OTFs, because data files are reused and only the metadata layer is processed. The integration with the Data Catalog provides wide compatibility with AWS analytical services.

You can reuse the Lambda-based XTable deployment in other solutions. For instance, you could use it in a reactive mechanism for near real-time conversion of OTFs, which is invoked by Amazon S3 object events resulting from changes to OTF metadata.

For further information about XTable, see the project’s official website. For more examples and references to other posts on using XTable on AWS, refer to the following GitHub repository.


About the authors

Matthias Rudolph is a Solutions Architect at AWS, digitalizing the German manufacturing industry, focusing on analytics and big data. Before that he was a lead developer at the German manufacturer KraussMaffei Technologies, responsible for the development of data platforms.

Dipankar Mazumdar is a Staff Data Engineer Advocate at Onehouse.ai, focusing on open-source projects like Apache Hudi and XTable to help engineering teams build and scale robust analytics platforms, with prior contributions to critical projects such as Apache Iceberg and Apache Arrow.

Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.

Introducing generative AI troubleshooting for Apache Spark in AWS Glue (preview)

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-generative-ai-troubleshooting-for-apache-spark-in-aws-glue-preview/

Organizations run millions of Apache Spark applications each month to prepare, move, and process their data for analytics and machine learning (ML). Building and maintaining these Spark applications is an iterative process, where developers spend significant time testing and troubleshooting their code. During development, data engineers often spend hours sifting through log files, analyzing execution plans, and making configuration changes to resolve issues. This process becomes even more challenging in production environments due to the distributed nature of Spark, its in-memory processing model, and the multitude of configuration options available. Troubleshooting these production issues requires extensive analysis of logs and metrics, often leading to extended downtimes and delayed insights from critical data pipelines.

Today, we are excited to announce the preview of generative AI troubleshooting for Spark in AWS Glue. This is a new capability that enables data engineers and scientists to quickly identify and resolve issues in their Spark applications. This feature uses ML and generative AI technologies to provide automated root cause analysis for failed Spark applications, along with actionable recommendations and remediation steps. This post demonstrates how you can debug your Spark applications with generative AI troubleshooting.

How generative AI troubleshooting for Spark works

For Spark jobs, the troubleshooting feature analyzes job metadata, metrics, and logs associated with the error signature of your job to generate a comprehensive root cause analysis. You can initiate the troubleshooting and optimization process with a single click on the AWS Glue console. With this feature, you can reduce your mean time to resolution from days to minutes, optimize your Spark applications for cost and performance, and focus more on deriving value from your data.

Manually debugging Spark applications can be challenging for data engineers and ETL developers for several reasons:

  • Spark’s extensive connectivity and configuration options for a variety of resources make it a popular data processing platform, but they also make it challenging to find the root cause of issues when configurations are incorrect, especially those related to resource setup (S3 buckets, databases, partitions, resolved columns) and access permissions (roles and keys).
  • Spark’s in-memory processing model and distributed partitioning of datasets across its workers are good for parallelism, but they often make it difficult to identify the root cause of failures resulting from resource exhaustion, such as out-of-memory and disk exceptions.
  • Lazy evaluation of Spark transformations is good for performance, but it makes it challenging to accurately and quickly identify the application code and logic that caused a failure from the distributed logs and metrics emitted by different executors.

Let’s look at a few common and complex Spark troubleshooting scenarios where Generative AI Troubleshooting for Spark can save hours of manual debugging time required to deep dive and come up with the exact root cause.

Resource setup or access errors

Spark applications can integrate data from a variety of resources, such as datasets with several partitions and columns in S3 buckets and Data Catalog tables. They use the associated job IAM roles and KMS keys for permission to access these resources, and they require the resources to exist and be available in the Regions and locations referenced by their identifiers. Users can misconfigure their applications, resulting in errors that require a deep dive into the logs to establish that the root cause is a resource setup or permission issue.

Manual RCA: Failure reason and Spark application Logs

The following example shows the failure reason for a common S3 bucket setup issue in a production job run. The failure reason reported by Spark does not help you understand the root cause or the line of code that needs to be inspected to fix it.

Exception in User Class: org.apache.spark.SparkException : Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (172.36.245.14 executor 1): com.amazonaws.services.glue.util.NonFatalException: Error opening file:

After deep diving into the logs of one of the many distributed Spark executors, it becomes clear that the error was caused by an S3 bucket that does not exist. However, the error stack trace is usually long and truncated, which makes it hard to identify the precise root cause and the location within the Spark application where the fix is needed.

Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 80MTEVF2RM7ZYAN9; S3 Extended Request ID: AzRz5f/Amtcs/QatfTvDqU0vgSu5+v7zNIZwcjUn4um5iX3JzExd3a3BkAXGwn/5oYl7hOXRBeo=; Proxy: null), S3 Extended Request ID: AzRz5f/Amtcs/QatfTvDqU0vgSu5+v7zNIZwcjUn4um5iX3JzExd3a3BkAXGwn/5oYl7hOXRBeo=
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:423)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.isFolderUsingFolderObject(Jets3tNativeFileSystemStore.java:249)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.isFolder(Jets3tNativeFileSystemStore.java:212)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:518)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:935)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:927)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:983)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:197)
at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.initialize(TapeHadoopRecordReaderSplittable.scala:168)
... 29 more
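
Part of the manual deep dive described above can be scripted. The following is a minimal sketch, not part of the AWS Glue feature, that pulls the last "Caused by:" entry and the S3 error code out of a stack trace. The function names and the abbreviated sample text are assumptions made for illustration.

```python
import re

# Matches "Caused by: <exception class>: <message>" lines in a Java stack trace.
CAUSED_BY = re.compile(r"^Caused by: (?P<exc>[\w.$]+): (?P<msg>.*)$", re.MULTILINE)


def innermost_cause(log_text):
    """Return (exception_class, message) for the last 'Caused by:' line, or None."""
    matches = list(CAUSED_BY.finditer(log_text))
    if not matches:
        return None
    last = matches[-1]
    return last.group("exc"), last.group("msg").strip()


def s3_error_code(message):
    """Return the value of the 'Error Code: X' field in an S3 error message, if any."""
    found = re.search(r"Error Code: (\w+)", message)
    return found.group(1) if found else None


# Abbreviated copy of the executor log excerpt shown above (illustrative only).
sample = """Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Proxy: null)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:423)
... 29 more"""

exc, msg = innermost_cause(sample)
print(exc)                 # java.io.IOException
print(s3_error_code(msg))  # NoSuchBucket
```

A helper like this only surfaces the error type. It does not tell you which line of your Spark script referenced the missing bucket, which is the gap the troubleshooting analysis is meant to close.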

With Generative AI Spark Troubleshooting: RCA and Recommendations

With Spark Troubleshooting, you simply click the Troubleshooting analysis button on your failed job run, and the service analyzes the debug artifacts of your failed job to identify the root cause along with the line number in your Spark application that you can inspect to resolve the issue.

Spark Out of Memory Errors

Let’s take a common but relatively complex error that requires significant manual analysis to conclude that it’s caused by a Spark job running out of memory on the Spark driver (master node) or on one of the distributed Spark executors. Usually, troubleshooting requires an experienced data engineer to manually go through the following steps to identify the root cause.

  • Search through Spark driver logs to find the exact error message
  • Navigate to the Spark UI to analyze memory usage patterns
  • Review executor metrics to understand memory pressure
  • Analyze the code to identify memory-intensive operations

This process often takes hours because the failure reason from Spark usually does not make it clear that the failure was an out-of-memory issue on the Spark driver, or what the remedy is.

Manual RCA: Failure reason and Spark application Logs

The following example shows the failure reason for the error.

Py4JJavaError: An error occurred while calling o4138.collectToPython. java.lang.StackOverflowError

Spark driver logs require extensive search to find the exact error message. In this case, the error stack trace consisted of more than a hundred function calls, and it is challenging to understand the precise root cause because the Spark application terminated abruptly.

py4j.protocol.Py4JJavaError: An error occurred while calling o4138.collectToPython.
: java.lang.StackOverflowError
 at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1942/131413145.get$Lambda(Unknown Source)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:798)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:459)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:781)
 at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:881)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
 at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:881)
 at org.apache.spark.sql.catalyst.trees.TreeNode.applyFunctionIfChanged$1(TreeNode.scala:747)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:783)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:459)
 ... repeated several times with hundreds of function calls
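
When a trace repeats the same frames hundreds of times, a frequency count is often more readable than the raw text. The sketch below is an illustration only (the function name and regular expression are assumptions, not part of AWS Glue). It counts frames per class. A trace dominated by the Catalyst `TreeNode` class, as in the excerpt above, may suggest a very deeply nested query plan, though that interpretation should be confirmed against the application code.

```python
import re
from collections import Counter

# Captures the fully qualified class name of each "at package.Class.method(" frame.
FRAME = re.compile(r"^\s*at ([\w.$]+)\.[\w$<>]+\(", re.MULTILINE)


def top_classes(trace, n=3):
    """Return the n classes that appear most often in the stack frames."""
    return Counter(FRAME.findall(trace)).most_common(n)


# A few frames copied from the excerpt above.
sample = """
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:798)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:459)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:781)
 at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:881)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:30)
"""

for cls, count in top_classes(sample):
    print(count, cls)
```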

With Generative AI Spark Troubleshooting: RCA and Recommendations

With Spark Troubleshooting, you can click the Troubleshooting analysis button on your failed job run and get a detailed root cause analysis with the line of code which you can inspect, and also recommendations on best practices to optimize your Spark application for fixing the problem.

Spark Out of Disk Errors

Another complex error pattern with Spark is when it runs out of disk storage on one of the many Spark executors in the Spark application. Similar to Spark OOM exceptions, manual troubleshooting requires extensive deep dive into distributed executor logs and metrics to understand the root cause and identify the application logic or code causing the error due to Spark’s lazy execution of its transformations.

Manual RCA: Failure Reason and Spark application Logs

The associated failure reason and error stack trace in the application logs are again quite long, requiring the user to gather more insights from the Spark UI and Spark metrics to identify the root cause and the resolution.

An error occurred while calling o115.parquet. No space left on device
py4j.protocol.Py4JJavaError: An error occurred while calling o115.parquet.
: org.apache.spark.SparkException: Job aborted.
 at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:279)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:193)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
 ....

With Generative AI Spark Troubleshooting: RCA and Recommendations

Spark Troubleshooting provides the RCA and the line number in the script where the data shuffle operation was lazily evaluated by Spark. It also points to the best practices guide for optimizing shuffles or wide transformations, or for using the S3 shuffle plugin on AWS Glue.
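
As a rough illustration of the S3 shuffle recommendation, the sketch below adds the `--write-shuffle-files-to-s3` job parameter to a dictionary of job default arguments. The helper name is hypothetical, and the parameter name reflects our understanding of the AWS Glue shuffle plugin documentation, so verify it (and any bucket-related settings) against the current documentation before relying on it.

```python
def with_s3_shuffle(default_arguments):
    """Return a copy of a Glue job's default arguments with S3 shuffle enabled.

    Assumption: "--write-shuffle-files-to-s3" is the job parameter that turns on
    the S3 shuffle plugin; check the AWS Glue documentation for your Glue version.
    """
    updated = dict(default_arguments)  # copy so the caller's dict is not mutated
    updated["--write-shuffle-files-to-s3"] = "true"
    return updated


current = {"--job-language": "python"}
print(with_s3_shuffle(current))
```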

Debug AWS Glue for Spark jobs

To use this troubleshooting feature for your failed job runs, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose your job.
  3. On the Runs tab, choose your failed job run.
  4. Choose Troubleshoot with AI to start the analysis.
  5. You will be redirected to the Troubleshooting analysis tab with generated analysis.

You will see Root Cause Analysis and Recommendations sections.

The service analyzes your job’s debug artifacts and provides the results. Let’s look at a real example of how this works in practice.

The following end-to-end example shows how Spark Troubleshooting helps a user identify the root cause of a resource setup issue and fix the job to resolve the error.

Considerations

During preview, the service focuses on common Spark errors like resource setup and access issues, out of memory exceptions on Spark driver and executors, out of disk exceptions on Spark executors, and will clearly indicate when an error type is not yet supported. Your jobs must run on AWS Glue version 4.0.

The preview is available at no additional charge in all AWS commercial Regions where AWS Glue is available. When you use this capability, any validation runs triggered by you to test proposed solutions will be charged according to the standard AWS Glue pricing.

Conclusion

This post demonstrated how generative AI troubleshooting for Spark in AWS Glue helps your day-to-day Spark application debugging. It simplifies the debugging process for your Spark applications by using generative AI to automatically identify the root cause of failures and provides actionable recommendations to resolve the issues.

To learn more about this new troubleshooting feature for Spark, please visit Troubleshooting Spark jobs with AI.

A special thanks to everyone who contributed to the launch of generative AI troubleshooting for Apache Spark in AWS Glue: Japson Jeyasekaran, Rahul Sharma, Mukul Prasad, Weijing Cai, Jeremy Samuel, Hirva Patel, Martin Ma, Layth Yassin, Kartik Panjabi, Maya Patwardhan, Anshi Shrivastava, Henry Caballero Corzo, Rohit Das, Peter Tsai, Daniel Greenberg, McCall Peltier, Takashi Onikura, Tomohiro Tanaka, Sotaro Hikita, Chiho Sugimoto, Yukiko Iwazumi, Gyan Radhakrishnan, Victor Pleikis, Sriram Ramarathnam, Matt Sampson, Brian Ross, Alexandra Tello, Andrew King, Joseph Barlan, Daiyan Alamgir, Ranu Shah, Adam Rohrscheib, Nitin Bahadur, Santosh Chandrachood, Matt Su, Kinshuk Pahare, and William Vambenepe.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers’ data integration needs. In his spare time, he enjoys spending time with family and friends.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Wei Tang is a Software Development Engineer on the AWS Glue team. She is a strong developer with deep interests in solving recurring customer problems with distributed systems and AI/ML.

XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.

Jake Zych is a Software Development Engineer on the AWS Glue team. He has deep interest in distributed systems and machine learning. In his spare time, Jake likes to create video content and play board games.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems & new interfaces for data integration and efficiently managing data lakes on AWS.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Introducing generative AI upgrades for Apache Spark in AWS Glue (preview)

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-generative-ai-upgrades-for-apache-spark-in-aws-glue-preview/

Organizations run millions of Apache Spark applications each month on AWS, moving, processing, and preparing data for analytics and machine learning. As these applications age, keeping them secure and efficient becomes increasingly challenging. Data practitioners need to upgrade to the latest Spark releases to benefit from performance improvements, new features, bug fixes, and security enhancements. However, these upgrades are often complex, costly, and time-consuming.

Today, we are excited to announce the preview of generative AI upgrades for Spark, a new capability that enables data practitioners to quickly upgrade and modernize their Spark applications running on AWS. Starting with Spark jobs in AWS Glue, this feature allows you to upgrade from an older AWS Glue version to AWS Glue version 4.0. This new capability reduces the time data engineers spend on modernizing their Spark applications, allowing them to focus on building new data pipelines and getting valuable analytics faster.

Understanding the Spark upgrade challenge

The traditional process of upgrading Spark applications requires significant manual effort and expertise. Data practitioners must carefully review incremental Spark release notes to understand the intricacies and nuances of breaking changes, some of which may be undocumented. They then need to modify their Spark scripts and configurations, updating features, connectors, and library dependencies as needed.

Testing these upgrades involves running the application and addressing issues as they arise. Each test run may reveal new problems, resulting in multiple iterations of changes. After the upgraded application runs successfully, practitioners must validate the new output against the expected results in production. This process often turns into year-long projects that cost millions of dollars and consume tens of thousands of engineering hours.

How generative AI upgrades for Spark works

The Spark upgrades feature uses AI to automate both the identification and validation of required changes to your AWS Glue Spark applications. Let’s explore how these capabilities work together to simplify your upgrade process.

AI-driven upgrade plan generation

When you initiate an upgrade, the service analyzes your application using AI to identify necessary changes across both PySpark code and Spark configurations. During preview, Spark Upgrades supports upgrading from Glue 2.0 (Spark 2.4.3, Python 3.7) to Glue 4.0 (Spark 3.3.0, Python 3.10), automatically handling changes that would typically require extensive manual review of public Spark, Python and Glue version migration guides, followed by development, testing, and verification. Spark Upgrades addresses four key areas of changes:

  • Spark SQL API methods and functions
  • Spark DataFrame API methods and operations
  • Python language updates (including module deprecations and syntax changes)
  • Spark SQL and Core configuration settings

The complexity of these upgrades becomes evident when you consider that migrating from Spark 2.4.3 to Spark 3.3.0 involves over a hundred version-specific changes. Several factors contribute to the challenges of performing manual upgrades:

  • Spark’s highly expressive language, with its mix of imperative and declarative programming styles, allows users to easily develop Spark applications. However, this increases the complexity of identifying impacted code during upgrades.
  • Lazy execution of transformations in a distributed Spark application improves performance but makes runtime verification of application upgrades challenging for users.
  • Changes to the default values of Spark configurations, or the introduction of new configurations across versions, can impact application behavior in different ways, making it difficult for users to identify issues during upgrades.

For example, in Spark 3.2, the Spark SQL TRANSFORM operator no longer supports aliases in its inputs. In Spark 3.1 and earlier, you could write a script transform like SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL.

# Original code (Glue 2.0)
query = """
SELECT TRANSFORM(item as product_name, price as product_price, number as product_number)
   USING 'cat'
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

# Updated code (Glue 4.0)
query = """
SELECT TRANSFORM(item, price, number)
   USING 'cat' AS (product_name, product_price, product_number)
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

In Spark 3.1, loading and saving timestamps before 1900-01-01 00:00:00Z as INT96 in Parquet files causes errors. In Spark 3.0, this wouldn’t fail but could result in timestamp shifts due to calendar rebasing. To restore the old behavior in Spark 3.1, you would need to configure the Spark SQL configurations for spark.sql.legacy.parquet.int96RebaseModeInRead and spark.sql.legacy.parquet.int96RebaseModeInWrite to LEGACY.

# Original code (Glue 2.0)
data = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file") 

# Updated code (Glue 4.0)
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")

data = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file")

Automated validation in your environment

After identifying the necessary changes, Spark Upgrades validates the upgraded application by running it as an AWS Glue job in your AWS account. The service iterates through multiple validation runs, up to 10, reviewing any errors encountered in each iteration and refining the upgrade plan until it achieves a successful run. You can run a Spark Upgrade Analysis in your development account using mock datasets supplied through Glue job parameters used for validation runs.
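
The iterate-and-refine behavior described above can be pictured as a bounded retry loop. The following is a conceptual sketch only, not the service's implementation. `run_job` and `propose_fix` are hypothetical callables standing in for a validation run and an AI-generated revision, and the toy stubs model a script as a list of applied fixes.

```python
MAX_ATTEMPTS = 10  # the post states that up to 10 validation runs are attempted


def refine_until_success(script, run_job, propose_fix, max_attempts=MAX_ATTEMPTS):
    """Run the script; on failure, revise it and try again.

    run_job(script) returns an error message, or None on success (hypothetical).
    propose_fix(script, error) returns a revised script (hypothetical).
    Returns (final_script, history), where history records each attempt's error.
    """
    history = []
    for attempt in range(1, max_attempts + 1):
        error = run_job(script)
        history.append((attempt, error))
        if error is None:
            return script, history
        script = propose_fix(script, error)
    raise RuntimeError(f"no successful run after {max_attempts} attempts")


# Toy stand-ins: the "job" fails until every required fix has been applied.
required = ["collections.abc import", "integer DATE_ADD argument"]


def run_job(script):
    missing = [fix for fix in required if fix not in script]
    return missing[0] if missing else None


def propose_fix(script, error):
    return script + [error]


final, history = refine_until_success([], run_job, propose_fix)
print(len(history))  # 3 attempts: two failures, then a successful run
```

Because each attempt surfaces only the first failure, the number of validation runs grows with the number of distinct incompatibilities in the script, which is why smaller mock datasets help keep each run short.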

After Spark Upgrades has successfully validated the changes, it presents an upgrade plan for you to review. You can then accept and apply the changes to your job in the development account, before replicating them to your job in the production account. The Spark Upgrade plan includes the following:

  • An upgrade summary with an explanation of code updates made during the process
  • The final script that you can use in place of your current script
  • Logs from validation runs showing how issues were identified and resolved

You can review all aspects of the upgrade, including intermediate validation attempts and any error resolutions, before deciding to apply the changes to your production job. This approach ensures you have full visibility into and control over the upgrade process while benefiting from AI-driven automation.

Get started with generative AI Spark upgrades

Let’s walk through the process of upgrading an AWS Glue 2.0 job to AWS Glue 4.0. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select your AWS Glue 2.0 job, and choose Run upgrade analysis with AI.
  3. For Result path, enter s3://aws-glue-assets-<account-id>-<region>/scripts/upgraded/ (provide your own account ID and AWS Region).
  4. Choose Run.
  5. On the Upgrade analysis tab, wait for the analysis to be completed.

    While an analysis is running, you can view the intermediate job analysis attempts (up to 10) for validation under the Runs tab. Additionally, the Upgraded summary in S3 documents the upgrades made by the Spark Upgrade service so far, refining the upgrade plan with each attempt. Each attempt will display a different failure reason, which the service tries to address in the subsequent attempt through code or configuration updates.
    After a successful analysis, the upgraded script and a summary of changes will be uploaded to Amazon Simple Storage Service (Amazon S3).
  6. Review the changes to make sure they meet your requirements, then choose Apply upgraded script.

Your job has now been successfully upgraded to AWS Glue version 4.0. You can check the Script tab to verify the updated script and the Job details tab to review the modified configuration.

Understanding the upgrade process through an example

We now show a production Glue 2.0 job that we would like to upgrade to Glue 4.0 using the Spark Upgrade feature. This Glue 2.0 job reads a dataset, updated daily in an S3 bucket under different partitions, containing new book reviews from an online marketplace and runs SparkSQL to gather insights into the user votes for the book reviews.

Original code (Glue 2.0) – before upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
from collections import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .option("path", books_input_path)
    .parquet(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), "180.8") AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), "365") < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()

New code (Glue 4.0) – after upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from collections.abc import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.legacy.allowStarWithSingleTableIdentifierInCount", "true")
spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
job = Job(glueContext)

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .load(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), 180) AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), 365) < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()
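
The import change in the upgraded script can be checked locally without Spark. The abstract base classes have lived in `collections.abc` since Python 3.3, and the aliases in `collections` were removed in Python 3.10, so the `collections.abc` import works on both the old and new runtimes. The snippet below is a small stand-alone sketch of the helper from the script.

```python
import collections
import sys
from collections.abc import Sequence  # works on Python 3.3 and later


def is_data_type_sequence(value):
    """Return True if value is a sequence such as a list or tuple."""
    return isinstance(value, Sequence)


print(is_data_type_sequence([{"marketplace": "US"}]))  # True: a list is a Sequence
print(is_data_type_sequence({"marketplace": "US"}))    # False: a dict is a Mapping

# On Python 3.10 and later, the old alias no longer exists.
if sys.version_info >= (3, 10):
    print(hasattr(collections, "Sequence"))  # False
```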

Upgrade summary

In Spark 3.2, spark.sql.adaptive.enabled is enabled by default. To restore the behavior before Spark 3.2, 
you can set spark.sql.adaptive.enabled to false.

No suitable migration rule was found in the provided context for this specific error. The change was made based on the error message, which indicated that Sequence could not be imported from collections module. In Python 3.10, Sequence has been moved to the collections.abc module.

In Spark 3.1, path option cannot coexist when the following methods are called with path parameter(s): DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). In addition, paths option cannot coexist for DataFrameReader.load(). For example, spark.read.format(csv).option(path, /tmp).load(/tmp2) or spark.read.option(path, /tmp).csv(/tmp2) will throw org.apache.spark.sql.AnalysisException. In Spark version 3.0 and below, path option is overwritten if one path parameter is passed to above methods; path option is added to the overall paths if multiple path parameters are passed to DataFrameReader.load(). To restore the behavior before Spark 3.1, you can set spark.sql.legacy.pathOptionBehavior.enabled to true.

In Spark 3.0, the `date_add` and `date_sub` functions accepts only int, smallint, tinyint as the 2nd argument; fractional and non-literal strings are not valid anymore, for example: `date_add(cast('1964-05-23' as date), '12.34')` causes `AnalysisException`. Note that, string literals are still allowed, but Spark will throw `AnalysisException` if the string content is not a valid integer. In Spark version 2.4 and below, if the 2nd argument is fractional or string value, it is coerced to int value, and the result is a date value of `1964-06-04`.

In Spark 3.2, the usage of count(tblName.*) is blocked to avoid producing ambiguous results. Because count(*) and count(tblName.*) will output differently if there is any null values. To restore the behavior before Spark 3.2, you can set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount to true.

In Spark 3.0, negative scale of decimal is not allowed by default, for example, data type of literal like 1E10BD is DecimalType(11, 0). In Spark version 2.4 and below, it was DecimalType(2, -9). To restore the behavior before Spark 3.0, you can set spark.sql.legacy.allowNegativeScaleOfDecimal to true.
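
The `date_add` note in the summary above can be reproduced with plain Python date arithmetic. This sketch assumes the Spark 2.4 coercion truncates the fractional part, which is consistent with `"180.8"` becoming `180` in the upgraded script. The function name is made up for illustration.

```python
from datetime import date, timedelta


def legacy_date_add(start, days):
    """Mimic the Spark 2.4 behavior: coerce the second argument to an int.

    Assumption: the coercion truncates toward zero, so "12.34" becomes 12.
    """
    return start + timedelta(days=int(float(days)))


print(legacy_date_add(date(1964, 5, 23), "12.34"))  # 1964-06-04
print(int(float("180.8")))                          # 180
```

Replacing the string with the integer literal, as the upgraded script does, keeps the Spark 2.4 result while satisfying the stricter Spark 3.x signature.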

As seen in the updated Glue 4.0 (Spark 3.3.0) script diff compared to the Glue 2.0 (Spark 2.4.3) script and the resulting upgrade summary, a total of six different code and configuration updates were applied across the six attempts of the Spark Upgrade Analysis.

  • Attempt #1 included a Spark SQL configuration (spark.sql.adaptive.enabled) to restore the application behavior, because adaptive query execution is enabled by default starting in Spark 3.2. Users can inspect this configuration change and enable or disable it according to their preference.
  • Attempt #2 resolved a Python language change between Python 3.7 and 3.10: Sequence can no longer be imported from the collections module and must be imported from collections.abc instead.
  • Attempt #3 resolved an error caused by a change in DataFrame API behavior starting in Spark 3.1, where the path option cannot coexist with a path parameter passed to DataFrameReader methods.
  • Attempt #4 resolved an error caused by a change in the Spark SQL function API signature for DATE_ADD, which only accepts integers as the second argument starting in Spark 3.0.
  • Attempt #5 resolved an error caused by a change in the behavior of the Spark SQL function API for count(tblName.*) starting in Spark 3.2. The behavior was restored by setting the Spark SQL configuration spark.sql.legacy.allowStarWithSingleTableIdentifierInCount.
  • Attempt #6 successfully completed the analysis and ran the new script on Glue 4.0 without any new errors. This final attempt resolved an error caused by the prohibited use of a negative scale in cast(DecimalType(3, -2)) in the Spark DataFrame API starting in Spark 3.0. The issue was addressed by enabling the Spark SQL configuration spark.sql.legacy.allowNegativeScaleOfDecimal.
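
To see why count(*) and count(tblName.*) can disagree when null values are present, consider the following plain-Python sketch. It assumes, as we understand Spark's multi-argument count, that count(tblName.*) counts only rows in which every column is non-null. The rows and function names are made up for illustration.

```python
rows = [
    {"marketplace": "US", "star_rating": 5},
    {"marketplace": "US", "star_rating": None},
    {"marketplace": None, "star_rating": 3},
]


def count_star(rows):
    """count(*): every row is counted, regardless of nulls."""
    return len(rows)


def count_all_columns(rows):
    """count(col1, col2, ...): count only rows where every column is non-null."""
    return sum(1 for row in rows if all(value is not None for value in row.values()))


print(count_star(rows))         # 3
print(count_all_columns(rows))  # 1
```

If the legacy flag is enabled to keep the old query working, it is worth checking which of the two counts the downstream logic actually expects.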

Important considerations for preview

As you begin using automated Spark upgrades during the preview period, there are several important aspects to consider for optimal usage of the service:

  • Service scope and limitations – The preview release focuses on PySpark code upgrades from AWS Glue version 2.0 to version 4.0. At the time of writing, the service handles PySpark code that doesn’t rely on additional library dependencies. You can run automated upgrades for up to 10 jobs concurrently in an AWS account, allowing you to efficiently modernize multiple jobs while maintaining system stability.
  • Optimizing costs during the upgrade process – Because the service uses generative AI to validate the upgrade plan through multiple iterations, with each iteration running as an AWS Glue job in your account, it’s essential to optimize the validation job run configurations for cost-efficiency. To achieve this, we recommend specifying a run configuration when starting an upgrade analysis as follows:
    • Using non-production developer accounts and selecting sample mock datasets that represent your production data but are smaller in size for validation with Spark Upgrades.
    • Using right-sized compute resources, such as G.1X workers, and selecting an appropriate number of workers for processing your sample data.
    • Enabling Glue auto scaling when applicable to automatically adjust resources based on workload.

    For example, if your production job processes terabytes of data with 20 G.2X workers, you might configure the upgrade job to process a few gigabytes of representative data with 2 G.2X workers and auto scaling enabled for validation.

  • Preview best practices – During the preview period, we strongly recommend starting your upgrade journey with non-production jobs. This approach allows you to familiarize yourself with the upgrade workflow, and understand how the service handles different types of Spark code patterns.

Your experience and feedback are crucial in helping us enhance and improve this feature. We encourage you to share your insights, suggestions, and any challenges you encounter through AWS Support or your account team. This feedback will help us improve the service and add capabilities that matter most to you during preview.

Conclusion

This post demonstrates how automated Spark upgrades can assist with migrating your Spark applications in AWS Glue. It simplifies the migration process by using generative AI to automatically identify the necessary script changes across different Spark versions.

To learn more about this feature in AWS Glue, see Generative AI upgrades for Apache Spark in AWS Glue.

A special thanks to everyone who contributed to the launch of generative AI upgrades for Apache Spark in AWS Glue: Shuai Zhang, Mukul Prasad, Liyuan Lin, Rishabh Nair, Raghavendhar Thiruvoipadi Vidyasagar, Tina Shao, Chris Kha, Neha Poonia, Xiaoxi Liu, Japson Jeyasekaran, Suthan Phillips, Raja Jaya Chandra Mannem, Yu-Ting Su, Neil Jonkers, Boyko Radulov, Sujatha Rudra, Mohammad Sabeel, Mingmei Yang, Matt Su, Daniel Greenberg, Charlie Sim, McCall Petier, Adam Rohrscheib, Andrew King, Ranu Shah, Aleksei Ivanov, Bernie Wang, Karthik Seshadri, Sriram Ramarathnam, Asterios Katsifodimos, Brody Bowman, Sunny Konoplev, Bijay Bisht, Saroj Yadav, Carlos Orozco, Nitin Bahadur, Kinshuk Pahare, Santosh Chandrachood, and William Vambenepe.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue, focusing on combining generative AI and data integration technologies to design and build comprehensive solutions for customers’ data and analytics needs.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Pradeep Patel is a Software Development Manager on the AWS Glue team. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Chuhan Liu is a Software Engineer at AWS Glue. He is passionate about building scalable distributed systems for big data processing, analytics, and management. He is also keen on using generative AI technologies to provide brand-new experiences to customers. In his spare time, he likes sports and enjoys playing tennis.

Vaibhav Naik is a software engineer at AWS Glue, passionate about building robust, scalable solutions to tackle complex customer problems. With a keen interest in generative AI, he likes to explore innovative ways to develop enterprise-level solutions that harness the power of cutting-edge AI technologies.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue and Amazon EMR team. His team focuses on building distributed systems to enable customers with simple-to-use interfaces and AI-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

AWS Glue Data Catalog supports automatic optimization of Apache Iceberg tables through your Amazon VPC

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/aws-glue-data-catalog-supports-automatic-optimization-of-apache-iceberg-tables-through-your-amazon-vpc/

The AWS Glue Data Catalog supports automatic table optimization of Apache Iceberg tables, including compaction, snapshot retention, and orphan file deletion. The data compaction optimizer constantly monitors table partitions and kicks off the compaction process when the threshold is exceeded for the number of files and file sizes.

The Iceberg table compaction process starts and will continue if the table or any of the partitions within the table has more than the configured number of files (default five files), each smaller than 75% of the target file size. The snapshot retention process runs periodically (default daily) to identify and remove snapshots that are older than the specified retention configuration from the table properties, while keeping the most recent snapshots up to the configured limit. Similarly, the orphan file deletion process scans the table metadata and the actual data files, identifies the unreferenced files, and deletes them to reclaim storage space. These storage optimizations can help you reduce metadata overhead, control storage costs, and improve query performance.
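The compaction trigger described above can be expressed as a small predicate. This is a minimal sketch of the rule as stated in this post, not the optimizer's actual implementation; the function names and the 512 MiB target file size used in the example are assumptions for illustration.

```python
def small_file_count(file_sizes, target_file_size, ratio=0.75):
    """Count files smaller than `ratio` (75% by default) of the target size."""
    limit = ratio * target_file_size
    return sum(1 for size in file_sizes if size < limit)


def should_compact(file_sizes, target_file_size, min_files=5):
    """Return True when more than `min_files` small files exist."""
    return small_file_count(file_sizes, target_file_size) > min_files


MIB = 1024 * 1024
target = 512 * MIB  # assumed target file size, for illustration only

six_small_files = [10 * MIB] * 6
five_small_files = [10 * MIB] * 5

print(should_compact(six_small_files, target))   # True
print(should_compact(five_small_files, target))  # False
```

The same check applies to the table as a whole or to a single partition: pass in the sizes of the files in that scope.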

Although automatic table optimization has simplified day-to-day Iceberg table maintenance tasks, certain industries and customers have advanced requirements to access their Iceberg tables from specific virtual private clouds (VPCs). This access control is needed for not only data ingestion and querying, but also for table maintenance.

To meet these requirements, the Data Catalog can now run Iceberg table optimizers in a VPC that you specify. This post demonstrates how it works with step-by-step instructions.

How the table optimizer works with AWS Glue network connection

By default, a table optimizer is not associated with any of your VPCs and subnets. With this new capability of supporting data access from VPCs, you can associate a table optimizer with an AWS Glue network connection to run in a specific VPC, subnet, and security group. An AWS Glue network connection is commonly used to run an AWS Glue job with a specific VPC, subnet, and security group. The following diagram illustrates how it works.

In the next sections, we demonstrate how to configure a table optimizer with an AWS Glue network connection.

Prerequisites

To run through this instruction, you must have the following prerequisites:

Set up resources with AWS CloudFormation

This post includes a sample AWS CloudFormation template that enables a quick setup of the solution resources. You can review and customize the template to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket to store the dataset, AWS Glue job scripts, and so on. (See Appendix 1 at the end of this post for manual instructions.)
  • A Data Catalog database.
  • An AWS Glue job that creates and modifies sample customer data in your S3 bucket with a trigger every 10 minutes.
  • AWS IAM roles and policies.
  • A VPC, public subnet, two private subnets, internet gateway, and route tables.
  • Amazon Virtual Private Cloud (Amazon VPC) endpoints for AWS Glue, AWS Lake Formation, Amazon CloudWatch, Amazon S3, and AWS Security Token Service (AWS STS). The endpoint names are as follows:
    • AWS Glue: com.amazonaws.<region>.glue (for example, com.amazonaws.us-east-1.glue).
    • Lake Formation: com.amazonaws.<region>.lakeformation (only if tables are registered with Lake Formation).
    • CloudWatch: com.amazonaws.<region>.monitoring.
    • Amazon S3: com.amazonaws.<region>.s3.
    • AWS STS: com.amazonaws.<region>.sts.
  • An AWS Glue network connection configured with the VPC and subnet. (See Appendix 2 at the end of this post for manual instructions.)
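If you create the VPC endpoints yourself rather than through the template, the service names listed above follow a single pattern. The following minimal sketch generates them for a given Region; the function name is hypothetical, and the Lake Formation endpoint is included only on request, matching the note above.

```python
def vpc_endpoint_service_names(region, lake_formation=False):
    """Build the VPC endpoint service names listed in this post for a Region."""
    services = ["glue", "monitoring", "s3", "sts"]
    if lake_formation:
        # Needed only if tables are registered with Lake Formation.
        services.insert(1, "lakeformation")
    return [f"com.amazonaws.{region}.{service}" for service in services]


for name in vpc_endpoint_service_names("us-east-1", lake_formation=True):
    print(name)
```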

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For SubnetAz1, choose your preferred Availability Zone.
  5. For SubnetAz2, choose your preferred Availability Zone. This needs to be different from SubnetAz1.
  6. Leave the other parameters as default or make appropriate changes based on your requirements, then choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

This stack can take around 5–10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Configure automatic table optimization with an AWS Glue network connection

Complete the following steps to configure automatic table optimization with an AWS Glue network connection:

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose iceberg_optimizer_vpc_db.
  3. Under Tables, choose customer.
  4. On the Table optimization – new tab, choose Enable optimization.

  5. For Optimization configuration, choose Customize settings.
  6. For IAM role, choose the iceberg-optimizer-vpc-MyGlueTableOptimizerRole-xxx role created by the CloudFormation stack.
  7. For Virtual private cloud (VPC) – optional, choose myvpc_private_network_connection.

  8. Select I acknowledge that expired data will be deleted as part of the optimizers and choose Enable optimization.

Now the table optimizer has been configured with your VPC. After a while, you can see how the optimizer worked.

  9. Under Table optimization – new, choose View optimization history on the Actions menu.

You can confirm that the table optimizer worked successfully for this Iceberg table.

You have now seen how to set up the table optimizer with an AWS Glue network connection to run it through a specific VPC.
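The console steps above can also be approximated programmatically. The following is a hedged sketch that builds a request for the AWS Glue CreateTableOptimizer operation. The helper name and the account ID are hypothetical, and the `vpcConfiguration` and `glueConnectionName` field names reflect our understanding of the API shape, so verify them against the current API reference before use.

```python
def build_optimizer_request(catalog_id, database, table, role_arn,
                            connection_name, optimizer_type="compaction"):
    """Build keyword arguments for a CreateTableOptimizer call (assumed shape)."""
    return {
        "CatalogId": catalog_id,
        "DatabaseName": database,
        "TableName": table,
        "Type": optimizer_type,
        "TableOptimizerConfiguration": {
            "roleArn": role_arn,
            "enabled": True,
            # Assumption: the optimizer is tied to the VPC through the
            # name of an AWS Glue network connection.
            "vpcConfiguration": {"glueConnectionName": connection_name},
        },
    }


request = build_optimizer_request(
    catalog_id="123456789012",  # placeholder account ID
    database="iceberg_optimizer_vpc_db",
    table="customer",
    role_arn="arn:aws:iam::123456789012:role/iceberg-optimizer-vpc-MyGlueTableOptimizerRole-xxx",
    connection_name="myvpc_private_network_connection",
)
print(request["TableOptimizerConfiguration"]["vpcConfiguration"])
```

With Boto3 installed and credentials configured, the request could then be sent with `boto3.client("glue").create_table_optimizer(**request)`, once per optimizer type that you want to enable.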

Clean up

When you have finished all the preceding steps, remember to clean up all the AWS resources you created using AWS CloudFormation:

  1. Delete the S3 bucket storing the Iceberg table and the AWS Glue job script.
  2. Delete the CloudFormation stack.

Conclusion

This post demonstrated how the Data Catalog supports automatic optimization of Iceberg tables through your VPC. With this enhancement, you can simplify table maintenance for your Iceberg tables under advanced security requirements. This feature is available today in all AWS Glue supported AWS Regions.

Try out this solution for your own use case, and share your feedback and questions in the comments.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Paul Villena is an Analytics Solutions Architect in AWS with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interest are infrastructure as code, serverless technologies, and coding in Python.

Justin Lin is a software engineer on the AWS Lake Formation team. He works on delivering managed optimization solutions for open table formats to enhance customer data management and query performance. In his spare time, he enjoys playing tennis.

Himani Desai is a Software Engineer on the AWS Lake Formation team. She works on providing managed optimization solutions for Iceberg tables.

Abishek Shankar is a software engineer on the AWS Lake Formation team, working on providing managed optimization solutions for Iceberg tables.

Shyam Rathi is a Software Development Manager on the AWS Lake Formation team, working on delivering new features and enhancements related to modern data lakes.

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.


Appendix 1: Configure your S3 bucket to allow access only from a specific VPC

The CloudFormation template in this post configures your S3 bucket automatically, but you can also manually configure your S3 bucket to allow access only from a specific VPC. This is an optional step to simulate strict security requirements on your Iceberg table. Complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose your S3 bucket.
  3. Choose Permissions.
  4. Under Bucket policy, choose Edit.
  5. Enter the following bucket policy:
{
    "Version": "2012-10-17",
    "Id": "S3BucketPolicyVPCAccessOnly",
    "Statement": [
        {
            "Sid": "DenyIfNotFromAllowedVPC",
            "Effect": "Deny",
            "Principal": "*",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>",
                "arn:aws:s3:::<your-bucket-name>/*"
            ],
            "Condition": {
                "StringNotEquals": {
                    "aws:SourceVpc": "<your-vpc-id>",
                    "aws:PrincipalArn": [
                        "arn:aws:iam::<your-account-id>:role/<your-IAM-role-name>"
                    ]
                }
            }
        }
    ]
}
  6. Choose Save changes.

This S3 bucket now denies data operations that don't originate from the VPC. You can try uploading files to the bucket through the Amazon S3 console to see that this operation fails as expected.
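If you manage several buckets, you might prefer to generate the policy rather than edit placeholders by hand. The following minimal sketch reproduces the bucket policy shown above from four inputs; the function name and the sample values are hypothetical.

```python
import json


def build_vpc_only_bucket_policy(bucket, vpc_id, account_id, role_name):
    """Return the VPC-only bucket policy from this appendix as a dict."""
    return {
        "Version": "2012-10-17",
        "Id": "S3BucketPolicyVPCAccessOnly",
        "Statement": [
            {
                "Sid": "DenyIfNotFromAllowedVPC",
                "Effect": "Deny",
                "Principal": "*",
                "Action": ["s3:GetObject", "s3:ListBucket", "s3:PutObject"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
                "Condition": {
                    "StringNotEquals": {
                        "aws:SourceVpc": vpc_id,
                        "aws:PrincipalArn": [
                            f"arn:aws:iam::{account_id}:role/{role_name}"
                        ],
                    }
                },
            }
        ],
    }


policy = build_vpc_only_bucket_policy(
    bucket="my-iceberg-bucket",        # placeholder bucket name
    vpc_id="vpc-0123456789abcdef0",    # placeholder VPC ID
    account_id="123456789012",         # placeholder account ID
    role_name="my-admin-role",         # placeholder IAM role name
)
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the bucket policy editor in step 5.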

Appendix 2: Create an AWS Glue network connection

You can also manually configure the AWS Glue network connection with the following steps:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Under Connections, choose Create connection.
  3. Select Network, and choose Next.
  4. For VPC, choose your VPC created by the CloudFormation stack. The VPC ID is shown on the Outputs tab of the CloudFormation stack.
  5. For Subnet, choose your private subnet created by the CloudFormation stack. The subnet ID is shown on the Outputs tab of the CloudFormation stack.
  6. For Security groups, choose your security group created by the CloudFormation stack. The security group ID is shown on the Outputs tab of the CloudFormation stack.
  7. Choose Next.
  8. For Name, enter myvpc_private_network_connection.
  9. Choose Next.
  10. Review the configurations and choose Create connection.
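The same connection can be described in code. The following minimal sketch builds the `ConnectionInput` structure for the AWS Glue CreateConnection operation with the NETWORK connection type; the helper name and the subnet, security group, and Availability Zone values are placeholders, so substitute the IDs from the Outputs tab of your CloudFormation stack.

```python
def build_network_connection_input(name, subnet_id, security_group_ids,
                                   availability_zone):
    """Build a ConnectionInput dict for an AWS Glue NETWORK connection."""
    return {
        "Name": name,
        "ConnectionType": "NETWORK",
        # NETWORK connections carry no connection properties of their own.
        "ConnectionProperties": {},
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }


connection_input = build_network_connection_input(
    name="myvpc_private_network_connection",
    subnet_id="subnet-0123456789abcdef0",         # placeholder
    security_group_ids=["sg-0123456789abcdef0"],  # placeholder
    availability_zone="us-east-1a",               # placeholder
)
print(connection_input["PhysicalConnectionRequirements"])
```

With Boto3 installed and credentials configured, you could then call `boto3.client("glue").create_connection(ConnectionInput=connection_input)`.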

Enrich your AWS Glue Data Catalog with generative AI metadata using Amazon Bedrock

Post Syndicated from Manos Samatas original https://aws.amazon.com/blogs/big-data/enrich-your-aws-glue-data-catalog-with-generative-ai-metadata-using-amazon-bedrock/

Metadata plays an important role in using data assets to make data-driven decisions. Generating metadata for your data assets is often a time-consuming and manual task. By harnessing the capabilities of generative AI, you can automate the generation of comprehensive metadata descriptions for your data assets based on their documentation, enhancing discoverability, understanding, and the overall data governance within your AWS Cloud environment. This post shows you how to enrich your AWS Glue Data Catalog with dynamic metadata using foundation models (FMs) on Amazon Bedrock and your data documentation.

AWS Glue is a serverless data integration service that makes it straightforward for analytics users to discover, prepare, move, and integrate data from multiple sources. Amazon Bedrock is a fully managed service that offers a choice of high-performing FMs from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API.

Solution overview

In this solution, we automatically generate metadata for table definitions in the Data Catalog by using large language models (LLMs) through Amazon Bedrock. First, we explore the option of in-context learning, where the LLM generates the requested metadata without documentation. Then we improve the metadata generation by adding the data documentation to the LLM prompt using Retrieval Augmented Generation (RAG).

AWS Glue Data Catalog

This post uses the Data Catalog, a centralized metadata repository for your data assets across various data sources. The Data Catalog provides a unified interface to store and query information about data formats, schemas, and sources. It acts as an index to the location, schema, and runtime metrics of your data sources.

The most common method to populate the Data Catalog is to use an AWS Glue crawler, which automatically discovers and catalogs data sources. When you run the crawler, it creates metadata tables that are added to a database you specify or the default database. Each table represents a single data store.

Generative AI models

LLMs are trained on vast volumes of data and use billions of parameters to generate outputs for common tasks like answering questions, translating languages, and completing sentences. To use an LLM for a specific task like metadata generation, you need an approach to guide the model to produce the outputs you expect.

This post shows you how to generate descriptive metadata for your data with two different approaches:

  • In-context learning
  • Retrieval Augmented Generation (RAG)

The solution uses two generative AI models available in Amazon Bedrock: Anthropic's Claude 3 for text generation and Amazon Titan Text Embeddings V2 for text retrieval tasks.

The following sections describe the implementation details of each approach using the Python programming language. You can find the accompanying code in the GitHub repository. You can implement it step by step in Amazon SageMaker Studio and JupyterLab or your own environment. If you’re new to SageMaker Studio, check out the Quick setup experience, which allows you to launch it with default settings in minutes. You can also use the code in an AWS Lambda function or your own application.

Approach 1: In-context learning

In this approach, you use an LLM to generate the metadata descriptions. You employ prompt engineering techniques to guide the LLM on the outputs you want it to generate. This approach is ideal for AWS Glue databases with a small number of tables. You can send the table information from the Data Catalog as context in your prompt without exceeding the context window (the number of input tokens that most Amazon Bedrock models accept). The following diagram illustrates this architecture.

Approach 2: RAG architecture

If you have hundreds of tables, adding all of the Data Catalog information as context to the prompt may lead to a prompt that exceeds the LLM’s context window. In some cases, you may also have additional content such as business requirements documents or technical documentation you want the FM to reference before generating the output. Such documents can be several pages that typically exceed the maximum number of input tokens most LLMs will accept. As a result, they can’t be included in the prompt as they are.
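A quick way to decide between the two approaches is to estimate whether the catalog and any documentation fit in the model's context window. The following is a rough sketch under a loud assumption: it approximates token counts as four characters per token, which is only a rule of thumb and not the tokenizer of any specific model. The function names and the token limit in the example are hypothetical.

```python
import math


def estimate_tokens(text, chars_per_token=4.0):
    """Roughly estimate the token count of a string (heuristic, not exact)."""
    return math.ceil(len(text) / chars_per_token)


def fits_in_context(parts, max_input_tokens):
    """Return True if all prompt parts together stay within the token limit."""
    return sum(estimate_tokens(part) for part in parts) <= max_input_tokens


instructions = "i" * 2_000        # stand-in for the prompt instructions
small_catalog = "c" * 40_000      # stand-in for a small catalog JSON
large_catalog = "c" * 1_200_000   # stand-in for hundreds of tables

limit = 200_000  # hypothetical input token limit
print(fits_in_context([instructions, small_catalog], limit))  # True
print(fits_in_context([instructions, large_catalog], limit))  # False
```

When the check fails, retrieving only the most relevant pieces of context, as the RAG approach does, keeps the prompt within the limit.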

The solution is to use a RAG approach. With RAG, you can optimize the output of an LLM so it references an authoritative knowledge base outside of its training data sources before generating a response. RAG extends the already powerful capabilities of LLMs to specific domains or an organization’s internal knowledge base, without the need to fine-tune the model. It is a cost-effective approach to improving LLM output, so it remains relevant, accurate, and useful in various contexts.

With RAG, the LLM can reference technical documents and other information about your data before generating the metadata. As a result, the generated descriptions are expected to be richer and more accurate.

The example in this post ingests data from a public Amazon Simple Storage Service (Amazon S3) bucket: s3://awsglue-datasets/examples/us-legislators/all. The dataset contains data in JSON format about US legislators and the seats that they have held in the U.S. House of Representatives and U.S. Senate. The data documentation was retrieved from the EveryPolitician documentation (http://docs.everypolitician.org/data_structure.html) and the Popolo specification (http://www.popoloproject.com/).

The following architecture diagram illustrates the RAG approach.

 

The steps are as follows:

  1. Ingest the information from the data documentation. The documentation can be in a variety of formats. For this post, the documentation is a website.
  2. Chunk the contents of the HTML page of the data documentation. Generate and store vector embeddings for the data documentation.
  3. Fetch information for the database tables from the Data Catalog.
  4. Perform a similarity search in the vector store and retrieve the most relevant information from the vector store.
  5. Build the prompt. Provide instructions on how to create metadata and add the retrieved information and the Data Catalog table information as context. Because this is a rather small database, containing six tables, all of the information about the database is included.
  6. Send the prompt to the LLM, get the response, and update the Data Catalog.
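Step 4 is the core of the retrieval stage. The following toy sketch shows the idea with plain Python: each chunk is stored with an embedding vector, and the chunks whose vectors are closest to the query vector by cosine similarity are returned. The three-dimensional vectors and chunk labels are made up for illustration; real embeddings come from the embeddings model and have many more dimensions, and a vector store performs this search for you.

```python
import math


def cosine_similarity(a, b):
    """Return the cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vector, store, k=2):
    """Return the text of the k chunks most similar to the query vector."""
    ranked = sorted(
        store,
        key=lambda item: cosine_similarity(query_vector, item[1]),
        reverse=True,
    )
    return [text for text, _ in ranked[:k]]


# Toy vector store: (chunk text, embedding) pairs with made-up vectors.
store = [
    ("person spec", [1.0, 0.0, 0.0]),
    ("membership spec", [0.0, 1.0, 0.0]),
    ("organization spec", [0.7, 0.7, 0.0]),
]

print(top_k([1.0, 0.1, 0.0], store))  # ['person spec', 'organization spec']
```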

Prerequisites

To follow the steps in this post and deploy the solution in your own AWS account, refer to the GitHub repository.

You need the following prerequisite resources:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-gen-ai-glue-metadata-*/*"
            ]
        }
    ]
}
  • An IAM role for your notebook environment. The IAM role should have the appropriate permissions for AWS Glue, Amazon Bedrock, and Amazon S3. The following is an example policy. You can apply additional conditions to restrict it further for your own environment.
{
      "Version": "2012-10-17",
      "Statement": [
           {
                 "Sid": "GluePermissions",
                 "Effect": "Allow",
                 "Action": [
                      "glue:GetCrawler",
                      "glue:DeleteDatabase",
                      "glue:GetTables",
                      "glue:DeleteCrawler",
                      "glue:StartCrawler",
                      "glue:CreateDatabase",
                      "glue:UpdateTable",
                      "glue:DeleteTable",
                      "glue:UpdateCrawler",
                      "glue:GetTable",
                      "glue:CreateCrawler"
                 ],
                 "Resource": "*"
           },
           {
                 "Sid": "S3Permissions",
                 "Effect": "Allow",
                 "Action": [
                      "s3:PutObject",
                      "s3:GetObject",
                      "s3:CreateBucket",
                      "s3:ListBucket",
                      "s3:DeleteObject",
                      "s3:DeleteBucket"
                 ],
                 "Resource": [
                      "arn:aws:s3:::<bucket_name>",
                      "arn:aws:s3:::<bucket_name>/*"
                 ]
           },
           {
                 "Sid": "IAMPermissions",
                 "Effect": "Allow",
                 "Action": "iam:PassRole",
                 "Resource": "arn:aws:iam::<account_ID>:role/GlueCrawlerRoleBlog"

           },
           {
                 "Sid": "BedrockPermissions",
                 "Effect": "Allow",
                 "Action": "bedrock:InvokeModel",
                 "Resource": [
                      "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                      "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v2:0"
                 ]
           }
      ]
}
  • Model access for Anthropic’s Claude 3 and Amazon Titan Text Embeddings V2 on Amazon Bedrock.
  • The notebook glue-catalog-genai_claude.ipynb.

Set up the resources and environment

Now that you have completed the prerequisites, you can switch to the notebook environment to run the next steps. First, the notebook will create the required resources:

  • S3 bucket
  • AWS Glue database
  • AWS Glue crawler, which will run and automatically generate the database tables

After you finish the setup steps, you will have an AWS Glue database called legislators.

The crawler creates the following metadata tables:

  • persons
  • memberships
  • organizations
  • events
  • areas
  • countries

This is a semi-normalized collection of tables containing legislators and their histories.

Follow the rest of the steps in the notebook to complete the environment setup. It should only take a few minutes.

Inspect the Data Catalog

Now that you have completed the setup, you can inspect the Data Catalog to familiarize yourself with it and the metadata it captured. On the AWS Glue console, choose Databases in the navigation pane, then open the newly created legislators database. It should contain six tables, as shown in the following screenshot:

You can open any table to inspect the details. The table description and the comment for each column are empty because AWS Glue crawlers don't populate them automatically.

You can use the AWS Glue API to programmatically access the technical metadata for each table. The following code, taken from the notebook for this post, uses the AWS SDK for Python (Boto3) to retrieve the tables for a chosen database and print them for validation.

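import boto3
from datetime import date, datetime

# Assumption: the notebook creates the client and database name earlier;
# they are repeated here so the snippet is self-contained.
glue_client = boto3.client('glue')
database = "legislators"

def get_alltables(database):
    # Page through GetTables so databases with many tables are fully listed
    tables = []
    get_tables_paginator = glue_client.get_paginator('get_tables')
    for page in get_tables_paginator.paginate(DatabaseName=database):
        tables.extend(page['TableList'])
    return tables

def json_serial(obj):
    # json.dumps fallback: render datetime and date values as ISO 8601 strings
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))

database_tables = get_alltables(database)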

for table in database_tables:
    print(f"Table: {table['Name']}")
    print(f"Columns: {[col['Name'] for col in table['StorageDescriptor']['Columns']]}")

Now that you’re familiar with the AWS Glue database and tables, you can move to the next step to generate table metadata descriptions with generative AI.

Generate table metadata descriptions with Anthropic’s Claude 3 using Amazon Bedrock and LangChain

In this step, we generate technical metadata for a selected table that belongs to an AWS Glue database. This post uses the persons table. First, we get all the tables from the Data Catalog and include them as part of the prompt. Even though our code aims to generate metadata for a single table, giving the LLM the wider catalog is useful because it helps the LLM detect foreign keys. In our notebook environment, we install LangChain v0.2.1. See the following code:

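import json
import pprint

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from botocore.config import Config
from langchain_aws import ChatBedrock

# bedrock_client and model_id are assumed to be defined earlier in the notebook
# Serialize every table definition so the LLM sees the whole catalog
glue_data_catalog = json.dumps(get_alltables(database),default=json_serial)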


model_kwargs ={
    "temperature": 0.5, # You can increase or decrease this value depending on the amount of randomness you want injected into the response. A value closer to 1 increases the amount of randomness.
    "top_p": 0.999
}

model = ChatBedrock(
    client = bedrock_client,
    model_id=model_id,
    model_kwargs=model_kwargs
)

table = "persons"
response_get_table = glue_client.get_table( DatabaseName = database, Name = table )
pprint.pp(response_get_table)

user_msg_template_table="""
I'd like you to create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:
1. Review the data catalog carefully
2. Use all the data catalog information to generate the table description
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
Here is the data catalog json in <glue_data_catalog></glue_data_catalog> tags.
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
</notes>
"""
messages = [
    ("system", "You are a helpful assistant"),
    ("user", user_msg_template_table),
]

prompt = ChatPromptTemplate.from_messages(messages)

chain = prompt | model | StrOutputParser()

# Chain Invoke

# Pass the catalog JSON string itself; wrapping it in braces would create a Python set
TableInputFromLLM = chain.invoke({"data_catalog": glue_data_catalog, "table": table})
print(TableInputFromLLM)

In the preceding code, you instructed the LLM to provide a JSON response that fits the TableInput object expected by the Data Catalog update API action. The following is an example response:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other relevant personal data.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family name or surname of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, containing a note and URL."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image of the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, each with a scheme and identifier value."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, including the language, a note, and the name itself."
      },

      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images of the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given name or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person (likely a primary key)."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, including the type (e.g., email, phone) and the value."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3://<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
      "Parameters": {
        "paths": "birth_date,contact_details,death_date,family_name,gender,given_name,id,identifiers,image,images,links,name,other_names,sort_name"
      }
    }
  },
  "PartitionKeys": [],
  "TableType": "EXTERNAL_TABLE"
}

You can also validate the JSON generated to make sure it conforms to the format expected by the AWS Glue API:

import json
from jsonschema import validate

# Nested fields must sit under "properties" for jsonschema to check them
schema_table_input = {
    "type": "object",
    "properties": {
        "Name": {"type": "string"},
        "Description": {"type": "string"},
        "StorageDescriptor": {
            "type": "object",
            "properties": {
                "Columns": {"type": "array"},
                "Location": {"type": "string"},
                "InputFormat": {"type": "string"},
                "SerdeInfo": {"type": "object"}
            }
        }
    }
}
# Raises jsonschema.ValidationError if the LLM response does not match
validate(instance=json.loads(TableInputFromLLM), schema=schema_table_input)

Now that you have generated table and column descriptions, you can update the Data Catalog.

Update the Data Catalog with metadata

In this step, use the AWS Glue API to update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput=json.loads(TableInputFromLLM))
print(f"Table {table} metadata updated!")

The following screenshot shows the persons table metadata with a description.

The following screenshot shows the table metadata with column descriptions.

Now that you have enriched the technical metadata stored in the Data Catalog, you can improve the descriptions by adding external documentation.

Improve metadata descriptions by adding external documentation with RAG

In this step, we add external documentation to generate more accurate metadata. The documentation for our dataset is available online as HTML. We use the LangChain HTML community loader to load the HTML content:

from langchain_community.document_loaders import AsyncHtmlLoader

# Use the HTML community loader to load the external documentation stored as HTML
urls = [
    "http://www.popoloproject.com/specs/person.html",
    "http://docs.everypolitician.org/data_structure.html",
    "http://www.popoloproject.com/specs/organization.html",
    "http://www.popoloproject.com/specs/membership.html",
    "http://www.popoloproject.com/specs/area.html",
]
loader = AsyncHtmlLoader(urls)
docs = loader.load()

After you download the documents, split them into chunks and create the embedding model:

text_splitter = CharacterTextSplitter(
    separator='\n',
    chunk_size=1000,
    chunk_overlap=200,

)
split_docs = text_splitter.split_documents(docs)

embedding_model = BedrockEmbeddings(
    client=bedrock_client,
    model_id=embeddings_model_id
)

Next, vectorize and store the documents locally and perform a similarity search. For production workloads, you can use a managed service for your vector store such as Amazon OpenSearch Service or a fully managed solution for implementing the RAG architecture such as Amazon Bedrock Knowledge Bases.

vs = FAISS.from_documents(split_docs, embedding_model)
search_results = vs.similarity_search(
    'What standards are used in the dataset?', k=2
)
print(search_results[0].page_content)

Next, include the catalog information along with the documentation to generate more accurate metadata:

from operator import itemgetter
from langchain_core.callbacks import BaseCallbackHandler
from typing import Dict, List, Any


class PromptHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        output = "\n".join(prompts)
        print(output)

system = "You are a helpful assistant. You do not generate any harmful content."
# specify a user message
user_msg_rag = """
Here is the guidance document you should reference when answering the user:

<documentation>{context}</documentation>
I'd like you to create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:

1. Review the data catalog carefully.
2. Use all the data catalog information and the documentation to generate the table description.
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute. Ensure you use any relevant information from the <documentation>
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
</notes>
"""
messages = [
    ("system", system),
    ("user", user_msg_rag),
]
prompt = ChatPromptTemplate.from_messages(messages)

# Retrieve and Generate
retriever = vs.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)

chain = (  
    {"context": itemgetter("table")| retriever, "data_catalog": itemgetter("data_catalog"), "table": itemgetter("table")}
    | prompt
    | model
    | StrOutputParser()
)

TableInputFromLLM = chain.invoke({"data_catalog":glue_data_catalog, "table":table})
print(TableInputFromLLM)

The following is the response from the LLM:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The 'person_id' column relates a person to an organization through the 'memberships' table.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family or last name of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, with a note and URL for each link."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image representing the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, with a scheme and identifier value for each."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, with language, note, and name for each."
      },
      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering of the person."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images representing the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person. This is likely a primary key."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, with a type and value for each."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3://<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
    }
  }
}

Similar to the first approach, you can validate the output to make sure it conforms to the AWS Glue API.
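
As a rough sketch of that validation step, the following helper parses the LLM response, drops the read-only attributes listed in step 5 of the prompt in case the model left them in, and checks for the keys the prompt asks for. The function name prepare_table_input and the choice of required keys are assumptions made for illustration; they are not part of the AWS Glue API or the original notebook.

```python
import json

# Attributes that step 5 of the prompt asks the model to remove.
READ_ONLY_KEYS = {
    "DatabaseName", "CreatedBy", "IsRegisteredWithLakeFormation", "CatalogId",
    "VersionId", "IsMultiDialectView", "CreateTime", "UpdateTime",
}


def prepare_table_input(llm_response: str) -> dict:
    """Parse the LLM response and return a cleaned dict (hypothetical helper)."""
    table_input = json.loads(llm_response)  # raises json.JSONDecodeError on invalid JSON
    if not isinstance(table_input, dict):
        raise ValueError("Expected a JSON object")
    # Drop read-only attributes the model may have left in.
    cleaned = {k: v for k, v in table_input.items() if k not in READ_ONLY_KEYS}
    storage = cleaned.get("StorageDescriptor")
    if "Name" not in cleaned or not isinstance(storage, dict):
        raise ValueError("Name and StorageDescriptor are required")
    if not isinstance(storage.get("Columns"), list):
        raise ValueError("StorageDescriptor.Columns must be a list")
    return cleaned
```

The returned dictionary could then be passed as TableInput in place of json.loads(TableInputFromLLM).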

Update the Data Catalog with new metadata

Now that you have generated the metadata, you can update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput=json.loads(TableInputFromLLM))
print(f"Table {table} metadata updated!")

Let’s inspect the technical metadata generated. You should now see a newer version in the Data Catalog for the persons table. You can access schema versions on the AWS Glue console.

Note the persons table description this time. It should differ slightly from the descriptions provided earlier:

  • In-context learning table description – “This table contains information about persons, including their names, identifiers, contact details, birth and death dates, and associated images and links. The ‘id’ column is the primary key for this table.”
  • RAG table description – “This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The ‘person_id’ column relates a person to an organization through the ‘memberships’ table.”

The LLM demonstrated knowledge of the Popolo specification, which was part of the documentation provided to the LLM.

Clean up

Now that you have completed the steps described in the post, don’t forget to clean up the resources with the code provided in the notebook so you don’t incur unnecessary costs.

Conclusion

In this post, we explored how you can use generative AI, specifically Amazon Bedrock FMs, to enrich the Data Catalog with dynamic metadata to improve the discoverability and understanding of existing data assets. The two approaches we demonstrated, in-context learning and RAG, showcase the flexibility of this solution. In-context learning works well for AWS Glue databases with a small number of tables, whereas the RAG approach uses external documentation to generate more accurate and detailed metadata, making it suitable for larger and more complex data landscapes. By implementing this solution, you can help your organization make more informed decisions, drive data-driven innovation, and get more value from your data. We encourage you to explore the resources and recommendations provided in this post to further enhance your data management practices.


About the Authors

Manos Samatas is a Principal Solutions Architect in Data and AI with Amazon Web Services. He works with government, non-profit, education and healthcare customers in the UK on data and AI projects, helping build solutions using AWS. Manos lives and works in London. In his spare time, he enjoys reading, watching sports, playing video games and socialising with friends.

Anastasia Tzeveleka is a Senior GenAI/ML Specialist Solutions Architect at AWS. As part of her work, she helps customers across EMEA build foundation models and create scalable generative AI and machine learning solutions using AWS services.

Ingest telemetry messages in near real time with Amazon API Gateway, Amazon Data Firehose, and Amazon Location Service

Post Syndicated from Srini Ponnada original https://aws.amazon.com/blogs/big-data/ingest-telemetry-messages-in-near-real-time-with-amazon-api-gateway-amazon-data-firehose-and-amazon-location-service/

Many organizations specializing in communications and navigation surveillance technologies are required to support multi-modal transportation supply chain markets such as road, water, air, space, and rail. One common use case is providing emergency alert services for multiple government agencies.

These organizations use third-party satellite-powered terminal devices for remote monitoring using telemetry and NMEA-0183 formatted messages generated in near real time. This post demonstrates how to implement a satellite-based remote alerting and response solution on the AWS Cloud to provide time-critical alerts and actionable insights, with a focus on telemetry message ingestion and alerts. Key services in the solution include Amazon API Gateway, Amazon Data Firehose, and Amazon Location Service.

The challenge

In the event of a disaster such as a flood, there is usually a lack of terrestrial data connectivity, which prevents monitoring stations from taking actionable measures in real time. In the space analytics domain, many organizations deploy satellite-powered terminals on these monitoring stations.

These terminal devices transmit telemetry and NMEA-0183 formatted messages to a satellite network managed by a third-party entity, which then relays them to an API endpoint.

Our AWS-powered solution aims to capture, enrich, and ingest satellite-powered telemetry messages as well as deliver alerts in near real time. This solution is based on AWS serverless services such as API Gateway, Data Firehose, and Amazon Simple Storage Service (Amazon S3), and is able to scale to more than a million terminal devices transmitting an hourly state of health telemetry message over the satellite.

Solution overview

Telemetry message processing begins at an API endpoint created with API Gateway, which secures transmission over the satellite network with HTTPS. This endpoint receives raw JSON messages and responds with an HTTP 200 success code. We take advantage of the direct integration between API Gateway and Data Firehose to ingest these messages into Amazon S3 in near real time. The default message reception limit on an API Gateway endpoint is 10,000 messages per second, which can be increased upon request.

Upon receiving messages through API Gateway, Data Firehose batches them into 60-second intervals or 1 MB size files, whichever comes first, and delivers them to Amazon S3. This configuration enables near real-time processing, which is essential for timely alerts and responses. We use the built-in features of Data Firehose, including AWS Lambda for necessary data transformation and Amazon Simple Notification Service (Amazon SNS) for near real-time alerts. Additionally, Data Firehose converts JSON data to Parquet format before delivering it to Amazon S3, optimizing data consumption by tools like Amazon Athena, which are ideal for partitioned data formats.

To maintain up-to-date data, an AWS Glue crawler reads the transformed Parquet files and updates the AWS Glue Data Catalog. This crawler runs once a day by default to optimize costs, but you can adjust its schedule to meet varying end-user requirements.
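
If a daily crawl is too infrequent for your users, one way to change the schedule is through the AWS Glue update_crawler API. The following is a minimal sketch, assuming a boto3 Glue client; the helper set_crawler_schedule and the crawler name in the usage comment are hypothetical and are not part of the CloudFormation template.

```python
def set_crawler_schedule(glue_client, crawler_name: str, every_hours: int) -> str:
    """Reschedule a crawler to run every `every_hours` hours, on the hour."""
    if not 1 <= every_hours <= 23:
        raise ValueError("every_hours must be between 1 and 23")
    # AWS cron fields: minutes hours day-of-month month day-of-week year
    schedule = f"cron(0 0/{every_hours} * * ? *)"
    glue_client.update_crawler(Name=crawler_name, Schedule=schedule)
    return schedule


# Example usage (requires AWS credentials; the crawler name is a placeholder):
# import boto3
# set_crawler_schedule(boto3.client("glue"), "<your-crawler-name>", every_hours=6)
```

More frequent crawls surface new records sooner at the cost of additional crawler runs, so choose the interval based on how fresh the stage table needs to be.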

We use an AWS CloudFormation template to implement the solution architecture, as illustrated in the following diagram.

CloudFormation template to implement the solution architecture

For this post, we deliver sample JSON formatted telemetry messages to an API Gateway endpoint test interface to simulate the satellite-powered terminal device functionality. API Gateway integrates with Data Firehose, which uses Lambda to perform the following actions in near real time:

  1. Parse the message and decode the data blob from base64 encoding to UTF-8. Most third-party satellite-powered terminal devices transmit messages in an encoded format and require decoding to a standard readable format such as UTF-8.
  2. Use Amazon Location Service to append location specifics (such as street, city, and ZIP) based on the latitude and longitude of the terminal device.
  3. Detect if the solar panel battery of the terminal device is lower than the defined threshold and generate an alert through Amazon SNS to the user-provided email address. For simplicity, the CloudFormation template creates an SNS topic within the same account instead of a cross-account consumer application. You must confirm the subscription to the topic using the email sent to the provided email address.
  4. Deliver the messages to an S3 bucket, batched into 1-minute intervals or 1 MB files, whichever comes first.
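
The decode and alert steps above can be sketched as a Data Firehose transformation function. This is an illustration under stated assumptions rather than the Lambda function shipped with the CloudFormation template: it assumes each Firehose record carries the raw JSON message, that battery voltage (key bv) is the value compared against the threshold, and that 3.5 V is a reasonable threshold. The send_alert callable stands in for an Amazon SNS publish call, and the Amazon Location Service lookup is omitted.

```python
import base64
import json

BATTERY_VOLTAGE_THRESHOLD = 3.5  # assumed value in volts; use your own threshold


def transform_record(record: dict, send_alert) -> dict:
    """Decode one Firehose record, flatten its telemetry blob, and flag low battery."""
    message = json.loads(base64.b64decode(record["data"]))
    # The device payload is itself base64-encoded JSON under the "data" key.
    telemetry = json.loads(base64.b64decode(message.pop("data")).decode("utf-8"))
    message.update(telemetry)
    if telemetry.get("bv", float("inf")) < BATTERY_VOLTAGE_THRESHOLD:
        send_alert(f"Low battery on device {message.get('deviceId')}: {telemetry['bv']} V")
    # Firehose expects the transformed payload to be base64 encoded again.
    encoded = base64.b64encode((json.dumps(message) + "\n").encode("utf-8")).decode("ascii")
    return {"recordId": record["recordId"], "result": "Ok", "data": encoded}


def lambda_handler(event, context, send_alert=print):
    """Return records in the shape Data Firehose expects from a transformation Lambda."""
    return {"records": [transform_record(r, send_alert) for r in event["records"]]}
```

Passing send_alert as a parameter keeps the sketch testable without AWS credentials; in a deployed function it would wrap a call that publishes to the SNS topic.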

The solution uses the following key services:

  • Amazon API Gateway – API Gateway is a fully managed service that makes it straightforward for developers to create, publish, maintain, monitor, and secure APIs at any scale. APIs act as the entry point for applications to access data, business logic, or functionality from your backend services.
  • Amazon Data Firehose – Data Firehose is an extract, transform, and load (ETL) service that reliably captures, transforms, and delivers streaming data to data lakes, data stores, and analytics services.
  • AWS Glue – The AWS Glue Data Catalog is your persistent technical metadata store in the AWS Cloud. Each AWS account has one Data Catalog per AWS Region. Each Data Catalog is a highly scalable collection of tables organized into databases. A table is a metadata representation of a collection of structured or semi-structured data stored in sources such as Amazon Relational Database Service (Amazon RDS), Apache Hadoop Distributed File System (HDFS), Amazon OpenSearch Service, and others.
  • IAM – With AWS Identity and Access Management (IAM), you can specify who or what can access services and resources in AWS, centrally manage fine-grained permissions, and analyze access to refine permissions across AWS.
  • AWS Lambda – Lambda is a serverless, event-driven compute service that lets you run code for virtually any type of application or backend service without provisioning or managing servers. You can invoke Lambda functions from over 200 AWS services and software as a service (SaaS) applications, and only pay for what you use.
  • Amazon Location Service – Location Service makes it straightforward for developers to add location functionality, such as maps, points of interest, geocoding, routing, tracking, and geofencing, to their applications without sacrificing data security and user privacy.
  • Amazon S3 – Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance. Customers of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes, cloud-centered applications, and mobile apps.
  • Amazon SNS – Amazon SNS sends notifications two ways: application-to-application (A2A) and application-to-person (A2P). A2A provides high-throughput, push-based, many-to-many messaging between distributed systems, microservices, and event-driven serverless applications. These applications include Amazon Simple Queue Service (SQS), Data Firehose, Lambda, and other HTTPS endpoints. A2P functionality lets you send messages to your customers with SMS texts, push notifications, and email.

Deploy the solution

AWS CloudFormation creates the API Gateway endpoint, Data Firehose delivery stream, Lambda function, Amazon Location index, SNS topic, S3 bucket, and AWS Glue database, table, and crawler. To deploy the solution, launch the CloudFormation stack and provide the following parameters:

  • S3 bucket name – The bucket that stores terminal device messages ingested in near real time by the Data Firehose delivery stream
  • Email address – The email of the user to subscribe for SNS alerts
  • Database name – The name of the AWS Glue database

Test the solution

The following is a sample JSON state of health telemetry message transmitted by a terminal device:

{
  "packetId": 29957891,
  "deviceType": 1,
  "deviceId": 6113,
  "userApplicationId": 65535,
  "organizationId": 65681,
  "data": "eyJsbiI6LTEwNC45NTUsInNpIjowLjAsImJpIjowLjIxMiwic3YiOjAuMDA4LCJsdCI6MzkuNTc1MiwiYnYiOjMuNzI4LCJkIjoxNjU4NzQ1MzM2LCJuIjo2NjksImEiOjE3MzguMCwicyI6NS4wLCJjIjozMjAuMCwiciI6LTEwMSwidGkiOjAuMDM2fQ==",
  "len": 142,
  "status": 0,
  "hiveRxTime": "2022-07-25T13:03:29"
}

The data blob in the preceding sample telemetry message is base64 encoded. The following table describes each key in the decoded blob, which indicates the state of health and location of the terminal device.

Parameter | Key | Sample Value | Notes
Longitude | ln | -104.955 | Negative = Westing from PM
Solar Panel Current | si | 0.176 | Amps
Battery Current | bi | 0.228 | Amps
Solar Panel Voltage | sv | 19.088 | Volts
Latitude | lt | 39.5751 | Positive = Northing from Equator
Battery Voltage | bv | 4.12 | Volts; full charge ~4.12 V, dead ~3.3 V
Date and Time | d | 1658248415 | Epoch seconds
Number of Messages Sent Since Last Power Cycle | n | 531 |
Altitude | a | 1721.0 | Meters; GPS value
Speed | s | 1.0 | km/h; stationary terminal reports non-zero value
Course | c | 139.0 | Degrees; nautical heading convention
Last RSSI Value | r | -100 | dBm; >-90 = marginal link
Modem Current | ti | 0.04 | Amps

These telemetry messages can vary based on the default configuration of the device terminal manufacturer or user definitions.
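
To inspect a message by hand, you can decode the data blob with the Python standard library. The following sketch decodes the blob from the sample message and maps the short keys to readable names; the names in KEY_NAMES are our own labels based on the table, not identifiers defined by the device or the solution.

```python
import base64
import json

# "data" value copied from the sample telemetry message
SAMPLE_BLOB = "eyJsbiI6LTEwNC45NTUsInNpIjowLjAsImJpIjowLjIxMiwic3YiOjAuMDA4LCJsdCI6MzkuNTc1MiwiYnYiOjMuNzI4LCJkIjoxNjU4NzQ1MzM2LCJuIjo2NjksImEiOjE3MzguMCwicyI6NS4wLCJjIjozMjAuMCwiciI6LTEwMSwidGkiOjAuMDM2fQ=="

# Illustrative labels for the short keys (assumed names, derived from the table)
KEY_NAMES = {
    "ln": "longitude", "lt": "latitude", "a": "altitude_m",
    "bv": "battery_voltage", "bi": "battery_current",
    "sv": "solar_panel_voltage", "si": "solar_panel_current",
    "d": "epoch_seconds", "n": "messages_since_power_cycle",
    "s": "speed_kmh", "c": "course_degrees", "r": "last_rssi_dbm",
    "ti": "modem_current",
}


def decode_telemetry(blob: str) -> dict:
    """Decode a base64 telemetry blob into a dict of short keys and values."""
    return json.loads(base64.b64decode(blob).decode("utf-8"))


telemetry = decode_telemetry(SAMPLE_BLOB)
readable = {KEY_NAMES.get(key, key): value for key, value in telemetry.items()}
print(readable)
```

For the sample message, the decoded payload is 142 bytes long, which matches the len field of the message.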

To demonstrate the capability of the solution, we send the sample telemetry message to the API Gateway endpoint through its test interface, as shown in the following screenshot.

Sending sample telemetry message

After about a minute, you should see the message delivered to Amazon S3 through Data Firehose in the stage folder.

Delivered message to Amazon S3

You should also receive an SNS alert at the provided email address.

SNS alert message

To see the results in Athena, we crawl this data with the AWS Glue crawler created by the CloudFormation template. By default, the crawler is scheduled daily to reflect newer records for the day in the stage table.

AWS Glue crawler execution

After the data is crawled successfully, you can query the results in Athena.

Query results in Athena

Best practices and considerations

Keep in mind the following best practices when implementing this solution:

  • Make sure API Gateway is protected using an API key or other authorization method
  • Adhere to the least privilege principle for all created users and roles to mitigate potential security breaches
  • Conduct load testing of the solution using an API simulator tailored to your specific use case
  • Automate the solution using the AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, or your preferred infrastructure as code (IaC) tools

Additionally, Data Firehose now supports zero buffering. For more information, refer to Amazon Kinesis Data Firehose now supports zero buffering.

Conclusion

In this post, we provided a proof of concept to implement a satellite-based remote alerting and response solution to provide time-critical alerts and actionable insights, for use cases in the space analytics domain. Make sure to adhere to AWS best practices and your organizational security policies before deploying this solution in a production environment.

Try out the solution for your own use case, and let us know your feedback and questions in the comments section.


About the authors

Srini Ponnada is a Sr. Data Architect at AWS. He has helped customers build scalable data warehousing and big data solutions for over 20 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves walking and playing tennis.

Munim Abbasi is currently a Sr. Data Architect at AWS with more than ten years of experience in the data and analytics domain. Leveraging his core competencies in data architecture, design, and engineering, he strives to empower his customers through their data by helping them deploy scalable cloud solutions adhering to AWS best practices. Outside of work, he holds great love for music, strength training, and family.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a big data enthusiast and holds 14 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finding areas for home automation.

Expand data access through Apache Iceberg using Delta Lake UniForm on AWS

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/expand-data-access-through-apache-iceberg-using-delta-lake-uniform-on-aws/

The landscape of big data management has been transformed by the rising popularity of open table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake. These formats, designed to address the limitations of traditional data storage systems, have become essential in modern data architectures. As organizations adopt various open table formats to suit their specific needs, the demand for interoperability between these formats has grown significantly. This interoperability is crucial for enabling seamless data access, reducing data silos, and fostering a more flexible and efficient data ecosystem.

Delta Lake UniForm is an open table format extension designed to provide a universal data representation that can be efficiently read by different processing engines. It aims to bridge the gap between various data formats and processing systems, offering a standardized approach to data storage and retrieval. With UniForm, you can read Delta Lake tables as Apache Iceberg tables. This expands data access to broader options of analytics engines.

This post explores how to start using Delta Lake UniForm on Amazon Web Services (AWS). You learn how to query Delta Lake tables through UniForm from other data warehouses and engines, using Amazon Redshift as an example of expanding data access to more engines.

How Delta Lake UniForm works

UniForm allows other table format clients such as Apache Iceberg to access Delta Lake tables. Under the hood, UniForm generates Iceberg metadata files (including metadata and manifest files) that are required for Iceberg clients to access the underlying data files in Delta Lake tables. Both Delta Lake and Iceberg metadata files reference the same data files. UniForm generates multiple table format metadata without duplicating the actual data files. When an Iceberg client reads a UniForm table, it first accesses the Iceberg metadata files for the UniForm table, which then allows the Iceberg client to read the underlying data files.

There are two options to use UniForm:

  • Create a new Delta Lake table with UniForm
  • Enable UniForm on your existing Delta Lake table

First, to create a new Delta Lake table with UniForm enabled, you configure table properties for UniForm in a CREATE TABLE DDL query. The table properties are 'delta.universalFormat.enabledFormats'='iceberg' and 'delta.enableIcebergCompatV2'='true'. When these options are set in the CREATE TABLE query, Iceberg metadata files are generated along with Delta Lake metadata files. In addition to these options, the Delta Lake table protocol versions, which define the features supported by the table, must be set as follows: delta.minReaderVersion to 2 or more and delta.minWriterVersion to 7 or more. For more information about the table protocol versions, refer to What is a table protocol specification? in the Delta Lake documentation. Appendix 1. Create a new Delta Lake table with UniForm shows an example query to create a new Delta Lake UniForm table.
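
As a minimal sketch of this first option, the following helper renders a CREATE TABLE statement that carries the two UniForm table properties named above. The helper name, table name, columns, and S3 location are hypothetical placeholders, and the statement is only built as a string here; running it requires a Spark session configured for Delta Lake, which this sketch assumes but does not create. The query the post itself uses is in Appendix 1.

```python
# Table properties named in the post for enabling UniForm at creation time.
UNIFORM_PROPERTIES = {
    "delta.universalFormat.enabledFormats": "iceberg",
    "delta.enableIcebergCompatV2": "true",
}


def build_uniform_create_table(table: str, columns: dict, location: str) -> str:
    """Render a CREATE TABLE statement with UniForm properties (hypothetical helper)."""
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    props = ", ".join(f"'{k}'='{v}'" for k, v in UNIFORM_PROPERTIES.items())
    return (
        f"CREATE TABLE {table} ({cols}) USING delta "
        f"LOCATION '{location}' TBLPROPERTIES ({props})"
    )


ddl = build_uniform_create_table(
    "db.reviews_uniform",  # hypothetical table name
    {"product_id": "string", "review": "string"},  # hypothetical columns
    "s3://<your-s3-bucket>/reviews_uniform/",
)
print(ddl)  # in a Delta-enabled Spark session: spark.sql(ddl)
```

The protocol version requirements described above still apply to the resulting table.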

You can also enable UniForm on an existing Delta Lake table. This option is suitable if you already have Delta Lake tables in your environment. Enabling UniForm doesn’t affect your current operations on the Delta Lake tables. To enable UniForm on a Delta Lake table, run REORG TABLE db.existing_delta_lake_table APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2)). After running this query, Delta Lake automatically generates Iceberg metadata files for the Iceberg client. In the example in this post, you run this option and enable UniForm after you create a Delta Lake table.
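
If you need to enable UniForm on several existing tables, you might generate the REORG statement per table. The following sketch only builds the statement text quoted above; the helper name and the identifier check are our own additions, and running the statement requires a Delta-enabled Spark session.

```python
import re

# Accept simple dotted identifiers such as db.table (an assumed, conservative rule).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$")


def build_enable_uniform(table: str, iceberg_compat_version: int = 2) -> str:
    """Render the REORG statement that enables UniForm on an existing Delta Lake table."""
    if not _IDENTIFIER.match(table):
        raise ValueError(f"Unexpected table identifier: {table!r}")
    return (
        f"REORG TABLE {table} APPLY "
        f"(UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION={iceberg_compat_version}))"
    )


statement = build_enable_uniform("db.existing_delta_lake_table")
print(statement)  # in a Delta-enabled Spark session: spark.sql(statement)
```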

For more information about enabling UniForm, refer to Enable Delta Lake UniForm in the Delta Lake documentation. Note that the extra package (delta-iceberg) is required to create a UniForm table in AWS Glue Data Catalog. The extra package is also required to generate Iceberg metadata along with Delta Lake metadata for the UniForm table.

Example use case

A fictional company built a data lake with Delta Lake on Amazon Simple Storage Service (Amazon S3) that’s mainly used through Amazon Athena. As usage grows, this company wants to expand data access to cloud-based data warehouses such as Amazon Redshift for flexible analytics use cases.

There are a few challenges in meeting this requirement. Delta Lake isn’t natively supported in Amazon Redshift. For those data warehouses, Delta Lake tables need to be converted to manifest tables, which requires additional operational overhead. You need to run the GENERATE command on Spark or use a crawler in AWS Glue to generate manifest tables, and you need to sync those manifest tables every time the Delta tables are updated.

Delta Lake UniForm can be a solution to meet this requirement. With Delta Lake UniForm, you can make the Delta Lake table compatible with other open table formats such as Apache Iceberg, which is natively supported in Amazon Redshift. Users can query those Delta Lake tables as Iceberg tables through UniForm.

The following diagram describes the architectural overview to achieve that requirement.

In this tutorial, you create a Delta Lake table with a synthetic review dataset that includes different products and customer reviews and enable UniForm on that Delta Lake table to make it accessible from Amazon Redshift. Each component works as follows in this scenario:

  • Amazon EMR (Amazon EMR on EC2 cluster with Apache Spark): An Apache Spark application on an Amazon EMR cluster creates a Delta Lake table and enables UniForm on it. Only a Delta Lake client can write to the Delta Lake UniForm table, so Amazon EMR acts as the writer.
  • Amazon Redshift: Amazon Redshift uses Iceberg clients to read records from the Delta Lake UniForm table. It’s limited to reading records from the table and cannot write to it.
  • Amazon S3 and AWS Glue Data Catalog: These are used to manage the underlying files and the catalog of the Delta Lake UniForm table. The data and metadata files for the table are stored in an S3 bucket. The table is registered in AWS Glue Data Catalog.

Set up resources

In this section, you complete the following resource setup:

  1. Launch an AWS CloudFormation template to configure resources such as S3 buckets, an Amazon Virtual Private Cloud (Amazon VPC) and a subnet, a database for Delta Lake in Data Catalog, AWS Identity and Access Management (IAM) policy and role with required permissions for Amazon EMR Studio, and an EC2 instance profile for Amazon EMR on EC2 cluster
  2. Launch an Amazon EMR on EC2 cluster
  3. Create an Amazon EMR Studio Workspace
  4. Upload a Jupyter Notebook on Amazon EMR Studio Workspace
  5. Launch a CloudFormation template to configure Amazon Redshift Serverless and relevant subnets

Launch a CloudFormation template to configure basic resources

You use a provided CloudFormation template to set up resources to build Delta Lake UniForm environments. The template creates the following resources:

  • An S3 bucket to store the Delta Lake table data
  • An S3 bucket to store Amazon EMR Studio Workspace metadata and configuration files
  • An IAM role for Amazon EMR Studio
  • An EC2 instance profile for Amazon EMR on EC2 cluster
  • VPC and subnet for an Amazon EMR on EC2 cluster
  • A database for a Delta Lake table in Data Catalog

Complete the following steps to deploy the resources.

  1. Choose Launch stack:

  2. For Stack name, enter delta-lake-uniform-on-aws. For the Parameters, DeltaDatabaseName, PublicSubnetForEMRonEC2, and VpcCIDRForEMRonEC2 are set by default. You can also change the default values. Then, choose Next.
  3. Choose Next.
  4. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.
  6. After the stack creation is complete, check the Outputs. The resource values are used in the following sections and in the Appendices.
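
The console steps above could also be expressed as an AWS CLI call. The following is a minimal sketch that only builds the command string; it does not run it. The template URL is a placeholder because the post links the template through the Launch stack button, and the DeltaDatabaseName override value is shown purely for illustration. CAPABILITY_NAMED_IAM is the CLI equivalent of acknowledging that CloudFormation might create IAM resources with custom names.

```python
import shlex


def create_stack_command(stack_name, template_url, overrides=None):
    """Build an `aws cloudformation create-stack` command line as a string."""
    args = [
        "aws", "cloudformation", "create-stack",
        "--stack-name", stack_name,
        "--template-url", template_url,
        # Equivalent of the "IAM resources with custom names" acknowledgment.
        "--capabilities", "CAPABILITY_NAMED_IAM",
    ]
    if overrides:
        args.append("--parameters")
        for key, value in overrides.items():
            args.append(f"ParameterKey={key},ParameterValue={value}")
    # shlex.join quotes any argument that is not shell-safe.
    return shlex.join(args)


command = create_stack_command(
    "delta-lake-uniform-on-aws",
    "<TEMPLATE_URL>",  # placeholder: not given in the text of the post
    overrides={"DeltaDatabaseName": "delta_uniform_db"},  # illustrative override
)
print(command)
```

Leaving `overrides` empty keeps the template defaults, which matches the console walkthrough.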

Launch an Amazon EMR on EC2 cluster

Complete the following steps to create an Amazon EMR on EC2 cluster.

  1. Open the Amazon EMR on EC2 console.
  2. Choose Create cluster.
  3. Enter delta-lake-uniform-blog-post in Name and confirm choosing emr-7.3.0 as its release label.
  4. For Application bundle, select Spark 3.5.1, Hadoop 3.3.6, and JupyterEnterpriseGateway 2.6.0.
  5. For AWS Glue Data Catalog settings, enable Use for Spark table metadata.
  6. For Networking, enter the values from the CloudFormation Outputs tab for VpcForEMR and PublicSubnetForEMR into Virtual private cloud (VPC) and Subnet, respectively. For EC2 security groups, keep Create ElasticMapReduce-Primary for Primary node, and Create ElasticMapReduce-Core for Core and task nodes. The security groups for the Amazon EMR primary and core nodes are automatically created.
  7. For Cluster logs, enter s3://<DeltaLakeS3Bucket>/emr-cluster-logs as the Amazon S3 location. Replace <DeltaLakeS3Bucket> with the S3 bucket from the CloudFormation stack Outputs tab.
  8. For Software settings, select Load JSON from Amazon S3 and enter s3://aws-blogs-artifacts-public/artifacts/BDB-4538/config.json as the Amazon S3 location.
  9. For Amazon EMR service role in the Identity and Access Management (IAM) roles section, choose Create a service role. Then, set Security Group to default. If security groups for the Amazon EMR primary and core or task nodes already exist, select those security groups for Security Group.
  10. For EC2 instance profile for Amazon EMR, choose Choose an existing instance profile, and set Instance profile to EMRInstanceProfileRole.
  11. After reviewing the configuration, choose Create cluster.
  12. When the cluster status is Waiting on the Amazon EMR console, the cluster setup is complete (it takes approximately 10 minutes).

Create an Amazon EMR Studio Workspace

Complete the following steps to create an Amazon EMR Studio Workspace to use Delta Lake UniForm on Amazon EMR on EC2.

  1. Open the Amazon EMR Studio console.
  2. Choose Create Studio.
  3. For Setup options, choose Custom.
  4. For Studio settings, enter delta-lake-uniform-studio as the Studio name.
  5. For S3 location for Workspace storage, choose Select existing location as a Workspace storage. The S3 location (s3://aws-emr-studio-<ACCOUNT_ID>-<REGION>-delta-lake-uniform-on-aws) can be obtained from EMRStudioS3Bucket on the CloudFormation Outputs tab. Then, choose EMRStudioRole as the IAM role (you can find the IAM Role name on the CloudFormation Outputs tab).
  6. For Workspace settings, enter delta-lake-workspace as the Workspace name.
  7. In Networking and security, choose the VPC ID and Subnet ID that you created in Launch an AWS CloudFormation template. You can obtain the VPC ID and Subnet ID from the VpcForEMR and PublicSubnetForEMR keys on the CloudFormation Outputs tab respectively.
  8. After reviewing the settings, choose Create Studio and launch Workspace.
  9. After the Studio Workspace is created, you are redirected to the Jupyter Notebook.

Upload Jupyter Notebook

Complete the following steps to configure a Jupyter Notebook to use Delta Lake UniForm with Amazon EMR.

  1. Download delta-lake-uniform-on-aws.ipynb.
  2. Choose the arrow icon at the top of the page and upload the notebook you just downloaded.
  3. In the left pane, choose and open the notebook (delta-lake-uniform-on-aws.ipynb) you uploaded.
  4. After the notebook opens, choose EMR Compute in the navigation pane.
  5. Attach the Amazon EMR on EC2 cluster you created in the previous section. Choose EMR on EC2 cluster, select the cluster you created previously, and then choose Attach.
  6. After the cluster is attached successfully, the message Cluster is attached to the Workspace is displayed on the console.

Create a workgroup and a namespace for Amazon Redshift Serverless

For this step, you configure a workgroup and a namespace for Amazon Redshift Serverless to run queries on a Delta Lake UniForm table. You also configure two subnets in the same VPC created by the CloudFormation stack delta-lake-uniform-on-aws. To deploy the resources, complete the following steps:

  1. Choose Launch stack:

  2. For Stack name, enter redshift-serverless-for-delta-lake-uniform.
  3. For Parameters, enter the Availability Zone and an IP range for each subnet. The VPC ID is automatically retrieved from the CloudFormation stack you created in Launch an AWS CloudFormation template to configure basic resources. If you change the default subnet, note that at least one subnet needs to be the same subnet you created for the Amazon EMR on EC2 cluster (by default, the subnet for Amazon EMR on EC2 cluster is automatically retrieved during this CloudFormation stack creation). You can check the subnet for the cluster on the CloudFormation Outputs tab. Then, choose Next.
  4. Choose Next again, and then choose Submit.
  5. After the stack creation is complete, check the CloudFormation Outputs tab. Make a note of the two Subnet IDs on the Outputs tab to use later in Run queries from Amazon Redshift against the UniForm table.

Now you’re ready to use Delta Lake UniForm on Amazon EMR.
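
If you change the default IP ranges for the Redshift Serverless subnets, it can help to check them before submitting the stack. The following is a minimal sketch using only the Python standard library. The CIDR values are hypothetical and are not the template defaults. It checks that each subnet range sits inside the VPC range and that no two subnet ranges overlap.

```python
import ipaddress


def check_subnets(vpc_cidr, subnet_cidrs):
    """Return a list of problems; an empty list means the ranges are consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    subnets = [ipaddress.ip_network(cidr) for cidr in subnet_cidrs]
    problems = []
    # Every subnet must be contained in the VPC range.
    for subnet in subnets:
        if not subnet.subnet_of(vpc):
            problems.append(f"{subnet} is outside {vpc}")
    # No two subnets may share addresses.
    for index, first in enumerate(subnets):
        for second in subnets[index + 1:]:
            if first.overlaps(second):
                problems.append(f"{first} overlaps {second}")
    return problems


# Hypothetical ranges: the EMR subnet plus two new subnets for Redshift Serverless.
ok = check_subnets("10.0.0.0/16", ["10.0.0.0/24", "10.0.1.0/24", "10.0.2.0/24"])
bad = check_subnets("10.0.0.0/16", ["10.0.0.0/24", "10.0.0.128/25", "10.1.0.0/24"])
print(ok)
print(bad)
```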

Enable Delta Lake UniForm

Start by creating a Delta Lake table that contains the customer review dataset. After creating the table, run a REORG query to enable UniForm on the Delta Lake table.

Create a Delta Lake table

Complete the following steps to create a Delta Lake table based on a customer review dataset and review the table metadata.

  1. Return to the Jupyter Notebook connected to the Amazon EMR on EC2 cluster and run the notebook cell that adds delta-iceberg.jar, which is required for UniForm, and configures the Spark extension.
  2. Initialize the SparkSession. The configuration in this cell is necessary to use Iceberg through UniForm. Before running the code, replace <DeltaLakeS3Bucket> with the name of the S3 bucket for Delta Lake, which you can find on the CloudFormation stack Outputs tab.
  3. Create a Spark DataFrame from customer reviews.
  4. Create a Delta Lake table with the customer reviews dataset. This step takes approximately 5 minutes.
  5. Run DESCRIBE EXTENDED {DB_TBL} in the next cell to review the table. The output includes the table schema, location, and table properties.

The Delta Lake table creation is complete. Next, enable UniForm on this Delta Lake table.
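
For reference, the Spark settings that a Delta Lake session typically needs can be sketched as a plain dictionary. This is an assumption-laden illustration, not the notebook's actual cell: the extension and catalog class names are the standard Delta Lake ones, while the warehouse path is inferred from the s3://<DeltaLakeS3Bucket>/warehouse/ location used elsewhere in this post.

```python
import json


def delta_uniform_spark_conf(bucket):
    """Return Spark settings commonly used to enable Delta Lake on a session."""
    return {
        # Standard Delta Lake SQL extension and catalog implementation.
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        # Assumption: tables are stored under the warehouse/ prefix of the bucket.
        "spark.sql.warehouse.dir": f"s3://{bucket}/warehouse/",
    }


conf = delta_uniform_spark_conf("<DeltaLakeS3Bucket>")
print(json.dumps({"conf": conf}, indent=2))

# In a notebook where PySpark is available, the same settings could be applied as:
#   builder = SparkSession.builder
#   for key, value in conf.items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```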

Run REORG query to enable UniForm

To allow an Iceberg client to access the Delta Lake table you created, enable UniForm on the table. You can also create a new Delta Lake table with UniForm enabled. For more information, see Appendix 1 at the end of this post. To enable UniForm and review the table metadata, complete the following steps.

  1. Run the REORG query in the next cell against the Delta Lake table. Running a REORG query is how you enable UniForm on an existing Delta Lake table.
  2. Run DESCRIBE EXTENDED {DB_TBL} in the next cell to review the table metadata and compare the metadata before and after enabling UniForm. New properties, such as delta.enableIcebergCompatV2=true and delta.universalFormat.enabledFormats=iceberg, are added to the table properties.
  3. Run aws s3 ls s3://<DeltaLakeS3Bucket>/warehouse/ --recursive to confirm that the Iceberg table metadata was created. Replace <DeltaLakeS3Bucket> with the S3 bucket from the CloudFormation Outputs tab. The command output lists the table metadata and data files, and shows that Delta Lake UniForm generates both Iceberg metadata and Delta Lake metadata files.
  4. Before querying the Delta Lake UniForm table from an Iceberg client, run the analytic query in the next cell against the Delta Lake UniForm table from the Amazon EMR on EC2 side, and review the review count for each product category.
  5. The query result shows the review count by product_category.

Enabling UniForm on the Delta Lake table is complete, and now you can query the Delta Lake table from an Iceberg client. Next, you query the Delta Lake table as an Iceberg table from Amazon Redshift.
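
The REORG step and the property check above can be sketched in a few lines. This is a minimal illustration rather than the notebook's code: the statement follows the REORG TABLE ... APPLY (UPGRADE UNIFORM(...)) form from the Delta Lake documentation, and the helper names and sample property dictionary are hypothetical.

```python
def reorg_uniform_sql(db_tbl, iceberg_compat_version=2):
    """Build the REORG statement that upgrades an existing Delta table to UniForm."""
    return (
        f"REORG TABLE {db_tbl} "
        f"APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION={iceberg_compat_version}))"
    )


def uniform_enabled(properties):
    """Check table properties (as a dict of strings) for the two UniForm settings."""
    formats = properties.get("delta.universalFormat.enabledFormats", "")
    compat = properties.get("delta.enableIcebergCompatV2", "false")
    enabled_formats = [name.strip() for name in formats.split(",")]
    return "iceberg" in enabled_formats and compat.lower() == "true"


DB_TBL = "delta_uniform_db.customer_reviews"
statement = reorg_uniform_sql(DB_TBL)
print(statement)  # in the notebook this string would be passed to spark.sql(...)

# Hypothetical property sets, as they might appear before and after the REORG.
before = {"delta.minReaderVersion": "1"}
after = {
    "delta.enableIcebergCompatV2": "true",
    "delta.universalFormat.enabledFormats": "iceberg",
}
print(uniform_enabled(before), uniform_enabled(after))
```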

Run queries against the UniForm table from Amazon Redshift

In the previous section, you enabled UniForm on your existing Delta Lake table. This allows you to run queries on a Delta Lake table as if it were an Iceberg table from Amazon Redshift. In this section, you run an analytic query on the UniForm table using Amazon Redshift Serverless and add records with a new product category to the UniForm table through the Jupyter Notebook connected to the Amazon EMR on EC2 cluster. Then, you verify the added records with another analytic query from Amazon Redshift. You can confirm that Delta Lake UniForm enables Amazon Redshift to query the Delta Lake table through this section.

Query the UniForm table from Amazon Redshift Serverless

  1. Open the Amazon Redshift Serverless console.
  2. In Namespaces/Workgroups, select the delta-lake-uniform-namespace that you created using the CloudFormation stack.
  3. Choose Query data in the top-right corner to open the Amazon Redshift query editor.
  4. After opening the editor, select the delta-lake-uniform-workgroup workgroup in the left pane.
  5. Choose Create connection.
  6. After you successfully create a connection, you can see the delta_uniform_db database and the customer_reviews table you created in the left pane of the editor.
  7. Copy and paste the following analytic query to the editor and choose Run.
SELECT product_category, count(*) as count_by_product_category 
FROM "awsdatacatalog"."delta_uniform_db"."customer_reviews"
GROUP BY product_category ORDER BY count_by_product_category DESC
  8. The editor shows the same result of the review count by product_category as you obtained from Jupyter Notebook in Run REORG query to enable UniForm.

Add new product category records into Delta Lake UniForm table from Amazon EMR

Go back to the Jupyter Notebook on Amazon EMR Workspace to add new records with a new product category (Books) into the Delta Lake UniForm table. After adding the records, query the UniForm table again from Amazon Redshift Serverless.

  1. On the Jupyter Notebook, go to Add new product category records into the UniForm table and run the cell that loads the new records.
  2. Run the next cell and review the five records with Books as the product category.
  3. Add the new reviews with the Books product category. This takes around 2 minutes.

In the next section, you run a query on the UniForm table from Amazon Redshift Serverless to check if the new records with the Books product category have been added.

Review the added records in Delta Lake UniForm table from Amazon Redshift Serverless

To check whether the new records were added, complete the following steps:

  1. In the Amazon Redshift query editor, run the following query and check whether the output includes records with the Books product category.
SELECT product_category, count(*) as count_by_product_category
FROM "awsdatacatalog"."delta_uniform_db"."customer_reviews"
GROUP BY product_category ORDER BY count_by_product_category DESC
  2. Confirm that the new product category Books appears in the query result, which shows that the records added from Amazon EMR are visible from the Amazon Redshift side.

Now you can query from Amazon Redshift against the Delta Lake table by enabling Delta Lake UniForm.
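
To see the shape of this before-and-after check without any AWS resources, the same GROUP BY query can be run against a small local stand-in. The sketch below uses an in-memory SQLite table with made-up rows. It is not Redshift or Delta Lake; it only illustrates how the appended Books records show up in the aggregate.

```python
import sqlite3

QUERY = """
SELECT product_category, count(*) AS count_by_product_category
FROM customer_reviews
GROUP BY product_category ORDER BY count_by_product_category DESC
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_reviews (review_id TEXT, product_category TEXT)")

# Made-up rows standing in for the original dataset.
conn.executemany(
    "INSERT INTO customer_reviews VALUES (?, ?)",
    [("r1", "Apparel"), ("r2", "Apparel"), ("r3", "Computers")],
)
result_before = conn.execute(QUERY).fetchall()

# Stand-in for the append performed from the notebook on Amazon EMR.
conn.executemany(
    "INSERT INTO customer_reviews VALUES (?, ?)",
    [("r4", "Books"), ("r5", "Books"), ("r6", "Books")],
)
result_after = conn.execute(QUERY).fetchall()

print(result_before)
print(result_after)
conn.close()
```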

Clean up resources

To clean up your resources, complete the following steps:

  1. In the Amazon EMR Workspaces console, choose Actions and then Delete to delete the workspace.
  2. Choose Delete to delete the Studio.
  3. In the Amazon EMR on EC2 console, choose Terminate to delete the Amazon EMR on EC2 cluster.
  4. In the Amazon S3 console, choose Empty to delete all objects in the following S3 buckets.
    1. The S3 bucket for Amazon EMR Studio such as aws-emr-studio-<ACCOUNT_ID>-<REGION>-delta-lake-uniform-on-aws. Replace <ACCOUNT_ID> and <REGION> with your account ID and the bucket’s region.
    2. The S3 bucket for Delta Lake tables such as delta-lake-uniform-on-aws-deltalakes3bucket-abcdefghijk.
  5. After you confirm the two buckets are empty, delete the CloudFormation stack redshift-serverless-for-delta-lake-uniform.
  6. After the first CloudFormation stack has been deleted, delete the CloudFormation stack delta-lake-uniform-on-aws.

Conclusion

Delta Lake UniForm on AWS represents an advancement in addressing the challenges of data interoperability and accessibility in modern big data architectures. By enabling Delta Lake tables to be read as Apache Iceberg tables, UniForm expands data access capabilities, allowing organizations to use a broader range of analytics engines and data warehouses such as Amazon Redshift.

The practical implications of this technology are substantial, offering new possibilities for data analysis and insights across diverse platforms. As organizations continue to navigate the complexities of big data, solutions like Delta Lake UniForm that promote interoperability and reduce data silos will become increasingly valuable.

By adopting these advanced open table formats and using cloud platforms such as AWS, organizations can build more robust and efficient data ecosystems. This approach not only enhances the value of existing data assets but also fosters a more agile and adaptable data strategy, ultimately driving innovation and improving decision-making processes in our data-driven world.

Appendix 1: Create a new Delta Lake table with UniForm

You can create a Delta Lake table with UniForm enabled using the following DDL.

CREATE TABLE IF NOT EXISTS delta_uniform_db.customer_reviews_create (
   marketplace string,
   customer_id string,
   review_id string,
   product_id string,
   product_title string,
   star_rating bigint,
   helpful_votes bigint,
   total_votes bigint,
   insight string,
   review_headline string,
   review_body string,
   review_date timestamp,
   review_year bigint,
   product_category string)
USING delta
TBLPROPERTIES ( 
   'delta.universalFormat.enabledFormats'='iceberg',
   'delta.enableIcebergCompatV2'='true',
   'delta.minReaderVersion'='2',
   'delta.minWriterVersion'='7')

Appendix 2: Run queries from Snowflake against the UniForm table

Delta Lake UniForm also allows you to run queries on a Delta Lake table from Snowflake. In this section, you run the same analytic query on the UniForm table using Snowflake as you previously did using Amazon Redshift Serverless in Run queries from Amazon Redshift against the UniForm table. Then you confirm that the query results from Snowflake match the results obtained from the Amazon Redshift Serverless query.

Configure IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3

To query the Delta Lake UniForm table in Data Catalog from Snowflake, the following configurations are required.

  1. IAM roles: Create IAM roles for Snowflake to access Data Catalog and Amazon S3.
  2. Data Catalog integration with Snowflake: Snowflake provides two catalog options for Iceberg tables: using Snowflake as the Iceberg catalog, or using an external catalog such as AWS Glue Data Catalog. In this post, you choose AWS Glue Data Catalog as an external catalog. For information about the catalog options, refer to Iceberg catalog options in the Snowflake public documentation.
  3. An external volume creation for Amazon S3: To access the UniForm table from Snowflake, an external volume for Amazon S3 needs to be configured. With this configuration, Snowflake can connect to the S3 bucket that you created for Iceberg tables. For information about the external volume, refer to Configure an external volume for Iceberg tables.

Create IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3

Create the following two IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3.

  • SnowflakeIcebergGlueCatalogRole: This IAM role is used for Snowflake to access the Delta Lake UniForm table in AWS Glue Data Catalog.
  • SnowflakeIcebergS3Role: This IAM role is used for Snowflake to access the table’s underlying data in the S3 bucket.

To configure the IAM roles, complete the following steps:

  1. Choose Launch stack:

  2. Enter snowflake-iceberg as the stack name and choose Next.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Submit.
  5. After the stack creation is complete, check the CloudFormation Outputs tab. Make a note of the names and ARNs of the two IAM roles, which are used in the following section.

Create an AWS Glue Data Catalog Integration

Create a catalog integration for AWS Glue Data Catalog. For more information about the catalog integration for AWS Glue Data Catalog, refer to Configure a catalog integration for AWS Glue in the Snowflake public documentation. To configure the catalog integration, complete the following steps:

  1. Access your Snowflake account and open an empty worksheet (query editor).
  2. Run the following query to create a catalog integration with AWS Glue Data Catalog. Replace <YOUR_ACCOUNT_ID> with your AWS account ID (it appears in the IAM role ARN on the snowflake-iceberg CloudFormation Outputs tab), and replace <REGION> with the Region of AWS Glue Data Catalog.
CREATE CATALOG INTEGRATION glue_catalog_integration
  CATALOG_SOURCE=GLUE
  CATALOG_NAMESPACE='delta_uniform_db'
  TABLE_FORMAT=ICEBERG
  GLUE_AWS_ROLE_ARN='arn:aws:iam::<YOUR_ACCOUNT_ID>:role/SnowflakeIcebergGlueCatalogRole'
  GLUE_CATALOG_ID='<YOUR_ACCOUNT_ID>'
  GLUE_REGION='<REGION>'
  ENABLED=TRUE;
  3. Retrieve GLUE_AWS_IAM_USER_ARN and GLUE_AWS_EXTERNAL_ID by using DESCRIBE CATALOG INTEGRATION glue_catalog_integration in the editor. The output is similar to the following:
+------------------------------------------------------------------------------------------------------------------------------+
| property                 | property_type | property_value                                                 | property_default |
|--------------------------+---------------+----------------------------------------------------------------+------------------|
| ENABLED                  | Boolean       | true                                                           | false            |
| CATALOG_SOURCE           | String        | GLUE                                                           |                  |
| CATALOG_NAMESPACE        | String        | delta_uniform_db                                               |                  |
| TABLE_FORMAT             | String        | ICEBERG                                                        |                  |
| REFRESH_INTERVAL_SECONDS | Integer       | 30                                                             | 30               |
| GLUE_AWS_ROLE_ARN        | String        | arn:aws:iam::123456789012:role/SnowflakeIcebergGlueCatalogRole |                  |
| GLUE_CATALOG_ID          | String        | 123456789012                                                   |                  |
| GLUE_REGION              | String        | us-east-1                                                      |                  |
| GLUE_AWS_IAM_USER_ARN    | String        | arn:aws:iam::123456789012:user/<ID>                            |                  |
| GLUE_AWS_EXTERNAL_ID     | String        | An external ID specified on the IAM Role trust relationships   |                  |
| COMMENT                  | String        |                                                                |                  |
+------------------------------------------------------------------------------------------------------------------------------+
  4. Update the IAM role you created using the CloudFormation stack to enable Snowflake to access AWS Glue Data Catalog using that IAM role. Open Trust Relationships of SnowflakeIcebergGlueCatalogRole on the IAM console, choose Edit, and update the trust relationship using the following policy. Replace <GLUE_AWS_IAM_USER_ARN> and <GLUE_AWS_EXTERNAL_ID> with the values you obtained in the previous step.
{
   "Version": "2012-10-17",
   "Statement": [
	  {
		 "Sid": "",
		 "Effect": "Allow",
		 "Principal": {
			"AWS": "<GLUE_AWS_IAM_USER_ARN>"
		 },
		 "Action": "sts:AssumeRole",
		 "Condition": {
			"StringEquals": {
			   "sts:ExternalId": "<GLUE_AWS_EXTERNAL_ID>"
			}
		 }
	  }
   ]
}

You completed setting up the IAM role for Snowflake to access your Data Catalog resources. Next, configure the IAM role for Amazon S3 access.
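
Because the same trust policy shape is used for both Snowflake roles, it could be generated rather than edited by hand. The following is a minimal sketch under that assumption. The function name is hypothetical, and the ARN and external ID passed in are placeholder values, not real Snowflake output.

```python
import json


def snowflake_trust_policy(iam_user_arn, external_id):
    """Build the trust policy that lets the Snowflake IAM user assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": iam_user_arn},
                "Action": "sts:AssumeRole",
                # The external ID restricts who can assume the role.
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


policy = snowflake_trust_policy(
    "arn:aws:iam::123456789012:user/example",  # placeholder for GLUE_AWS_IAM_USER_ARN
    "EXAMPLE_EXTERNAL_ID",                     # placeholder for GLUE_AWS_EXTERNAL_ID
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the Trust Relationships editor for the role.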

Register Amazon S3 as an external volume

In this section, you configure an external volume for Amazon S3. Snowflake accesses the UniForm table data files in S3 through the external volume. For the configuration of an external volume for S3, refer to Configure an external volume for Amazon S3 in Snowflake public documentation. To configure the external volume, complete the following steps:

  1. In the query editor, run the following query to create an external volume for the Delta Lake S3 bucket. Replace <DeltaLakeS3Bucket> with the name of the S3 bucket that you created in Launch a CloudFormation template to configure basic resources from the CloudFormation Outputs tab. Replace <ACCOUNT_ID> with your AWS account ID.
CREATE OR REPLACE EXTERNAL VOLUME delta_lake_uniform_s3
   STORAGE_LOCATIONS =
	  (
		 (
			NAME = 'delta-lake-uniform-on-aws'
			STORAGE_PROVIDER = 'S3'
			STORAGE_BASE_URL = 's3://<DeltaLakeS3Bucket>'
			STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<ACCOUNT_ID>:role/SnowflakeIcebergS3Role'
		 )
	  );
  2. Retrieve STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID by running DESCRIBE EXTERNAL VOLUME delta_lake_uniform_s3 in the editor. The output is similar to the following:
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| parent_property   | property           | property_type | property_value                                                                                                                       | property_default |
|-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------+------------------|
|                   | ALLOW_WRITES       | Boolean       | true                                                                                                                                 | true             |
| STORAGE_LOCATIONS | STORAGE_LOCATION_1 | String        | {"NAME":"uniform_s3_location","STORAGE_PROVIDER":"S3","STORAGE_BASE_URL":"s3://<DeltaLakeS3Bucket>","STORAGE_ALLOWED_LOCATIONS":["s3 |                  |
|                   |                    |               | ://<DeltaLakeS3Bucket>/*"],"STORAGE_REGION":"us-east-1","PRIVILEGES_VERIFIED":true,"STORAGE_AWS_ROLE_ARN":"arn:aws:iam::123456789012 |                  |
|                   |                    |               | :role/SnowflakeIcebergS3Role","STORAGE_AWS_IAM_USER_ARN":"arn:aws:iam::123456789012:user/<ID>","STORAGE_AWS_EXTERNAL_ID":"<External  |                  |
|                   |                    |               | ID>",... }                                                                                                                           |                  |
| STORAGE_LOCATIONS | ACTIVE             | String        | uniform_s3_location                                                                                                                  |                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  3. Update the IAM role you created using the CloudFormation template (in Create IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3) to enable Snowflake to use this IAM role. Open Trust Relationships of SnowflakeIcebergS3Role on the IAM console, choose Edit, and update the trust relationship with the following policy. Replace <STORAGE_AWS_IAM_USER_ARN> and <STORAGE_AWS_EXTERNAL_ID> with the values from the previous step.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
        }
      }
    }
  ]
}

The next step is to create an Iceberg table to run queries from Snowflake.
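
As an aside on the DESCRIBE EXTERNAL VOLUME output shown earlier: the STORAGE_LOCATION_1 value appears to be a JSON string, so the two values needed for the trust policy could be extracted programmatically instead of copied by eye. The following is a minimal sketch under that assumption. The sample string is a made-up, shortened stand-in for the real property value.

```python
import json


def storage_trust_values(property_value):
    """Extract the IAM user ARN and external ID from a STORAGE_LOCATION_1 value."""
    location = json.loads(property_value)
    return location["STORAGE_AWS_IAM_USER_ARN"], location["STORAGE_AWS_EXTERNAL_ID"]


# Made-up sample modeled on the output above; real values come from Snowflake.
sample = json.dumps(
    {
        "NAME": "uniform_s3_location",
        "STORAGE_PROVIDER": "S3",
        "STORAGE_BASE_URL": "s3://example-bucket",
        "STORAGE_AWS_ROLE_ARN": "arn:aws:iam::123456789012:role/SnowflakeIcebergS3Role",
        "STORAGE_AWS_IAM_USER_ARN": "arn:aws:iam::123456789012:user/example",
        "STORAGE_AWS_EXTERNAL_ID": "EXAMPLE_EXTERNAL_ID",
    }
)

user_arn, external_id = storage_trust_values(sample)
print(user_arn)
print(external_id)
```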

Create an Iceberg table in Snowflake

In this section, you create an Iceberg table in Snowflake. The table is an entry point for Snowflake to access the Delta Lake UniForm table in AWS Glue Data Catalog. To create the table, complete the following steps:

  1. (Optional) If you don’t have a database in Snowflake, run CREATE DATABASE <DATABASE_NAME>, replacing <DATABASE_NAME> with a unique database name for the Iceberg table.
  2. Run the following query in the Snowflake query editor. In this case, the database delta_uniform_snow_db is chosen for the table. Configure the following parameters:
    1. EXTERNAL_VOLUME: created by the CREATE OR REPLACE EXTERNAL VOLUME query in the previous section, such as delta_lake_uniform_s3.
    2. CATALOG: created by the CREATE CATALOG INTEGRATION query in the previous section, such as glue_catalog_integration.
    3. CATALOG_TABLE_NAME: the name of Delta Lake UniForm table you created in Data Catalog such as customer_reviews.

The complete query is below:

CREATE OR REPLACE ICEBERG TABLE customer_reviews_snow
  EXTERNAL_VOLUME='delta_lake_uniform_s3'
  CATALOG='glue_catalog_integration'
  CATALOG_TABLE_NAME='customer_reviews';

After the table creation is complete, you’re ready to query the UniForm table in AWS Glue Data Catalog from Snowflake.

Query the UniForm table from Snowflake

In this step, you query the UniForm table from Snowflake. Paste and run the following analytic query in the Snowflake query editor.

SELECT product_category, count(*) as count_by_product_category 
FROM customer_reviews_snow 
GROUP BY product_category ORDER BY count_by_product_category DESC

The query result shows the same output as you saw in the Review the added records in Delta Lake UniForm table from Amazon Redshift Serverless section.

+--------------------------------------------------+
| PRODUCT_CATEGORY     | COUNT_BY_PRODUCT_CATEGORY |
|----------------------+---------------------------|
| Office_Products      | 9673711                   |
| Books                | 9672664                   |
| Apparel              | 6448747                   |
| Computers            | 3224215                   |
| Beauty_Personal_Care | 3223599                   |
+--------------------------------------------------+

Now you can query from Snowflake against the Delta Lake table by enabling Delta Lake UniForm.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

How Volkswagen Autoeuropa built a data solution with a robust governance framework, simplifying access to quality data using Amazon DataZone

Post Syndicated from Dhrubajyoti Mukherjee original https://aws.amazon.com/blogs/big-data/how-volkswagen-autoeuropa-built-a-data-solution-with-a-robust-governance-framework-simplifying-access-to-quality-data-using-amazon-datazone/

This is a joint post co-authored with Martin Mikoleizig from Volkswagen Autoeuropa.

This is the second post of a two-part series that details how Volkswagen Autoeuropa, a Volkswagen Group plant, together with AWS, built a data solution with a robust governance framework using Amazon DataZone to become a data-driven factory. Part 1 of this series focused on the customer challenges, overall solution architecture and solution features, and how they helped Volkswagen Autoeuropa overcome their challenges. This post dives into the technical details, highlighting the robust data governance framework that enables ease of access to quality data using Amazon DataZone.

At Amazon, we work backward, a systematic way to vet ideas and create new products. The key tenet of this approach is to start by defining the customer experience, then iteratively work backward from that point until the team achieves clarity of thought around what to build. The first section of this post discusses how we aligned the technical design of the data solution with the data strategy of Volkswagen Autoeuropa. Next, we detail the governance guardrails of the Volkswagen Autoeuropa data solution. Finally, we highlight the key business outcomes.

Aligning the solution with the data strategy

At an early stage of the project, the Volkswagen Autoeuropa and AWS team identified that a data mesh architecture for the data solution aligns with the Volkswagen Autoeuropa’s vision of becoming a data-driven factory. With this in mind, the team implemented the following steps:

  • Define data domains – In a workshop, the team identified the data landscape and its distribution in Volkswagen Autoeuropa. Next, the team grouped the data assets of the organization along the lines of business and defined the data domains. Because Volkswagen Autoeuropa is at an early stage of their data mesh journey, defining data domains along the lines of business is the recommended approach. As the data solution evolves, Volkswagen Autoeuropa might consider other criteria such as business subdomains to define data domains. The team defined more than five data domains, such as production, quality, logistics, planning, and finance.
  • Identify pioneer cases – The team identified the pioneer use cases that onboard the data solution first, to validate its business value. The team identified two use cases. The first use case helps predict test results during the car assembly process. The second use case enables the creation of reports containing shop floor key metrics for different management levels. The following criteria were considered to identify these use cases:
    • Use cases that deliver measurable business value for Volkswagen Autoeuropa.
    • Use cases with high AWS maturity.
    • Use cases whose requirements can be met with the first release version of the data solution.
  • Onboard key data products – The team identified the key data products that enabled these two use cases and aligned to onboard them into the data solution. These data products belonged to data domains such as production, finance, and logistics. In addition, the team aligned on business metadata attributes that would help with data discovery. The data products are classified as either source-based data or consumer-based data. Source-based data is the unaltered, raw data that is generated from source systems (for example, quality data, safety data) and is useful for other business use cases. Consumer-based data is the aggregated and transformed data from source systems. Reuse of consumer-based data saves cost in extract, transform, and load (ETL) implementation and system maintenance.

In addition to the preceding steps, the team established a data quality framework to improve the quality of the data product registered in the data solution. The following table shows the mapping of the data mesh-based solution components to Amazon DataZone and AWS Glue features. The table also provides generic examples of the components in the automotive industry.

Data Solution Components | AWS Service Features | Generic Examples
Data domains | Amazon DataZone projects and Amazon DataZone domain units | Production, logistics
Use cases | Amazon DataZone projects | Smart manufacturing, predictive maintenance
Data products | Amazon DataZone assets | Sales data, sensor data
Business metadata | Amazon DataZone glossaries and metadata forms | Data product owner information, data refresh frequency
Data quality framework | AWS Glue Data Quality | A quality score of 92%

Empowering teams with a governance framework

This section discusses the governance framework that was put in place to empower the teams at Volkswagen Autoeuropa by enhancing their analytics journey. It highlights the guardrails that enable ease of access to quality data.

Business metadata

Business metadata helps users understand the context of the data, which can lead to increased trust in the data. Moreover, establishing a common set of attributes of the data products promotes a consistent experience for the users. In addition to the business context, at Volkswagen Autoeuropa, the metadata includes information related to data classification and if the data contains personally identifiable information (PII). The data solution uses Amazon DataZone glossaries and metadata forms to provide business context to their data. Apart from the previous benefits, using the appropriate keywords in Amazon DataZone glossary terms and metadata forms can help with the search and filtering capability of data products in the Amazon DataZone data portal.

Data quality framework

The data quality framework is a comprehensive solution designed to streamline the process of data quality checks and publishing a quality score. It uses AWS Glue Data Quality to generate recommendation rulesets, run orchestrated jobs, store results, and send notifications. This framework can be seamlessly integrated into an AWS Glue job, providing a quality score for data pipeline jobs. The quality score of a data product is published in the Amazon DataZone data portal for consumers to evaluate. The key components of the solution are as follows:

  • Recommendation ruleset generation – The framework generates tailored rulesets based on metadata from the AWS Glue Data Catalog table, providing relevant and comprehensive quality checks.
  • Orchestrated job execution – Jobs are run in AWS Step Functions to perform data quality checks using the generated rulesets against data sources, evaluating data quality based on defined rules and criteria.
  • Result storage and notification – Results, including quality scores, quality status, and rulesets checked, are stored in an Amazon Simple Storage Service (Amazon S3) bucket, maintaining a historical record. End-users receive notifications with relevant details.
  • Data quality score publishing – The quality scores are published in the Amazon DataZone data portal, enabling consumers to access and evaluate data quality.
  • Subscription and quality score requirements – Consumers can subscribe to data sources or targets based on their desired quality score thresholds, making sure they receive data that meets their specific needs and standards.
  • Integration and extensibility – The framework is designed for seamless integration into existing AWS Glue jobs or data pipelines and provides a flexible and extensible architecture for customization and enhancement.
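
As a rough illustration of the scoring and threshold ideas in the preceding list, the following sketch computes a quality score as the share of rules that passed and compares it with a consumer's minimum. The `RuleResult` type, the sample rule strings, and the pass-ratio formula are assumptions made for illustration; this post does not describe how the framework calculates its score.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RuleResult:
    """Outcome of one data quality rule (hypothetical structure)."""
    rule: str
    passed: bool


def quality_score(results: List[RuleResult]) -> float:
    """Return the percentage of rules that passed (assumed scoring formula)."""
    if not results:
        raise ValueError("no rule results to score")
    passed = sum(1 for result in results if result.passed)
    return 100.0 * passed / len(results)


def meets_threshold(score: float, threshold: float) -> bool:
    """Return True when a score satisfies a consumer's minimum threshold."""
    return score >= threshold


# Hypothetical rule outcomes for a single data product.
results = [
    RuleResult('IsComplete "vehicle_id"', True),
    RuleResult('IsUnique "vehicle_id"', True),
    RuleResult("RowCount > 0", True),
    RuleResult('ColumnValues "torque" > 0', False),
]

score = quality_score(results)
print(f"Quality score: {score:.1f}%")
print("Meets a threshold of 70:", meets_threshold(score, 70))
```

A consumer with a stricter threshold, such as 90, would see the same data product fail the check, which is the behavior the subscription requirement describes.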

Federated governance

Federated governance empowers producer and consumer teams to operate independently while adhering to a central governance model. For the data solution at Volkswagen Autoeuropa, this meant a centralized team defined the governance guardrails and decentralized data teams employed those guardrails. The following are a few examples of how the team established federated governance in Volkswagen Autoeuropa:

  • Management of Amazon DataZone glossaries and metadata forms – In this mechanism, the Volkswagen Autoeuropa IT team defined the Amazon DataZone glossaries and metadata forms centrally. The data teams used them to publish the data assets in Amazon DataZone. This provides consistency of business metadata across the organization. The following figure explains the process.
    The workflow in the Amazon DataZone data portal consists of the following steps:
    1. The data solution administrator belonging to the Volkswagen Autoeuropa IT team aligns with stakeholders such as data producers, data consumers, and source system owners, and maintains the business metadata using the Amazon DataZone glossaries and metadata forms.
    2. The producer project teams use the Amazon DataZone glossary terms and fill the Amazon DataZone metadata forms to enrich the inventory assets.
    3. After the business metadata is populated, the team publishes the assets in the Amazon DataZone data portal.
  • Management of Amazon DataZone project membership – In this scenario, the management of Amazon DataZone project membership is delegated to a designated administrator of the project. The following figure explains the process.
    The workflow consists of the following steps:
    1. The data solution administrator belonging to the Volkswagen Autoeuropa IT team provisions the Amazon DataZone project and environment using automation. The data solution administrator is the owner of the project.
    2. The data solution administrator delegates the management of the Amazon DataZone project membership to a designated administrator by assigning the owner role.
    3. The Amazon DataZone project administrator assigns the contributor role to eligible users.
    4. The users access the Amazon DataZone project and its assets from the Amazon DataZone data portal.

Authentication and authorization

The Amazon DataZone portal supports two types of authorizations: AWS Identity and Access Management (IAM) roles and AWS IAM Identity Center users. The data solution supports both of these authorization methods. The choice of authentication mechanism is a function of the type of authorization used for Amazon DataZone.

For IAM role authorization, an IAM role is created for each user, incorporating a prefix. Each data solution user role has a permission to list the Amazon DataZone domains (datazone:ListDomains) and to get the data portal login URL (datazone:GetIamPortalLoginUrl) in the Amazon DataZone AWS account. For reasons that are out of scope for this post, there could only be three SAML federated roles in an AWS account in the customer environment. As such, the team didn’t have a dedicated SAML federated role for each Amazon DataZone user. The data solution user role implemented a trust policy allowing the user’s AWS Security Token Service (AWS STS) federated user session principal Amazon Resource Name (ARN). If you don’t have limitations on the number of SAML federated roles per AWS account, you can make all data solution user roles SAML federated roles and update the trust policy accordingly.
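
The two permissions named in the preceding paragraph can be expressed as an identity-based policy document. The following is a minimal sketch that builds such a document as a Python dictionary. The statement ID and the wildcard resource are assumptions for illustration, and the trust policy is omitted because its principal depends on your federation setup.

```python
import json


def portal_access_policy() -> dict:
    """Build a policy document with the two DataZone actions named in the post."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DataZonePortalAccess",  # hypothetical statement ID
                "Effect": "Allow",
                "Action": [
                    "datazone:ListDomains",
                    "datazone:GetIamPortalLoginUrl",
                ],
                # Assumption: a wildcard for brevity. Narrow this to your
                # domain where the action supports resource-level scoping.
                "Resource": "*",
            }
        ],
    }


policy = portal_access_policy()
print(json.dumps(policy, indent=2))
```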

For IAM Identity Center authorization, the configuration is done either at the AWS Organizations level or AWS account level in IAM Identity Center. Because there are currently no APIs available for identity source configuration in IAM Identity Center, the team followed the appropriate instructions to configure the identity source on the AWS Management Console.

After the chosen authorization option is activated, Amazon DataZone administrators grant the IAM principals (IAM role or IAM Identity Center user) access to the Amazon DataZone portal. For more details, refer to Manage users in the Amazon DataZone console.

Business outcomes

Volkswagen Autoeuropa and AWS established an iterative mechanism to enable the continuous growth of the data solution. This iterative improvement is expressed as a flywheel as shown in the following figure.

The outcome of each component of the flywheel powers the next component, creating a virtuous cycle. The data solution flywheel consists of five components:

  1. Data solution growth – The primary focus of the flywheel is to accelerate the growth of the data solution. This growth is measured by metrics such as number of data products, number of use cases onboarded into the solution, and number of users.
  2. Enhancing user experience – This component focuses on enhancing the user experience of the data solution. One way to measure the user experience is through user feedback surveys.
  3. Data solution use cases – Improved, positive user experience with the data solution contributes to the increased number of use cases that want to onboard the data solution.
  4. Data producers and consumers – As the number of use cases increases, so does the number of data producers and consumers. Data producers make data available to power the use cases. Data consumers use the data to drive the use cases.
  5. Selection of data products – After data producers onboard the data solution, they publish the assets in the Amazon DataZone data portal. This leads to a larger selection of data products. This, in turn, creates a positive experience for the data solution users.

In addition to the previous components, the positive user experience is reinforced by improving governance guardrails, increasing the number of reusable assets, and maximizing operational excellence.

At the time of writing, Volkswagen Autoeuropa has reduced the time to discover data from days to minutes using the data solution, an approximately 384-fold improvement in data discovery time. Before the Volkswagen Autoeuropa and AWS collaboration, data access took several weeks. With the data solution powered by Amazon DataZone, the data access time was reduced to minutes. Overall, the data solution resulted in regaining between 48 hours and weeks of customer productivity over the course of a month.

The data solution powered by Amazon DataZone is driving measurable business impact for Volkswagen Autoeuropa. It enables Volkswagen Autoeuropa to deliver digital use cases faster, with less effort, and a higher overall quality. Volkswagen Autoeuropa believes that Amazon DataZone will be key in their journey to become a data-driven factory and to leverage AI.

Conclusion

This post explored how Volkswagen Autoeuropa built a robust and scalable data solution using Amazon DataZone. The first step was to align the solution with Volkswagen Autoeuropa’s overarching data strategy to drive business value.

The establishment of a comprehensive governance framework was central to this effort. This framework encompasses key components, such as business metadata, data quality, federated governance, access controls, and security, which maintain the trustworthiness and reliability of Volkswagen Autoeuropa’s data assets. The post highlighted the Volkswagen Autoeuropa data solution flywheel, showcasing how the solution enabled improved decision-making, increased operational efficiency, and accelerated digital transformation initiatives across the organization.

The data solution built at Volkswagen Autoeuropa is one of the first implementations within the Volkswagen Group and is a blueprint for other Volkswagen production plants.

“This project is a blueprint for other Volkswagen production plants. By involving the AWS team and using Amazon DataZone, we are able to govern our data centrally and make it accessible in an automated and secure way.”

– Daniel Madrid, Head of IT, Volkswagen Autoeuropa.

If you’re looking to harness the power of data mesh to drive innovation and business value within your organization, we’ve got you covered. In Strategies for building a data mesh-based enterprise solution on AWS, we dive deep into the key considerations and current recommendations to establish a robust, scalable, and well-governed data mesh on AWS. This documentation covers everything from aligning your data mesh with overall business strategy to implementing the data mesh strategy framework.

To get hands-on experience with real-world code examples, see our GitHub repository. This open source project provides a step-by-step blueprint for constructing a data mesh architecture using the powerful capabilities of Amazon DataZone, AWS Cloud Development Kit (AWS CDK), and AWS CloudFormation.


About the Authors

Dhrubajyoti Mukherjee is a Cloud Infrastructure Architect with a strong focus on data strategy, data analytics, and data governance at AWS. He uses his deep expertise to provide guidance to global enterprise customers across industries, helping them build scalable and secure AWS solutions that drive meaningful business outcomes. Dhrubajyoti is passionate about creating innovative, customer-centric solutions that enable digital transformation, business agility, and performance improvement. An active contributor to the AWS community, Dhrubajyoti authors AWS Prescriptive Guidance publications, blog posts, and open source artifacts, sharing his insights and best practices with the broader community. Outside of work, Dhrubajyoti enjoys spending quality time with his family and exploring nature through his love of hiking mountains.

Ravi Kumar is a Data Architect and Analytics expert at AWS, where he finds immense fulfilment in working with data. His days are dedicated to designing and analyzing complex data systems, uncovering valuable insights that drive business decisions. Outside of work, he unwinds by listening to music and watching movies, activities that allow him to recharge after a long day of data wrangling.

Martin Mikoleizig studied mechanical engineering and production technology at the RWTH Aachen University before starting to work at Dr. Ing. h.c. F. Porsche AG in 2015 as a production planner for the engine assembly. Over several years as a Project Manager on Testing Technology for new engine models, he also introduced several innovations like human-machine collaborations and intelligent assistance systems. Starting in 2017, he was responsible for the shop floor IT team of the module lines in Zuffenhausen before he became responsible for the planning of the E-Drive assembly at Porsche. Additionally, he was responsible for the Digitalisation Strategy of the Production Ressort at Porsche. In October 2022, he was assigned to Volkswagen Autoeuropa in Portugal in the role of a Digital Transformation Manager for the plant, driving the digital transformation towards a data-driven factory.

Weizhou Sun is a Lead Architect at AWS, specializing in digital manufacturing solutions and IoT. With extensive experience in Europe, she has enhanced operational efficiencies, reducing latency and increasing throughput. Weizhou’s expertise includes industrial computer vision, predictive maintenance, and predictive quality, consistently delivering top performance and client satisfaction. A recognized thought leader in IoT and remote driving, she has contributed to business growth through innovations and open source work. Committed to knowledge sharing, Weizhou mentors colleagues and contributes to practice development. Known for her problem-solving skills and customer focus, she delivers solutions that exceed expectations. In her free time, Weizhou explores new technologies and fosters a collaborative culture.

Ajinkya Patil is a Senior Security Architect with AWS Professional Services, specializing in security consulting for customers in the automotive industry. Since joining AWS in 2019, he has played a key role in helping automotive companies design and implement robust security solutions on AWS. Ajinkya is an active contributor to the AWS community, having presented at AWS re:Inforce and authored articles for the AWS Security Blog and AWS Prescriptive Guidance. Outside of his professional pursuits, Ajinkya is passionate about travel and photography, often capturing the diverse landscapes he encounters on his journeys.

Adjoa Taylor has over 20 years of experience in industrial manufacturing, providing industry and technology consulting services, digital transformation, and solution delivery. Currently, Adjoa leads Product Centric Digital Transformation, enabling customers in solving complex manufacturing problems using smart factory and industry-leading transformation mechanisms. Most recently, she drives value with AI/ML and generative AI use cases for the plant floor. Adjoa is an experienced leader, having spent over 20 years of her career delivering projects in countries throughout North America, Latin America, Europe, and Asia. Adjoa brings deep experience across multiple business segments with a focus on business outcome-driven solutions. Adjoa is passionate about helping customers solve problems while realizing the art of the possible through implementing value-based solutions.

Streamlining AWS Glue Studio visual jobs: Building an integrated CI/CD pipeline for seamless environment synchronization

Post Syndicated from Andrei Maksimov original https://aws.amazon.com/blogs/big-data/streamlining-aws-glue-studio-visual-jobs-building-an-integrated-ci-cd-pipeline-for-seamless-environment-synchronization/

Many Amazon Web Services (AWS) customers have integrated their data across multiple sources using AWS Glue, a serverless data integration service. By providing seamless integration throughout the development lifecycle, AWS Glue enables organizations to make data-driven business decisions.

AWS Glue Studio visual jobs provide a graphical interface called the visual editor that you can use to author extract, transform, and load (ETL) jobs in AWS Glue visually. The visual editor maintains a visual representation of each job that brings together a variety of data sources, transformations, and data sinks. With its intuitive interface, you can easily create large-scale data integration jobs without needing coding expertise, simplifying workflows and eliminating the need for manual ETL script programming.

As data engineers increasingly rely on the AWS Glue Studio visual editor to create data integration jobs, the need for a streamlined development lifecycle and seamless synchronization between environments has become paramount. Additionally, managing versions of visual directed acyclic graphs (DAGs) is crucial for tracking changes, collaboration, and maintaining consistency across environments.

This post introduces an end-to-end solution that addresses these needs by combining the power of the AWS Glue Visual Job API, a custom AWS Glue Resource Sync Utility, and an AWS CDK-based continuous integration and continuous deployment (CI/CD) pipeline.

A few common questions from our customers include:

  • What are the best practices for moving our workloads from a pre-production environment to production?
  • What are the recommended best practices for provisioning data integration components?
  • How can I build AWS Glue visual jobs in the development environment and automatically propagate them to the production account using the CI/CD pipeline?
  • How can I version control and track changes to my AWS Glue Studio visual jobs?

End-to-end development lifecycle for data integration pipeline

The software development lifecycle on AWS has six phases: plan, design, implement, test, deploy, and maintain, as shown in the following diagram.

SDLC

For more information regarding each component, check out End-to-end development lifecycle for data engineers to build a data integration pipeline using AWS Glue.

AWS Glue Resource Sync Utility

As part of synchronizing AWS Glue visual jobs across different environments, requirements include:

  • Manage version control of visual DAGs by tracking changes to AWS Glue Studio visual jobs using version control systems such as Git
  • Promote AWS Glue visual jobs from a pre-production environment to a production environment
  • Transfer ownership of AWS Glue visual jobs between different AWS accounts
  • Replicate AWS Glue visual jobs from one AWS Region to another as part of a disaster recovery strategy

The AWS Glue Resource Sync Utility is a Python application developed on top of the AWS Glue Visual Job API, designed to synchronize AWS Glue Studio visual jobs across different accounts without losing the visual representation. It operates by using source and target AWS environment profiles. Optionally, a list of jobs for synchronization can be provided along with a mapping file to replace environment-specific resources.
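
To make the role of the mapping file concrete, the following sketch shows one way environment-specific strings could be swapped inside a serialized job definition. It is an illustration of the idea only and is not how the utility itself is implemented. The `apply_mapping` helper, the job name, the account IDs, and the Region are hypothetical.

```python
from typing import Any, Dict


def apply_mapping(value: Any, mapping: Dict[str, str]) -> Any:
    """Return a copy of a JSON-like value with mapped substrings replaced."""
    if isinstance(value, str):
        # Replacements run in mapping order; this sketch assumes that no
        # target string contains another source string.
        for source, target in mapping.items():
            value = value.replace(source, target)
        return value
    if isinstance(value, list):
        return [apply_mapping(item, mapping) for item in value]
    if isinstance(value, dict):
        return {key: apply_mapping(item, mapping) for key, item in value.items()}
    return value  # numbers, booleans, and None pass through unchanged


# Hypothetical development-to-production mapping.
mapping = {
    "s3://aws-glue-assets-111111111111-eu-west-1":
        "s3://aws-glue-assets-222222222222-eu-west-1",
    "arn:aws:iam::111111111111:role/service-role/AWSGlueServiceRole":
        "arn:aws:iam::222222222222:role/service-role/AWSGlueServiceRole",
}

# Hypothetical fragment of a serialized job definition.
dev_job = {
    "Name": "orders-etl",
    "Role": "arn:aws:iam::111111111111:role/service-role/AWSGlueServiceRole",
    "Command": {
        "ScriptLocation": "s3://aws-glue-assets-111111111111-eu-west-1/scripts/orders-etl.py"
    },
    "MaxRetries": 0,
}

prod_job = apply_mapping(dev_job, mapping)
print(prod_job["Role"])
print(prod_job["Command"]["ScriptLocation"])
```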

For more information on the AWS Glue Resource Sync Utility, refer to Synchronize your AWS Glue Studio Visual Jobs to different environments.

Solution overview

As shown in the following diagram, this solution uses three separate AWS accounts. One account is designated for the development environment, another for the production environment, and a third to host the CI/CD infrastructure and pipeline.

Solution Overview

The solution emphasizes version controlling AWS Glue Studio visual jobs by serializing them into JSON files and storing them in a Git repository. As a result, you can:

  • Track changes to your visual DAGs over time.
  • Collaborate with team members.
  • Restore and deploy visual DAGs in different environments seamlessly.
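
Tracking changes in Git works best when the serialized JSON is stable from one commit to the next. The following sketch shows the general idea using sorted keys and fixed indentation, so that only real changes appear in a diff. The job fields and file names are hypothetical, and the sketch makes no claim about the format that the AWS Glue Resource Sync Utility writes.

```python
import difflib
import json


def serialize_job(job: dict) -> str:
    """Serialize a job definition with sorted keys and fixed indentation."""
    return json.dumps(job, indent=2, sort_keys=True) + "\n"


# Hypothetical job definitions: the same job in two key orders, then an edit.
job_v1 = {"Name": "orders-etl", "GlueVersion": "4.0"}
job_v1_reordered = {"GlueVersion": "4.0", "Name": "orders-etl"}
job_v2 = {"Name": "orders-etl", "GlueVersion": "5.0"}

# Key order does not affect the serialized text, so it creates no diff noise.
same_text = serialize_job(job_v1) == serialize_job(job_v1_reordered)
print("Identical after reordering:", same_text)

# A real change shows up as a small, readable diff.
diff = list(difflib.unified_diff(
    serialize_job(job_v1).splitlines(),
    serialize_job(job_v2).splitlines(),
    fromfile="orders-etl.json (before)",
    tofile="orders-etl.json (after)",
    lineterm="",
))
print("\n".join(diff))
```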

The AWS account responsible for hosting the CI/CD pipeline is composed of three key components:

  • Managing AWS Glue Job updates – Provides smooth updates and maintenance of AWS Glue jobs.
  • Cross-Account Access Management – Enables secure promotion of updates from the development environment to the production environment.
  • Version Control Integration – Incorporates serialized visual DAGs into the CI/CD pipeline for deployment to target environments.

You can create AWS Glue Studio visual jobs using the intuitive visual editor in your development account. After these jobs are configured, you can serialize the visual DAGs into JSON files and commit them to a Git repository. The CI/CD pipeline detects changes to the repository and automatically triggers the deployment process.

The pipeline includes a step where the AWS Glue Resource Sync Utility deserializes the visual DAGs from the committed JSON files and deploys them to the production environment. This approach promotes consistent deployment of jobs while maintaining their visual representation.

The solution uses the AWS Glue Visual Job API, AWS Glue Resource Sync Utility, and AWS CDK to streamline deployment across environments. It enables seamless synchronization and consistent versioning of AWS Glue jobs between development and production, preserving visual workflows and reducing manual tasks. The solution consists of two main parts:

  • Initial steps (one-time setup) – Setting up the development environment, bootstrapping AWS environments, deploying the CI/CD pipeline, and integrating the AWS Glue Resource Sync Utility
  • Day-to-day development (repeated) – Ongoing activities such as creating visual jobs, serializing them, committing changes to the repository, deploying to production through the pipeline, and verifying the jobs

The solution follows these high-level steps for the initial setup:

  1. Set up the development environment
  2. Bootstrap your AWS environments
  3. Deploy the CI/CD pipeline
  4. Configure AWS developer tools connection to GitHub
  5. Integrate the CI/CD pipeline with the AWS Glue Resource Sync Utility

The solution follows these high-level steps for the day-to-day development:

  1. Create visual jobs in the development account
  2. Serialize visual jobs
  3. Commit changes to Git repository
  4. Deploy visual jobs to production
  5. Verify visual jobs in production

Prerequisites

Before you begin, make sure you have the following:

  • GitHub account
  • Git (git command)
  • Python 3.9 or later
  • Package installer for Python (pip command)
  • AWS CDK Toolkit (cdk command) 2.155.0 or later
  • AWS CLI configured with appropriate credentials for your accounts
  • Three AWS accounts:
    • Development account
    • Production account
    • Pipeline account (for hosting the CI/CD pipeline)

Technical solution walkthrough

This section provides a detailed guide to setting up and using an automated CI/CD pipeline for AWS Glue Studio visual jobs.

Initial steps (one-time setup)

In this section, we walk through the foundational steps required to establish the CI/CD pipeline for AWS Glue Studio visual jobs. These initial steps set up the necessary infrastructure and configurations, providing a smooth and automated deployment process across your development and production environments.

Set up the development environment

To set up the development environment, follow these steps:

  1. Fork the aws-glue-cdk-baseline repository
  2. Clone the forked repository:
git clone https://github.com/<YOUR-GITHUB-USERNAME>/aws-glue-cdk-baseline.git

cd aws-glue-cdk-baseline
  3. Create and activate a Python virtual environment:
python3 -m venv .venv

# On Windows, use .venv\Scripts\activate.bat
source .venv/bin/activate
  4. Install required dependencies:
pip install -r requirements.txt

pip install -r requirements-dev.txt
  5. To configure the default settings, edit the default-config.yaml file and replace the placeholders with your AWS account details:
    • Pipeline account: awsAccountId and awsRegion.
    • Development account: awsAccountId and awsRegion.
    • Production account: awsAccountId and awsRegion.
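
Before you bootstrap the accounts, it can help to sanity-check the edited configuration. The following is a minimal sketch that validates a configuration dictionary, for example the result of loading default-config.yaml with `yaml.safe_load`. The section names pipelineAccount, devAccount, and prodAccount match the keys used by the scripts later in this post. The `find_config_problems` helper and the sample values are hypothetical.

```python
from typing import Dict, List

REQUIRED_ACCOUNTS = ("pipelineAccount", "devAccount", "prodAccount")
REQUIRED_FIELDS = ("awsAccountId", "awsRegion")


def find_config_problems(config: Dict) -> List[str]:
    """Return a list of human-readable problems found in the configuration."""
    problems = []
    for account in REQUIRED_ACCOUNTS:
        section = config.get(account)
        if not isinstance(section, dict):
            problems.append(f"{account}: section is missing")
            continue
        for field in REQUIRED_FIELDS:
            if not section.get(field):
                problems.append(f"{account}.{field}: value is missing")
        # AWS account IDs are 12 digits; YAML may load them as int or str.
        account_id = str(section.get("awsAccountId", ""))
        if account_id and not (account_id.isdigit() and len(account_id) == 12):
            problems.append(
                f"{account}.awsAccountId: expected 12 digits, got {account_id!r}"
            )
    return problems


# Hypothetical configuration with a short account ID and a missing section.
sample_config = {
    "pipelineAccount": {"awsAccountId": "111111111111", "awsRegion": "eu-west-1"},
    "devAccount": {"awsAccountId": "12345", "awsRegion": "eu-west-1"},
}

problems = find_config_problems(sample_config)
for problem in problems:
    print(problem)
```

Quoting account IDs in the YAML file is a simple way to avoid surprises with IDs that begin with a zero.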

Bootstrap your AWS environments

Bootstrapping prepares your AWS accounts for AWS CDK deployments. To bootstrap your AWS environments, run the following commands, replacing placeholders with your account numbers, Regions, and AWS CLI profiles:

# Bootstrap the pipeline account
cdk bootstrap aws://<PIPELINE-ACCOUNT-NUMBER>/<REGION> --profile <PIPELINE-PROFILE>

# Bootstrap the development account, trusting the pipeline account
cdk bootstrap aws://<DEV-ACCOUNT-NUMBER>/<REGION> --profile <DEV-PROFILE> --trust <PIPELINE-ACCOUNT-NUMBER>

# Bootstrap the production account, trusting the pipeline account
cdk bootstrap aws://<PROD-ACCOUNT-NUMBER>/<REGION> --profile <PROD-PROFILE> --trust <PIPELINE-ACCOUNT-NUMBER>

Deploy the CI/CD pipeline

Deploy the pipeline stack to your pipeline account:

cdk deploy --profile <PIPELINE-PROFILE>

This command creates:

  • The pipeline stack in the pipeline account
  • The AWS Glue app stack in the development account

Configure AWS developer tools connection to GitHub

To establish a connection between AWS CodePipeline and your GitHub repository, follow these steps:

  1. Create a GitHub connection:
    1. In the AWS Management Console for your pipeline account, navigate to AWS CodePipeline
    2. In the navigation pane, choose Connections
    3. Choose Create connection
    4. Select GitHub as the source provider
  2. Authorize the connection:
    1. Provide a connection name (such as MyGitHubConnection)
    2. Choose Connect to GitHub
    3. Follow the prompts to authorize AWS CodePipeline to access your GitHub account
    4. Make sure that the connection has access to your forked aws-glue-cdk-baseline repository
  3. Note the connection Amazon Resource Name (ARN):
    1. After the connection is established, note the connection ARN because you’ll need it when configuring the pipeline

Integrate the CI/CD pipeline with the AWS Glue Resource Sync Utility

To integrate the AWS Glue Resource Sync Utility into the pipeline to automate the synchronization of AWS Glue visual jobs, follow these steps:

  1. Download the sync.py script from the AWS Glue Samples repository:
wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/resource_sync/sync.py \
-O aws_glue_cdk_baseline/job_scripts/sync.py
  2. Create a new file aws_glue_cdk_baseline/job_scripts/generate_mapping.py with the following content:
import yaml
import json
 
def generate_mapping():
    with open('default-config.yaml', 'r') as config_file:
        config = yaml.safe_load(config_file)
    mapping = {
        f"s3://aws-glue-assets-{config['devAccount']['awsAccountId']}-{config['devAccount']['awsRegion']}": f"s3://aws-glue-assets-{config['prodAccount']['awsAccountId']}-{config['prodAccount']['awsRegion']}",
        f"arn:aws:iam::{config['devAccount']['awsAccountId']}:role/service-role/AWSGlueServiceRole": f"arn:aws:iam::{config['prodAccount']['awsAccountId']}:role/service-role/AWSGlueServiceRole",
        f"s3://dev-glue-data-{config['devAccount']['awsAccountId']}-{config['devAccount']['awsRegion']}": f"s3://prod-glue-data-{config['prodAccount']['awsAccountId']}-{config['prodAccount']['awsRegion']}"
    }
    with open('mapping.json', 'w') as mapping_file:
        json.dump(mapping, mapping_file, indent=2)
 
if __name__ == "__main__":
    generate_mapping()

This script generates a mapping.json file that the sync.py script will use to synchronize the jobs between the development and production environments. The mapping.json file contains the mapping of the development environment assets to the production environment assets:

  • The s3://aws-glue-assets-* Amazon Simple Storage Service (Amazon S3) bucket contains the AWS Glue Studio visual job definitions
  • The arn:aws:iam::*:role/service-role/AWSGlueServiceRole AWS Identity and Access Management (IAM) role is used by the AWS Glue Studio jobs to access AWS resources
  • The s3://dev-glue-data-* and s3://prod-glue-data-* S3 buckets contain scripts and data used by the AWS Glue Studio jobs
  3. Update the aws_glue_cdk_baseline/pipeline_stack.py file to include a step that deserializes the JSON file and deploys the AWS Glue jobs to the production environment:
from typing import Dict
import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_iam as iam
)
from constructs import Construct
from aws_cdk.pipelines import CodePipeline, CodePipelineSource, CodeBuildStep
from aws_glue_cdk_baseline.glue_app_stage import GlueAppStage
 
GITHUB_REPO = "YOUR-GITHUB-USERNAME/aws-glue-cdk-baseline"
GITHUB_BRANCH = "main"
GITHUB_CONNECTION_ARN = "YOUR-GITHUB-CONNECTION-ARN"
 
class PipelineStack(Stack):
 
    def __init__(self, scope: Construct, construct_id: str, config: Dict, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
 
        source = CodePipelineSource.connection(
            GITHUB_REPO,
            GITHUB_BRANCH,
            connection_arn=GITHUB_CONNECTION_ARN
        )
 
        pipeline = CodePipeline(self, "GluePipeline",
            pipeline_name="GluePipeline",
            cross_account_keys=True,
            docker_enabled_for_synth=True,
            synth=CodeBuildStep("CdkSynth",
                input=source,
                install_commands=[
                    "pip install -r requirements.txt",
                    "pip install -r requirements-dev.txt",
                    "npm install -g aws-cdk",
                ],
                commands=[
                    "cdk synth",
                ]
            )
        )
 
        # Add development stage
        dev_stage = GlueAppStage(self, "DevStage", config=config, stage="dev", 
            env=cdk.Environment(
                account=str(config['devAccount']['awsAccountId']),
                region=config['devAccount']['awsRegion']
            ))
        pipeline.add_stage(dev_stage)

        # Add production stage
        prod_stage = GlueAppStage(self, "ProdStage", config=config, stage="prod", 
            env=cdk.Environment(
                account=str(config['prodAccount']['awsAccountId']),
                region=config['prodAccount']['awsRegion']
            ))
        pipeline.add_stage(prod_stage)
 
        # Glue Resource Sync as a separate step in the pipeline
        pipeline.add_wave("GlueJobSync").add_post(CodeBuildStep("GlueJobSync",
            input=source,
            commands=[
                "python $(pwd)/aws_glue_cdk_baseline/job_scripts/generate_mapping.py",
                "python aws_glue_cdk_baseline/job_scripts/sync.py "
                   "--dst-role-arn arn:aws:iam::{0}:role/GlueCrossAccountRole-prod "
                   "--dst-region {1} "
                   "--deserialize-from-file aws_glue_cdk_baseline/resources/resources.json "
                   "--config-path mapping.json "
                   "--targets job,catalog "
                   "--skip-prompt".format(
                       config['prodAccount']['awsAccountId'],
                       config['prodAccount']['awsRegion']
                   ),
            ],
            role_policy_statements=[
                iam.PolicyStatement(
                    actions=[
                        "sts:AssumeRole",
                    ],
                    resources=["*"]
                )
            ]
        ))

Replace the placeholders in the pipeline_stack.py file with your values:

  • GITHUB_REPO with the name of your GitHub repository
  • GITHUB_BRANCH with the name of the branch you want to use for the pipeline
  • GITHUB_CONNECTION_ARN with the ARN of the GitHub connection you created in Step 4
  1. Update the aws_glue_cdk_baseline/glue_app_stack.py file to create a cross-account role with the necessary permissions to access the development environment resources:
    self.cross_account_role = self.create_cross_account_role(
        f"GlueCrossAccountRole-{stage}",
        str(config['pipelineAccount']['awsAccountId'])
    )
 
    def create_cross_account_role(self, role_name: str, trusted_account_id: str):
        return iam.Role(self, f"{role_name}CrossAccountRole",
            role_name=role_name,
            assumed_by=iam.AccountPrincipal(trusted_account_id),
            managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name("AdministratorAccess")]
        )
 
    @property
    def cross_account_role_arn(self):
        return self.cross_account_role.role_arn

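    # The second property below refers to self.glue_app_stack, so it presumably
    # belongs to the GlueAppStage class (glue_app_stage.py), not to the stack itself.
    @property
    def cross_account_role_arn(self):
        return self.glue_app_stack.cross_account_role_arn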

See the andreimaksimov/aws-glue-cdk-baseline repository for a complete diff.
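The pipeline code above reads account IDs and Regions from a `config` dictionary. The following minimal sketch shows the shape those lookups imply, plus a check you could run before `cdk synth`. The helper names `validate_config` and `prod_role_arn` and the sample values are hypothetical, and the repository's actual configuration file may contain additional keys.

```python
from typing import Dict

# Account keys taken from the lookups in the pipeline and stack code above.
REQUIRED_ACCOUNTS = ("pipelineAccount", "devAccount", "prodAccount")
# Only the deployment stages read a Region in the code shown.
ACCOUNTS_WITH_REGION = ("devAccount", "prodAccount")


def validate_config(config: Dict) -> None:
    """Raise ValueError if a lookup used by the pipeline would fail (hypothetical helper)."""
    for name in REQUIRED_ACCOUNTS:
        entry = config.get(name)
        if not isinstance(entry, dict) or "awsAccountId" not in entry:
            raise ValueError(f"config['{name}']['awsAccountId'] is required")
    for name in ACCOUNTS_WITH_REGION:
        if "awsRegion" not in config[name]:
            raise ValueError(f"config['{name}']['awsRegion'] is required")


def prod_role_arn(config: Dict) -> str:
    """Build the role ARN that the GlueJobSync step passes to --dst-role-arn."""
    account_id = str(config["prodAccount"]["awsAccountId"])
    return f"arn:aws:iam::{account_id}:role/GlueCrossAccountRole-prod"


# Placeholder account IDs and Regions for illustration only.
sample_config = {
    "pipelineAccount": {"awsAccountId": 111111111111},
    "devAccount": {"awsAccountId": 222222222222, "awsRegion": "us-east-1"},
    "prodAccount": {"awsAccountId": 333333333333, "awsRegion": "us-east-1"},
}

validate_config(sample_config)
print(prod_role_arn(sample_config))
```

Failing early on a missing key gives a clearer message than a `KeyError` raised in the middle of synthesis.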

  1. Commit your changes to the repository:
git add aws_glue_cdk_baseline/job_scripts/sync.py
git add aws_glue_cdk_baseline/job_scripts/generate_mapping.py
git add pipeline_stack.py

git commit -m "Integrate Glue Resource Sync Utility into the pipeline"

git push

Day-to-day development (repeated)

With the initial setup complete, you can now proceed with your regular development activities. This section outlines the steps you’ll repeat during your day-to-day work to develop, version control, and deploy AWS Glue visual jobs.

Create visual jobs in the development account

In this step, you’ll use AWS Glue Studio to create and configure your visual jobs within the development environment.

  1. In your development account, open the AWS Glue Studio console
  2. To create a new visual job, choose Create job
  3. Choose Visual with a blank canvas and use the visual editor to design your ETL job
  4. Configure the job settings:
    • Job name: Provide a meaningful name
    • IAM role: Select an IAM role with necessary permissions
    • Other configurations: Adjust as needed
  5. To save the job, choose Save

Repeat these steps to create additional jobs as required.

Serialize visual jobs

To serialize your visual jobs to enable version control and preparation for deployment, follow these steps:

  1. Run the AWS Glue Resource Sync Utility:
python sync.py \
  --src-role-arn arn:aws:iam::<DEV-ACCOUNT-NUMBER>:role/GlueCrossAccountRole-dev \
  --src-region <DEV-REGION> \
  --serialize-to-file resources.json \
  --targets job,catalog \
  --skip-prompt
    • Replace <DEV-ACCOUNT-NUMBER> with your development account number
    • Replace <DEV-REGION> with your development Region (for example, us-east-1)
  2. Verify the serialized file:
    • Locate the JSON file in aws_glue_cdk_baseline/resources/
    • Make sure it contains the definitions of your visual jobs
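
The verification step can also be scripted. The sketch below only checks that the serialized file exists, parses as JSON, and is not empty. It makes no assumption about the internal layout that the AWS Glue Resource Sync Utility writes, and the function name `check_serialized_file` is hypothetical.

```python
import json
from pathlib import Path


def check_serialized_file(path: str) -> int:
    """Return the number of top-level entries in the serialized JSON file.

    Raises FileNotFoundError if the file is missing, json.JSONDecodeError if it
    is not valid JSON, and ValueError if it parses but holds no entries.
    """
    text = Path(path).read_text(encoding="utf-8")
    data = json.loads(text)
    if not isinstance(data, (dict, list)) or len(data) == 0:
        raise ValueError(f"{path} parsed but contains no entries")
    return len(data)


# Path taken from the commit step in this post; adjust it to your checkout.
target = "aws_glue_cdk_baseline/resources/resources.json"
if Path(target).exists():
    print(f"{target}: {check_serialized_file(target)} top-level entries")
else:
    print(f"{target} not found; run the serialize step first")
```

You still need to open the file to confirm that the expected job names appear, because this check does not inspect the contents.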

Commit changes to Git repository

To commit changes to the Git repository, follow these steps:

  1. Add the serialized resources to Git:
git add aws_glue_cdk_baseline/resources/resources.json
  2. Commit your changes:
git commit -m "Add serialized Glue Visual Jobs"
  3. Push to GitHub:
git push

This action triggers the CI/CD pipeline.

Deploy visual jobs to production

The CI/CD pipeline automatically performs the following steps:

  • Synthesize the AWS CDK application
  • Deploy to the development environment
  • Deploy to the production environment
  • Execute the AWS Glue Resource Sync Utility

The following screenshot shows the CI/CD pipeline.

CICD Pipeline

Verify visual jobs in production

After the pipeline has completed the deployment, it’s important to verify that the visual jobs are correctly reflected in the production environment. To do so, follow these steps:

  1. In the production account, open the AWS Glue Studio console
  2. Verify the deployed jobs:
    • Make sure that the visual jobs are present
    • Open each job to confirm that the visual DAGs are preserved

By following these steps in your day-to-day workflow, you make sure that your AWS Glue visual jobs are version-controlled, consistent across environments, and that your production environment reflects the latest tested changes.

Version control for AWS Glue visual jobs

By serializing AWS Glue Studio visual jobs to JSON files and committing them to a Git repository, you enable version control for your data integration workflows. By following this approach you can:

  • Track changes – Monitor modifications to your AWS Glue jobs over time
  • Collaborate – Work with team members on developing and refining jobs
  • Restore and deploy – Easily restore jobs in other accounts or environments

The serialization and deserialization steps are integral to your development and deployment process, making sure that all changes are captured and seamlessly propagated.
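
Serialized job definitions are easier to review when the JSON text is stable between runs. As an optional sketch that is not part of the utility, the following rewrites JSON with sorted keys and fixed indentation so that Git diffs show only real changes. Whether the utility's own output already has a stable key order is not stated in this post, and `normalize_json` is a hypothetical helper.

```python
import json


def normalize_json(text: str) -> str:
    """Re-serialize JSON text with sorted keys and two-space indentation."""
    return json.dumps(json.loads(text), indent=2, sort_keys=True) + "\n"


# Two serializations of the same content that differ only in key order.
first = '{"name": "job-a", "role": "r1"}'
second = '{"role": "r1", "name": "job-a"}'

print(normalize_json(first) == normalize_json(second))  # prints True
```

Sorting affects object keys only. The order of items inside JSON arrays is preserved, so a reordered list still shows up as a change.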

Conclusion

By combining the AWS Glue Visual Job API, AWS Glue Resource Sync Utility, and an AWS CDK based CI/CD pipeline, we’ve crafted a comprehensive solution for managing AWS Glue Studio visual jobs across different environments. This integrated approach offers several benefits:

  • Version control integration – Manage and track changes to your AWS Glue visual jobs using Git, enabling collaboration and change tracking
  • Streamlined development – Easily develop and test AWS Glue jobs using the Visual Editor in the development environment
  • Automated deployment – Use a CI/CD pipeline to automatically deploy serialized visual DAGs to the production environment
  • Environment consistency – Promote consistency across development and production environments by using the same job definitions
  • Visual representation preservation – Maintain the visual DAG representation when synchronizing jobs between environments

This solution empowers data engineers to focus on building robust data integration pipelines while automating the complexities of managing and deploying AWS Glue Studio visual jobs across multiple environments.

We encourage you to try this solution and adapt it to your needs. As always, we welcome your feedback and suggestions for further improvements.


About the Authors

Andrei Maksimov is an AWS Senior Cloud Infrastructure Architect specializing in cloud infrastructure, software development, and DevOps. He designs and implements scalable, secure, and efficient cloud solutions and helps customers optimize their cloud environments. Outside of work, Andrei enjoys participating in hackathons, contributing to open source projects, and exploring the latest advancements in AI. You can connect with him on LinkedIn.

David Zhang is an AWS Data Architect specializing in designing and implementing analytics infrastructure, data management, ETL, and extensive data systems. He helps customers modernize their AWS data platforms. David is also an active speaker at AWS conferences and a contributor to technical content and open source initiatives. He enjoys volleyball, tennis, and weightlifting in his free time. Feel free to connect with him on LinkedIn.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for designing AWS features, implementing software artifacts, and helping with customer architectures. In his spare time, he enjoys watching anime on Prime Video. You can connect with him on LinkedIn.

How Volkswagen Autoeuropa built a data mesh to accelerate digital transformation using Amazon DataZone

Post Syndicated from Dhrubajyoti Mukherjee original https://aws.amazon.com/blogs/big-data/how-volkswagen-autoeuropa-built-a-data-mesh-to-accelerate-digital-transformation-using-amazon-datazone/

This is a joint blog post co-authored with Martin Mikoleizig from Volkswagen Autoeuropa.

Volkswagen Autoeuropa is a Volkswagen Group plant that produces the T-Roc. The plant is located near Lisbon, Portugal, and produces about 934 cars per day. In 2023, Volkswagen Autoeuropa represented 1.3% of Portugal's national GDP and 4% of its national exports of goods, with a sales volume of 3.3511 billion euros. Volkswagen Autoeuropa aims to become a data-driven factory and has been using cutting-edge technologies to enhance digitalization efforts.

In this post, we discuss how Volkswagen Autoeuropa used Amazon DataZone to build a data marketplace based on data mesh architecture to accelerate their digital transformation. The data mesh, built on Amazon DataZone, simplified data access, improved data quality, and established governance at scale to power analytics, reporting, AI, and machine learning (ML) use cases. As a result, the data solution offers benefits such as faster access to data, expeditious decision making, accelerated time to value for use cases, and enhanced data governance.

Understanding Volkswagen Autoeuropa’s challenges

At the time of writing this post, Volkswagen Autoeuropa has already implemented more than 15 successful digital use cases in the context of real-time visualization, business intelligence, industrial computer vision, and AI.

Before the AWS partnership, Volkswagen Autoeuropa faced the following challenges.

  • Long lead time to access data – The digital use cases launched by Volkswagen Autoeuropa spent most of their project time getting access to the data that was relevant to their use cases. After the right data for the use case was found, the IT team provided access to the data through manual configuration. The lead time to access data was often from several days to weeks.
  • Insufficient data governance and auditing – Data was shared directly to use cases by copying it. Therefore, the IT team connected the data manually from their sources to the desired destinations multiple times. This process wasn’t centrally tracked, so no information about the data sharing process could be discovered: for example, whether the data had been copied in the past, how many use cases had access to the data, when access was granted, and who granted it.
  • Redundant effort to process the same information – Because the IT team copied the data sources based on the exact use case requirements, they shared specific columns of the tables from the data. As additional use cases requested access to the same data with different column requirements, even more copies of the data were created.
  • Repeated process to establish security and governance guardrails – Each time the IT and the security team provided a connection to a new data source, they had to set up the security and governance guardrails. This required repeated manual effort.
  • Data quality issues – Because the data was processed redundantly and shared multiple times, there was no guarantee of or control over the quality of the data. This led to reduced trust in the data.
  • Absence of data catalog and metadata management – Data didn’t have any metadata associated with it, and so use cases couldn’t consume the data without further explanation from the data source owners and specialists. Furthermore, no process to discover new data existed. Similar to the consumption process, use cases would consult specialists to understand the context of the data and if it could provide value.

Envisioning a data solution for Volkswagen Autoeuropa

To address these challenges, Volkswagen Autoeuropa embarked on a bold vision. They envisioned a seamless data consumption process, similar to an online shopping experience. They envisioned a data marketplace where data users could browse and access high-quality, secure data with clear specifications, business context, and relevant attributes. This vision materialized into a project aimed at transforming data accessibility and governance as the foundation for the digital ecosystem. The vision to be realized: Data as seamless as online shopping.

In collaboration with Amazon Web Services (AWS), Volkswagen Autoeuropa joined the Enhanced Plant Onboarding Program of the Global Volkswagen Group’s Digital Production Platform (DPP EPO) strategy. Through this partnership, AWS and Volkswagen Autoeuropa created a data marketplace that significantly improved data availability.

In the discovery phase of the project, Volkswagen Autoeuropa and AWS evaluated several options to build the data solution. In the end, Volkswagen Autoeuropa chose a solution based on data mesh architecture using Amazon DataZone. Being a managed service, Amazon DataZone provided the necessary speed and agility to build the solution. At the same time, it led to higher operational efficiencies and lower operational overhead. The team adopted a data mesh architecture because the principles of the data mesh aligned with Volkswagen Autoeuropa’s vision of being a data driven factory.

Solution overview

This section describes the key features and architecture of the Volkswagen Autoeuropa data solution. The solution is based on a data mesh architecture.

Data solution features

The following figure shows the key capabilities of the Volkswagen Autoeuropa data solution.

The key capabilities of the solution are:

  • Data quality – In the solution, we’ve built a data quality framework to streamline the process of data quality checks and publishing quality scores. It uses AWS Glue Data Quality to generate recommendation rulesets, run orchestrated jobs, store results, and send notifications to users. This framework can be seamlessly integrated into AWS Glue jobs, providing a quality score for data pipeline jobs. In addition, the quality score is published in the Amazon DataZone data portal, allowing consumers to subscribe to the data based on its quality score.
    Assigning a quality score to the data helps build trust in the data, and shifts the responsibility of maintaining data quality to the data owner. As a result, the quality of the results delivered by these use cases improves.
  • Data registration – The producers sign in to the Amazon DataZone data portal using their AWS Identity and Access Management (IAM) credentials or single sign-on with integration through AWS IAM Identity Center. They register their data assets, which are stored in Amazon Simple Storage Service (Amazon S3), in the Amazon DataZone data catalog. The metadata of the data assets is stored in an AWS Glue catalog and made available in the business data catalog of Amazon DataZone and in the Amazon DataZone data source. The producers add business context such as business unit name, data owner contact information, and data refresh frequency using Amazon DataZone glossaries and metadata forms. In addition, they use generative AI capabilities to generate business metadata. After the business metadata is generated, they review the changes and modify the metadata if needed.
    Because all data products in Volkswagen Autoeuropa are now registered in the same location, the likelihood of data duplication is significantly reduced. Moreover, the data producers are improving the quality of the data by adding business context to it.
  • Data discovery – The consumers sign in to the Amazon DataZone data portal using their IAM credentials or single sign-on with integration through IAM Identity Center and search the data using keywords in the search bar. After the results are returned, they can further filter the results using glossary terms and project names. Finally, they review the business metadata of the data assets to evaluate if the data is relevant to their business use cases. They can check the quality score of the data assets and the refresh schedule for their use cases.
    With a data discovery capability in place, consumers can gain information about the data without the need to consult the source system owners or specialists.
  • Data access management – When the consumers find a data asset that’s relevant to their use case, they request access to it using the subscription feature of Amazon DataZone. Data is classified as public, internal, or confidential. For public and internal data assets, the access request is automatically approved. For confidential data assets, the data producer team reviews the access request and either accepts or rejects the subscription request.
    With a central place to manage data access, data owners can view which use cases have access to their data and when the access request was granted. The fine-grained access control feature of Amazon DataZone gives data owners granular control of their data at the row and column levels.
  • Data consumption – Upon approval of the subscription request, Amazon DataZone provisions the backend infrastructure to make the data accessible to the corresponding consumers. After this process is complete, the consumers can access the data through Amazon Athena using the deep link feature of Amazon DataZone. The data consumption pattern in Volkswagen Autoeuropa supports two use cases:
    • Cloud-to-cloud consumption – Both data assets and consumer teams or applications are hosted in the cloud.
    • Cloud-to-on-premises consumption – Data assets are hosted in the cloud and consumer use cases or applications are hosted on-premises.

Each use case requires access only to the data assets relevant to it, and sharing data to use cases using Amazon DataZone doesn’t require creating multiple copies. As a result, duplication and redundant processing of data are reduced. Furthermore, by reducing the number of copies of the data, the overall quality of the data products improves. In addition, the backend automation of Amazon DataZone to make data available to use cases reduces the manual effort and improves the lead time to access data.

  • Single collaborative environment – The Amazon DataZone data portal provides a single collaborative environment to the users in Volkswagen Autoeuropa. Data consumers such as use case owners, data engineers, data scientists, and ML engineers can browse and request access to data assets. At the same time, data producers, such as use case owners and source system owners, can publish and curate their data in the Amazon DataZone data portal. This collaborative experience promotes teamwork and accelerates the realization of business value. Furthermore, the security and governance guardrails scale across the organization as the number of use cases increases.
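
The access-management rule described above (automatic approval for public and internal assets, producer review for confidential ones) can be summarized in a short sketch. It models the policy only and does not call Amazon DataZone. The names `Classification` and `route_subscription_request` are hypothetical.

```python
from enum import Enum


class Classification(Enum):
    """Data classes named in the post."""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"


def route_subscription_request(classification: Classification) -> str:
    """Return how a subscription request is handled for a given data class."""
    if classification in (Classification.PUBLIC, Classification.INTERNAL):
        return "auto-approved"
    # Confidential assets go to the data producer team, which accepts or rejects.
    return "producer-review"


for level in Classification:
    print(level.value, "->", route_subscription_request(level))
```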

Data solution architecture

The following figure displays the reference architecture of the data solution at Volkswagen Autoeuropa. In the next part of the post, we discuss how we arrived at the solution.

The architecture includes:

  1. The data from SAP applications, manufacturing execution systems (MES), and supervisory control and data acquisition (SCADA) systems is ingested into the producer accounts of Volkswagen Autoeuropa.
  2. In the producer account, raw data is transformed using AWS Glue. The technical metadata of the data is stored in AWS Glue catalog. The data quality is measured using the data quality framework. The data stored in Amazon Simple Storage Service (Amazon S3) is registered as an asset in the Amazon DataZone data catalog hosted in the central governance account.
  3. The central governance account hosts the Amazon DataZone domain and the related Amazon DataZone data portal. The AWS accounts of the data producers and consumers are associated with the Amazon DataZone domain. Amazon DataZone projects belonging to the data producers and consumers are created under the related Amazon DataZone domain units.
  4. Consumers of the data products sign in to the Amazon DataZone data portal hosted in the central governance account using their IAM credentials or single sign-on with integration through IAM Identity Center. They search, filter, and view asset information (for example, data quality, business, and technical metadata).
  5. After the consumer finds the asset they need, they request access to the asset using the subscription feature of Amazon DataZone. Based on the validity of the request, the asset owner approves or rejects the request.
  6. After the subscription request is granted and fulfilled, the asset is accessed in the consumer account for a one-time query using Athena and Microsoft Power BI applications hosted on premises. This consumption pattern can be extended for AI and machine learning (AI/ML) model development using Amazon SageMaker and reporting purposes using Amazon QuickSight.
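
To make the idea of a published quality score concrete, here is a minimal sketch that assumes the score is the percentage of data quality rules that passed in a run. The post does not describe how the framework computes its score, so treat the formula, the function name `quality_score`, and the rule names as illustrative assumptions.

```python
from typing import Dict


def quality_score(rule_outcomes: Dict[str, bool]) -> float:
    """Return the percentage of rules that passed (0.0 when no rules ran)."""
    if not rule_outcomes:
        return 0.0
    passed = sum(1 for ok in rule_outcomes.values() if ok)
    return 100.0 * passed / len(rule_outcomes)


# Hypothetical rule names and outcomes for one pipeline run.
outcomes = {
    "row_count_positive": True,
    "vehicle_id_is_complete": True,
    "timestamp_not_in_future": False,
    "station_id_in_reference_set": True,
}

print(f"{quality_score(outcomes):.1f}")  # prints 75.0
```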

User journey

After discussing the desired system with the use case teams and stakeholders and analyzing the current workflow, Volkswagen Autoeuropa grouped the user personas of the data solution into three main categories: data producer, data consumer, and data solution administrator. This sets the foundation for the desired user experience and what’s needed to achieve the solution goals.

Data producer

Data producers create the data products in the data solution. There are two types of data producers.

  • Data source owners – Data source owners publish the raw data in the Amazon DataZone data portal. These data products are attributed as source-based data.
  • Use case owners – Use case owners publish data that’s fit for consumption by other use cases. These data products are called consumer-based data.

The following figure shows the user journey of a data producer:

 

A data producer’s journey includes:

  1. Identify data of interest
    1. Identify data (Volkswagen Autoeuropa network).
    2. Perform data quality checks (Volkswagen Autoeuropa network).
  2. Connect data to the data solution
    1. Ingest data into the data solution (Amazon DataZone portal).
    2. Start process to connect data using AWS Glue.
  3. Locate the data source in the data solution
    1. Register data (Amazon DataZone portal).
    2. Add data to the inventory in Amazon DataZone.
  4. Add or edit metadata
    1. Add or edit metadata (Amazon DataZone portal).
    2. Publish data assets (Amazon DataZone portal).
  5. Approve or reject subscription request
    1. Review subscription requests.
  6. Maintain data assets
    1. Manage data assets (Amazon DataZone portal).

Data consumer

Data consumers use data for business analytics, machine learning, AI, and business reporting. Data consumers are data engineers, data scientists, ML engineers, and business users. The following diagram shows the journey of a data consumer.

A data consumer’s journey includes:

  1. Access Amazon DataZone portal
    1. Amazon DataZone portal – Access is granted based on the user’s assigned domain and projects.
  2. Search for data assets
    1. Data assets in Amazon DataZone portal – Search for data and browse the results by glossary terms or the project name. Use additional filters to refine the results.
  3. View business metadata
    1. Select a data asset to see additional information – Review the description, data quality score and metadata.
  4. Request access to data (subscribe)
    1. Subscribe to request access.
    2. After the subscription request is approved, review the data products that you have access to.
    3. Query the data to view and consume the data.
  5. Retrieve additional data
    1. Repeat the steps as needed to access and retrieve additional data.
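
The search-then-filter flow in the consumer journey can be pictured with a small in-memory model. This is only a sketch of the logic (keyword match, then optional glossary-term and project filters). It does not use the Amazon DataZone search API, and the `Asset` class, `search_assets` function, and sample assets are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Asset:
    """Hypothetical stand-in for a published data asset."""
    name: str
    description: str
    project: str
    glossary_terms: List[str] = field(default_factory=list)
    quality_score: float = 0.0


def search_assets(
    assets: List[Asset],
    keyword: str,
    glossary_term: Optional[str] = None,
    project: Optional[str] = None,
) -> List[Asset]:
    """Keyword search over name and description, then optional filters."""
    needle = keyword.lower()
    results = []
    for asset in assets:
        haystack = f"{asset.name} {asset.description}".lower()
        if needle not in haystack:
            continue
        if glossary_term is not None and glossary_term not in asset.glossary_terms:
            continue
        if project is not None and asset.project != project:
            continue
        results.append(asset)
    return results


# Sample catalog entries for illustration only.
catalog = [
    Asset("press-shop-cycle-times", "Cycle times per press line", "press-shop", ["production"], 92.0),
    Asset("paint-shop-energy", "Energy consumption of the paint shop", "paint-shop", ["energy"], 85.0),
    Asset("press-shop-energy", "Energy consumption per press line", "press-shop", ["energy"], 78.0),
]

for hit in search_assets(catalog, "energy", project="press-shop"):
    print(hit.name, hit.quality_score)
```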

Data solution administrator

Data solution administrators are responsible for performing administrative tasks on the data solution. The following figure shows the common tasks performed by the data solution administrator.

A data solution administrator’s journey includes:

  1. Manage projects
    1. Manage Amazon DataZone domain.
    2. Manage Amazon DataZone projects within the domain.
  2. Manage environment
    1. Set up the environment to manage the infrastructure.
  3. Manage business metadata glossary
    1. Manage and enable Amazon DataZone glossaries and metadata forms.
  4. Manage data assets
    1. Manage assets.
    2. Query the data to view and consume the data.
  5. Manage access to data solution
    1. Monitor and revoke access when appropriate.

Conclusion

In this post, we described how Volkswagen Autoeuropa embarked on a bold vision to become a data-driven factory and how it put this vision into action by building a data solution based on data mesh architecture using Amazon DataZone. We highlighted the key features and architecture of the data solution and presented the user journey. As of writing this post, Volkswagen Autoeuropa has reduced the data discovery time from days to minutes using the data solution. Before the Volkswagen Autoeuropa and AWS collaboration, getting access to data took several weeks. Now, with the help of the data solution, the data access time has been reduced to several minutes.

In May 2024, the team achieved a major milestone by successfully offering data on the data solution and transporting it instantly to Power BI, a process that previously took several weeks.

“After one year of work, we did the full roundtrip from offering data on our new data marketplace built using Amazon DataZone to transporting it instantly to third-party tools, a process that previously took several weeks. This was a big achievement for our team.”

– Jorge Paulino, Product owner of the data solution. Volkswagen Autoeuropa.

The next post of this two-part series discusses how we built the solution, its technical details, and the business value created.

If you want to harness the agility and scalability of a data mesh architecture and Amazon DataZone to accelerate innovation and drive business value for your organization, we have the resources to get you started. Be sure to check out the AWS Prescriptive Guidance: Strategies for building a data mesh-based enterprise solution on AWS. This comprehensive guide covers the key considerations and best practices for establishing a robust, well-governed data mesh on AWS. From aligning your data mesh with overall business strategy to scaling the data mesh across your organization, this Prescriptive Guidance provides a clear roadmap to help you succeed.

If you’re curious to get hands-on, see the GitHub repository: Building an enterprise Data Mesh with Amazon DataZone, AWS CDK, and AWS CloudFormation. This open source project delivers a step-by-step guide to build a data mesh architecture using Amazon DataZone, AWS Cloud Development Kit (AWS CDK), and AWS CloudFormation.


About the Authors

Dhrubajyoti Mukherjee is a Cloud Infrastructure Architect with a strong focus on data strategy, data analytics, and data governance at Amazon Web Services (AWS). He uses his deep expertise to provide guidance to global enterprise customers across industries, helping them build scalable and secure AWS solutions that drive meaningful business outcomes. Dhrubajyoti is passionate about creating innovative, customer-centric solutions that enable digital transformation, business agility, and performance improvement. An active contributor to the AWS community, Dhrubajyoti authors AWS Prescriptive Guidance publications, blog posts, and open-source artifacts, sharing his insights and best practices with the broader community. Outside of work, Dhrubajyoti enjoys spending quality time with his family and exploring nature through his love of hiking mountains.

Ravi Kumar is a Data Architect and Analytics expert at Amazon Web Services; he finds immense fulfillment in working with data. His days are dedicated to designing and analyzing complex data systems, uncovering valuable insights that drive business decisions. Outside of work, he unwinds by listening to music and watching movies, activities that allow him to recharge after a long day of data wrangling.

Martin Mikoleizig studied mechanical engineering and production technology at RWTH Aachen University before starting work at Dr. h.c. Ing. F. Porsche AG in 2015 as a production planner for engine assembly. During several years as a Project Manager for Testing Technology for new engine models, he also introduced several innovations such as human-machine collaboration and intelligent assistance systems. From 2017, he was responsible for the Shopfloor IT team of the module lines in Zuffenhausen before he became responsible for the planning of the E-Drive assembly at Porsche. Besides this, he was responsible for the Digitalisation Strategy of the Production Ressort at Porsche. Since October 2022, he has been assigned to Volkswagen Autoeuropa in Portugal as Digital Transformation Manager for the plant, driving the digital transformation towards a data-driven factory.

Weizhou Sun is a Lead Architect at Amazon Web Services, specializing in digital manufacturing solutions and IoT. With extensive experience in Europe, she has enhanced operational efficiencies, reducing latency and increasing throughput. Weizhou’s expertise includes Industrial Computer Vision, predictive maintenance, and predictive quality, consistently delivering top performance and client satisfaction. A recognized thought leader in IoT and remote driving, she has contributed to business growth through innovations and open-source work. Committed to knowledge sharing, Weizhou mentors colleagues and contributes to practice development. Known for her problem-solving skills and customer focus, she delivers solutions that exceed expectations. In her free time, Weizhou explores new technologies and fosters a collaborative culture.

Shameka Almond is an Advisory Consultant at Amazon Web Services. She works closely with enterprise customers to help them better understand the business impact and value of implementing data solutions, including data governance best practices. Shameka has over a decade of wide-ranging IT experience in the manufacturing and aerospace industries, and the nonprofit sector. She has supported several data governance initiatives, helping both public and private organizations identify opportunities for improvement and increased efficiency. Outside of the office she enjoys hosting large family gatherings, and supporting community outreach events dedicated to introducing students in K-12 to STEM.

Adjoa Taylor has over 20 years of experience in industrial manufacturing, providing industry and technology consulting services, digital transformation, and solution delivery. Currently, Adjoa leads Product Centric Digital Transformation, enabling customers to solve complex manufacturing problems by leveraging Smart Factory and industry-leading transformation mechanisms. Most recently, she has been driving value with AI/ML and generative AI use cases for the plant floor. Adjoa is an experienced leader who has spent over 20 years of her career delivering projects in countries throughout North America, Latin America, Europe, and Asia. Through prior roles, Adjoa brings deep experience across multiple business segments with a focus on business outcome driven solutions. Adjoa is passionate about helping customers solve problems while realizing the art of the possible through the right value-based solution.

Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg

Post Syndicated from Shaheer Mansoor original https://aws.amazon.com/blogs/big-data/modernize-your-legacy-databases-with-aws-data-lakes-part-2-build-a-data-lake-using-aws-dms-data-on-apache-iceberg/

This is part two of a three-part series where we show how to build a data lake on AWS using a modern data architecture. This post shows how to load data from a legacy database (SQL Server) into a transactional data lake (Apache Iceberg) using AWS Glue. We show how to build data pipelines using AWS Glue jobs, optimize them for both cost and performance, and implement schema evolution to automate manual tasks. To review the first part of the series, where we load SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), see Modernize your legacy databases with AWS data lakes, Part 1: Migrate SQL Server using AWS DMS.

Solution overview

In this post, we go over the process of building a data lake, providing the rationale behind the different decisions, and share best practices when building such a solution.

The following diagram illustrates the different layers of the data lake.

Overall Architecture

To load data into the data lake, AWS Step Functions can define a workflow, Amazon Simple Queue Service (Amazon SQS) can track the order of incoming files, and AWS Glue jobs and the Data Catalog can be used to create the data lake silver layer. AWS DMS produces files and writes these files to the bronze bucket (as we explained in Part 1).

We can turn on Amazon S3 notifications and push the new arriving file names to an SQS first-in-first-out (FIFO) queue. A Step Functions state machine can consume messages from this queue to process the files in the order they arrive.

For processing the files, we need to create two types of AWS Glue jobs:

  • Full load – This job loads the entire table data dump into an Iceberg table. Data types from the source are mapped to an Iceberg data type. After the data is loaded, the job updates the Data Catalog with the table schemas.
  • CDC – This job loads the change data capture (CDC) files into the respective Iceberg tables. The AWS Glue job implements the schema evolution feature of Iceberg to handle schema changes such as addition or deletion of columns.
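The data type mapping mentioned for the full load job can be sketched as a small lookup. This is a minimal illustration, not the mapping the jobs in this post use: the helper name `to_iceberg_type` and the mapping table are assumptions, and the table is deliberately incomplete.

```python
# Hypothetical, incomplete mapping from SQL Server type names to Iceberg/Spark SQL types.
# Extend it for the types that actually occur in your source schema.
MSSQL_TO_ICEBERG = {
    "varchar": "string",
    "nvarchar": "string",
    "char": "string",
    "nchar": "string",
    "bigint": "long",
    "int": "int",
    "bit": "boolean",
    "datetime": "timestamp",
}


def to_iceberg_type(mssql_type: str) -> str:
    """Return the mapped type, or the lowercased input when no mapping is known."""
    normalized = mssql_type.strip().lower()
    return MSSQL_TO_ICEBERG.get(normalized, normalized)


print(to_iceberg_type("VARCHAR"))  # mapped type
print(to_iceberg_type("date"))     # passed through unchanged
```

Passing unknown types through unchanged keeps the sketch short; in a real job you would probably want to fail loudly on an unmapped type instead.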

As in Part 1, the AWS DMS jobs will place the full load and CDC data from the source database (SQL Server) in the raw S3 bucket. Now we process this data using AWS Glue and save it to the silver bucket in Iceberg format. AWS Glue has a plugin for Iceberg; for details, see Using the Iceberg framework in AWS Glue.

Along with moving data from the bronze to the silver bucket, we also create and update the Data Catalog for further processing the data for the gold bucket.

The following diagram illustrates how the full load and CDC jobs are defined inside the Step Functions workflow.

Step Functions for loading data into the lake

In this post, we focus on the AWS Glue jobs that the workflow runs. To define the workflow itself, we recommend using AWS Step Functions Workflow Studio, and setting up Amazon S3 event notifications and an SQS FIFO queue to receive the file names as messages.

Prerequisites

To follow the solution, you need the following prerequisites set up as well as certain access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM role to run Glue jobs
  • IAM privileges to create AWS DMS resources (this role was created in Part 1 of this series; you can use the same role here)
  • The AWS DMS job from Part 1 working and producing files for the source database on Amazon S3.

Create an AWS Glue connection for the source database

We need to create a connection between AWS Glue and the source SQL Server database so the AWS Glue job can query the source for the latest schema while loading the data files. To create the connection, follow these steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create custom connector.
  3. Give the connection a name and choose JDBC as the connection type.
  4. In the JDBC URL section, enter the following string, replacing the placeholders with the source database endpoint and database name that you set up in Part 1: jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}.
  5. Select Require SSL connection, then choose Create connector.

Glue Connections
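If you prefer to script the connection instead of using the console, the input can be assembled as follows. This is a sketch under assumptions: the helper name, the connection name, and the host are hypothetical, and network settings (subnet, security groups) that a real connection usually needs are omitted. The URL uses the same `;databaseName=` form that the job scripts in this post build.

```python
def build_sqlserver_connection_input(name, host, database, secret_id, port=1433):
    """Assemble a ConnectionInput dictionary for a JDBC connection to SQL Server."""
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)
    return {
        "Name": name,
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": jdbc_url,
            "JDBC_ENFORCE_SSL": "true",  # corresponds to Require SSL connection
            "SECRET_ID": secret_id,      # credentials stay in AWS Secrets Manager
        },
    }


# Hypothetical values for illustration only
connection_input = build_sqlserver_connection_input(
    name="sqlserver-source",
    host="my-db.example.com",
    database="AdventureWorks",
    secret_id="my-sqlserver-secret",
)
print(connection_input["ConnectionProperties"]["JDBC_CONNECTION_URL"])
```

You could then pass the result to boto3.client("glue").create_connection(ConnectionInput=connection_input), after adding the network settings for your VPC.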

Create and configure the full load AWS Glue job

Complete the following steps to create the full load job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Script editor and select Spark.
  3. Choose Start fresh and select Create script.
  4. Enter a name for the full load job and choose the IAM role (mentioned in the prerequisites) for running the job.
  5. Finish creating the job.
  6. On the Job details tab, expand Advanced properties.
  7. In the Connections section, add the connection you created.
  8. Under Job parameters, pass the following arguments to the job:
    1. target_s3_bucket – The silver S3 bucket name.
    2. source_s3_bucket – The raw S3 bucket name.
    3. secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
    4. dbname – The source database name.
    5. datalake-formats – This sets the data format to iceberg.

Glue Job Parameters
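When a job is started programmatically, job parameters are passed as a dictionary whose keys carry a leading double hyphen. The following sketch builds that dictionary for the parameters listed above; the helper name and the sample values are assumptions.

```python
def build_job_arguments(target_s3_bucket, source_s3_bucket, secret_id, dbname):
    """Build the Arguments dictionary for an AWS Glue job run."""
    return {
        "--target_s3_bucket": target_s3_bucket,
        "--source_s3_bucket": source_s3_bucket,
        "--secret_id": secret_id,
        "--dbname": dbname,
        "--datalake-formats": "iceberg",  # enables the Iceberg framework in AWS Glue
    }


# Hypothetical bucket and secret names
arguments = build_job_arguments(
    target_s3_bucket="my-silver-bucket",
    source_s3_bucket="my-raw-bucket",
    secret_id="my-sqlserver-secret",
    dbname="AdventureWorks",
)
print(sorted(arguments))
```

The dictionary can be passed as the Arguments parameter of boto3.client("glue").start_job_run, together with the JobName of the job you created.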

The full load AWS Glue job starts after the AWS DMS task reaches 100%. The job loops over the files located in the raw S3 bucket and processes them one at a time. For each file, the job infers the table name from the file name and gets the source table schema, including column names and primary keys.

If the table has one or more primary keys, the job creates an equivalent Iceberg table. If the table has no primary key, the file is not processed. In our use case, all the tables have primary keys, so we enforce this check. Depending on your data, you might need to handle this scenario differently.
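The table name inference described above can be sketched in isolation. The key layout raw/<database>/<schema>/<table>/<file> is an assumption based on the prefix the full load script lists, and the helper name is hypothetical.

```python
def table_name_from_key(key: str) -> str:
    """Infer the lowercased table name from an S3 object key written by AWS DMS.

    Assumed layout: raw/<database>/<schema>/<table>/<file>.csv
    """
    parts = key.split("/")
    if len(parts) < 5:
        raise ValueError("Unexpected key layout: {0}".format(key))
    return parts[3].lower()


print(table_name_from_key("raw/AdventureWorks/HumanResources/Employee/LOAD00000001.csv"))
```

Because the name is lowercased, any comparison against table names read from the source database has to be case-insensitive as well.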

You can use the following code to process the full load files. To start the job, choose Run.

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

#Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket'])
dbname = "AdventureWorks"
schema = "HumanResources"

#Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

#Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

#Helper Function: Load Iceberg table with Primary key(s)
def load_table(full_load_data_df, dbname, table_name):

    try:
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')

        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION "s3://{2}/{0}/{1}"
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)
        spark.sql(query)
        
        #Update Table property to accept Schema Changes
        spark.sql("""ALTER TABLE glue_catalog.{0}.{1} SET TBLPROPERTIES (
                      'write.spark.accept-any-schema'='true'
                    )""".format(dbname, table_name))
        
    except Exception as ex:
        print(ex)
        failed_table = {"table_name": table_name, "Reason": ex}
        unprocessed_tables.append(failed_table)
        
def get_table_key(host, port, username, password, dbname):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    df_table_pkeys = spark.sql("select c.TABLE_NAME, C.COLUMN_NAME as primary_key FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY'")
    return df_table_pkeys


#Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)


#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Initialize primary keys for all tables
df_table_pkeys = get_table_key(host, port, username, password, dbname)

#Read Full load csv files from s3
s3 = boto3.client('s3')
# dbname and schema are set at the top of the script; they are not among the resolved job arguments
full_load_tables = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix="raw/{0}/{1}".format(dbname, schema))

#Loop over files
for item in full_load_tables['Contents']:
    pkey_list = []
    table_name = item["Key"].split("/")[3].lower()
    print("Table name {0}".format(table_name))
    # The table name was lowercased above, so compare case-insensitively
    current_table_df = df_table_pkeys.where("lower(TABLE_NAME) = '{0}'".format(table_name))

    # Only process tables with at least one primary key
    if current_table_df.isEmpty():
        failed_table = {"table_name": table_name, "Reason": "No primary key"}
        unprocessed_tables.append(failed_table)
        # ToDo Handle these cases
        continue

    for i in current_table_df.collect():
        pkey_list.append(i["primary_key"])

    full_data_path = "s3://{0}/{1}".format(source_s3_bucket, item['Key'])
    full_load_data_df = (spark
                        .read
                        .option("header", True)
                        .option("inferSchema", True)
                        .option("recursiveFileLookup", "true")
                        .csv(full_data_path)
                        )

    primary_key = ",".join(pkey_list)

    load_table(full_load_data_df, dbname, table_name)

When the job is complete, it creates the database and tables in the Data Catalog, as shown in the following screenshot.

Data lake silver layer data

Create and configure the CDC AWS Glue job

The CDC AWS Glue job is created similarly to the full load job. As with the full load AWS Glue job, you need to use the source database connection and pass the job parameters with one additional parameter, cdc_file, which contains the location of the CDC file to be processed. Because a CDC file can contain data for multiple tables, the job loops over the tables in a file and loads the table metadata from the source table (RDS column names).

If the CDC operation is DELETE, the job deletes the records from the Iceberg table. If the CDC operation is INSERT or UPDATE, the job merges the data into the Iceberg table.
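The join condition for these MERGE statements is derived from the primary key columns of the table. The following sketch shows one way to build it for single and composite keys; the function names are hypothetical, and the temporary view name deleted_rows matches the one used in the job script.

```python
def build_merge_condition(primary_keys):
    """Join one equality clause per primary key column with 'and'."""
    if not primary_keys:
        raise ValueError("At least one primary key column is required")
    return " and ".join("target.{0} = source.{0}".format(key) for key in primary_keys)


def build_delete_merge(database, table, primary_keys):
    """Build the MERGE statement that removes rows listed in the deleted_rows view."""
    return (
        "MERGE INTO glue_catalog.{0}.{1} target "
        "USING (SELECT * FROM deleted_rows) source "
        "ON {2} "
        "WHEN MATCHED THEN DELETE"
    ).format(database, table, build_merge_condition(primary_keys))


print(build_delete_merge("adventureworks", "employeepayhistory",
                         ["BusinessEntityID", "RateChangeDate"]))
```

Building the condition from a list and joining it avoids string slicing and keeps the separator between clauses correct for composite keys.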

You can use the following code to process the CDC files. To start the job, choose Run.

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'cdc_file'])
dbname = "AdventureWorks"
schema = "HumanResources"
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
cdc_file = args['cdc_file']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns
source_s3_cdc_file_key = "raw/AdventureWorks/cdc/" + cdc_file



# Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper Function: Column names from RDS
def get_table_colums(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    columns = list((row.COLUMN_NAME) for (index, row) in spark.sql("select TABLE_NAME, TABLE_CATALOG, COLUMN_NAME from TABLE_COLUMNS where TABLE_NAME = '{0}' and TABLE_CATALOG = '{1}'".format(table, dbname)).select("COLUMN_NAME").toPandas().iterrows())
    return columns

# Helper Function: Get column names and datatypes from RDS
def get_table_colum_datatypes(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    return spark.sql("select TABLE_NAME, COLUMN_NAME, DATA_TYPE from TABLE_COLUMNS WHERE TABLE_NAME ='{0}'".format(table))

# Helper Function: Setup the primary key condition
def get_iceberg_table_condition(database, tablename):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    
    # Look up the primary key columns of the requested table (case-insensitive match on the table name)
    key_rows = spark.sql("select C.COLUMN_NAME FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY' AND lower(C.TABLE_NAME) = '{0}'".format(tablename.lower())).collect()
    
    # Build one equality clause per key column and join them, so composite keys are handled correctly
    return " and ".join("target.{0} = source.{0}".format(key.COLUMN_NAME) for key in key_rows)

    
# Read incoming data from Amazon S3
def read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key):
    
    inputDf = (spark
                    .read
                    .option("header", False)
                    .option("inferSchema", True)
                    .option("recursiveFileLookup", "true")
                    .csv("s3://" + source_s3_bucket + "/" + source_s3_cdc_file_key)
                    )
    return inputDf

# Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Read the cdc file 
cdc_df = read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key)

tables = cdc_df.toPandas()._c1.unique().tolist()

#Loop over tables in the cdc file
for table in tables:
    #Create dataframes for deletes and for inserts and updates
    table_df_deletes = cdc_df.where((cdc_df._c1 == table) & (cdc_df._c0 == "D")).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    table_df_upserts = cdc_df.where((cdc_df._c1 == table) & ((cdc_df._c0 == "I") | (cdc_df._c0 == "U"))).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    
    #Update column names for the dataframes
    columns = get_table_colums(table, host, port, username, password, dbname) 
    selectExpr = [] 

    for column in columns: 
        selectExpr.append(cdc_df.where((cdc_df._c1 == table)).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3]).columns[columns.index(column)] + " as " + column)

    table_df_deletes = table_df_deletes.selectExpr(selectExpr) 
    table_df_upserts = table_df_upserts.selectExpr(selectExpr)
    
    #Process Deletes
    if table_df_deletes.count() > 0:
        
        print("Delete Triggered")
        table_df_deletes.createOrReplaceTempView('deleted_rows')
        
        # Use dbname here; no variable named database is defined at this point in the script
        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM deleted_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN DELETE""".format(dbname.lower(), table.lower(), get_iceberg_table_condition(dbname, table.lower()))
        spark.sql(sql_string)
    
    if table_df_upserts.count() > 0:
        print("Upsert triggered")

        #Upsert Records when there are Schema Changes
        if len(table_df_upserts.columns) != len(columns):

            #Handle column deletes
            if len(table_df_upserts.columns) < len(columns):

                drop_columns = list(set(columns) - set(table_df_upserts.columns))

                for drop_column in drop_columns:
                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    DROP COLUMN {2}""".format(dbname.lower(), table.lower(), drop_column)
                    spark.sql(sql_string)

            #Handle column additions
            elif len(table_df_upserts.columns) > len(columns):

                column_datatype_df = get_table_colum_datatypes(table, host, port, username, password, dbname)
                add_columns = list(set(table_df_upserts.columns) - set(columns))

                for add_column in add_columns:

                    #Set Iceberg data type
                    data_type = list((row.DATA_TYPE) for (index, row) in column_datatype_df.filter("COLUMN_NAME='{0}'".format(add_column)).select("DATA_TYPE").toPandas().iterrows())[0]

                    # Convert MSSQL Datatypes to Iceberg supported datatypes
                    if data_type.lower() in ["varchar", "char"]:
                        data_type = "string"

                    if data_type.lower() in ["bigint"]:
                        data_type = "long"

                    if data_type.lower() in ["array"]:
                        data_type = "list"

                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    ADD COLUMN {2} {3}""".format(dbname.lower(), table.lower(), add_column, data_type)
                    spark.sql(sql_string)
                    
        #Create statement to update columns
        #This block runs for every batch of upserts, whether or not the schema changed
        update_table_column_list = ""
        insert_column_list = ""
        columns = get_table_colums(table, host, port, username, password, dbname)

        for column in columns:

            update_table_column_list+="""target.{0}=source.{0},""".format(column)
            insert_column_list+="""source.{0},""".format(column)

        table_df_upserts.createOrReplaceTempView('updated_rows')

        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM updated_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN UPDATE SET {3} 
                        WHEN NOT MATCHED THEN INSERT ({4}) VALUES ({5})""".format(dbname.lower(), 
                                                                                  table.lower(), 
                                                                                  get_iceberg_table_condition(dbname.lower(), table.lower()), 
                                                                                  update_table_column_list.rstrip(","), 
                                                                                  ",".join(columns), 
                                                                                  insert_column_list.rstrip(","))

        spark.sql(sql_string)

    
print("CDC job complete")

The Iceberg MERGE INTO syntax can handle cases where a new column is added. For more details on this feature, see the Iceberg MERGE INTO syntax documentation. If the CDC job needs to process many tables in the CDC file, the job can be multi-threaded to process the file in parallel.
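The multi-threading idea can be sketched with a thread pool from the Python standard library. This is an illustration under assumptions, not part of the job above: process_table stands in for the per-table logic of the CDC loop, and the helper name is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_tables_in_parallel(tables, process_table, max_workers=4):
    """Run process_table for each table on a thread pool and collect results and failures."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_table, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                results[table] = future.result()
            except Exception as ex:
                # Record the failure and keep processing the remaining tables
                failures[table] = str(ex)
    return results, failures


# Stand-in for the real per-table work
def fake_process(table):
    if table == "bad":
        raise RuntimeError("merge failed")
    return table.upper()


print(process_tables_in_parallel(["employee", "bad", "shift"], fake_process))
```

If you adapt the CDC job this way, note that it registers temporary views under fixed names (deleted_rows and updated_rows); with concurrent threads, those names would need to be unique per table to avoid one thread overwriting another's view.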

 

Configure EventBridge notifications, SQS queue, and Step Functions state machine

You can configure Amazon S3 to send notifications to Amazon EventBridge when certain events occur on S3 buckets, such as when objects are created or deleted. For this post, we’re interested in the events when new CDC files from AWS DMS arrive in the bronze S3 bucket. You can create an EventBridge rule for new objects and insert the file names into an SQS queue. A Lambda function within Step Functions would consume from the queue, extract the file name, start a CDC Glue job, and pass the file name as a parameter to the job.

AWS DMS CDC files contain database insert, update, and delete statements. We need to process these in order, so we use an SQS FIFO queue, which preserves the order in which messages arrive. You can also configure the message retention period in Amazon SQS, which this post refers to as the time to live (TTL); this parameter defines how long a message stays in the queue before it expires.
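To make the ordering requirement concrete, the following sketch turns an Amazon S3 "Object Created" event, as delivered through EventBridge, into the parameters for an SQS FIFO message. The helper name, the group ID value, and the queue URL are assumptions; a single message group is used so that all files are delivered in strict order.

```python
import hashlib
import json


def build_fifo_message(event, queue_url, group_id="cdc-files"):
    """Build send_message parameters for one newly created CDC file."""
    detail = event["detail"]
    bucket = detail["bucket"]["name"]
    key = detail["object"]["key"]
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps({"bucket": bucket, "key": key}),
        # One group for all files, so FIFO ordering applies across the whole stream
        "MessageGroupId": group_id,
        # Derived from the object key, so a repeated event for the same file is deduplicated
        "MessageDeduplicationId": hashlib.sha256(key.encode("utf-8")).hexdigest(),
    }


# Hypothetical event and queue URL
sample_event = {
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "my-raw-bucket"},
        "object": {"key": "raw/AdventureWorks/cdc/20240101-000000000.csv"},
    },
}
params = build_fifo_message(
    sample_event, "https://sqs.us-east-1.amazonaws.com/123456789012/cdc-files.fifo"
)
print(params["MessageBody"])
```

The resulting dictionary can be passed to boto3.client("sqs").send_message(**params). Using one message group trades throughput for ordering, which fits this pipeline because CDC files must be applied sequentially.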

Another important parameter to consider when configuring an SQS queue is the message visibility timeout value. While a message is being processed, it’s hidden from other consumers to make sure that it isn’t consumed by multiple consumers (AWS Glue jobs in our case). If the message is consumed successfully, it should be deleted from the queue before the visibility timeout. However, if the visibility timeout expires and the message isn’t deleted, the message becomes visible in the queue again. In our solution, this timeout must be greater than the time it takes for the CDC job to process a file.
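The two queue settings discussed here can be derived from how long a CDC job is expected to run. The following is a minimal sketch: the helper name and the safety factor are assumptions, and the bounds reflect the Amazon SQS limits of 12 hours for the visibility timeout and 14 days for message retention.

```python
def build_fifo_queue_attributes(max_job_seconds, retention_days=14, safety_factor=2.0):
    """Return SQS queue attributes sized for the slowest expected CDC job run."""
    visibility_timeout = int(max_job_seconds * safety_factor)
    if not 0 <= visibility_timeout <= 43200:
        raise ValueError("Visibility timeout must be between 0 and 43,200 seconds")
    retention_seconds = retention_days * 86400
    if not 60 <= retention_seconds <= 1209600:
        raise ValueError("Retention must be between 60 seconds and 14 days")
    return {
        "FifoQueue": "true",
        "VisibilityTimeout": str(visibility_timeout),
        "MessageRetentionPeriod": str(retention_seconds),
    }


# Example: the slowest CDC job run is assumed to take 15 minutes
print(build_fifo_queue_attributes(max_job_seconds=900))
```

The dictionary matches the Attributes parameter of boto3.client("sqs").create_queue; remember that FIFO queue names must end in .fifo.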

Lastly, we recommend using Step Functions to define a workflow for handling the full load and CDC files. Step Functions has built-in integrations to other AWS services like Amazon SQS, AWS Glue, and Lambda, which makes it a good candidate for this use case.

The Step Functions state machine starts with checking the status of the AWS DMS task. The AWS DMS tasks can be queried to check the status of the full load, and we check the value of the parameter FullLoadProgressPercent. When this value gets to 100%, we can start processing the full load files. After the AWS Glue job processes the full load files, we start polling the SQS queue to check the size of the queue. If the queue size is greater than 0, this means new CDC files have arrived and we can start the AWS Glue CDC job to process these files. The AWS Glue job processes the CDC files and deletes the messages from the queue. When the queue size reaches 0, the AWS Glue job exits and we loop in the Step Functions workflow to check the SQS queue size.
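The branching described above can be written down as a small decision function, for example inside a Lambda task of the state machine. The function and the step names are hypothetical; the inputs are assumed to come from the AWS DMS task statistics (FullLoadProgressPercent) and from the approximate number of messages reported by the SQS queue.

```python
def next_step(full_load_progress_percent, full_load_processed, queue_size):
    """Decide what the workflow should do next."""
    if full_load_progress_percent < 100:
        return "WAIT_FOR_FULL_LOAD"      # AWS DMS is still writing full load files
    if not full_load_processed:
        return "RUN_FULL_LOAD_JOB"       # full load files are complete but not yet loaded
    if queue_size > 0:
        return "RUN_CDC_JOB"             # new CDC files are waiting in the queue
    return "WAIT_AND_POLL_QUEUE"         # nothing to do; check the queue again later


print(next_step(80, False, 0))
print(next_step(100, False, 3))
print(next_step(100, True, 3))
print(next_step(100, True, 0))
```

Keeping the decision in one pure function makes the loop easy to test before wiring it into Choice and Wait states.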

Because the Step Functions state machine is supposed to run indefinitely, keep in mind the service limits you need to adhere to: the maximum runtime, which is 1 year, and the maximum run history size, which is 25,000 events (state transitions) for a single state machine run. We recommend adding a step at the end that checks whether either limit is about to be reached and, if so, stops the current state machine run and starts a new one.

The following diagram illustrates how you can use Step Functions state machine history size to monitor and start a new Step Functions state machine run.

Step Functions Workflow
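The restart check can be sketched as follows. The function name and the headroom fraction are assumptions; the two limits are the ones stated above. How you obtain the current event count depends on your workflow, for example by keeping a counter in the state input.

```python
from datetime import datetime, timedelta, timezone

MAX_HISTORY_EVENTS = 25_000
MAX_RUNTIME = timedelta(days=365)


def should_start_new_execution(event_count, started_at, now, headroom=0.8):
    """Return True when the run has used up the given fraction of either limit."""
    too_many_events = event_count >= MAX_HISTORY_EVENTS * headroom
    too_old = (now - started_at) >= MAX_RUNTIME * headroom
    return too_many_events or too_old


started = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(should_start_new_execution(21_000, started, started + timedelta(days=1)))
print(should_start_new_execution(100, started, started + timedelta(days=1)))
```

Restarting at a fraction of the limit, rather than at the limit itself, leaves room for the events generated by the handover step.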

Configure the pipeline

The pipeline needs to be configured to address cost, performance, and resilience goals. You might want a pipeline that can load fresh data into the data lake and make it available quickly, and you might also want to optimize costs by loading large chunks of data into the data lake. At the same time, you should make the pipeline resilient and be able to recover in case of failures. In this section, we cover the different parameters and recommended settings to achieve these goals.

Step Functions is designed to process incoming AWS DMS CDC files by running AWS Glue jobs. AWS Glue jobs can take a couple of minutes to boot up, and when they’re running, it’s efficient to process large chunks of data. You can configure AWS DMS to write CSV files to Amazon S3 by configuring the following AWS DMS task parameters:

  • CdcMaxBatchInterval – Defines the maximum time limit AWS DMS will wait before writing a batch to Amazon S3
  • CdcMinFileSize – Defines the minimum file size AWS DMS will write to Amazon S3

Whichever condition is met first will invoke the write operation. If you want to prioritize data freshness, you should have a short CdcMaxBatchInterval value (10 seconds) and a small CdcMinFileSize value (1–5 MB). This will result in many small CSV files being written to Amazon S3 and will invoke a lot of AWS Glue jobs to process the data, making the extract, transform, and load (ETL) process faster. If you want to optimize costs, you should have a moderate CdcMaxBatchInterval (minutes) and a large CdcMinFileSize value (100–500 MB). In this scenario, we start a few AWS Glue jobs that will process large chunks of data, making the ETL flow more efficient. In a real-world use case, the required values for these parameters might fall somewhere that’s a good compromise between throughput and cost. You can configure these parameters when creating a target endpoint using the AWS DMS console, or by using the create-endpoint command in the AWS Command Line Interface (AWS CLI).

For the full list of parameters, see Using Amazon S3 as a target for AWS Database Migration Service.
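The trade-off between freshness and cost can be captured as two named profiles. This is a sketch under assumptions: the profile names and the concrete values are illustrative picks from the ranges discussed above, and the units (seconds for CdcMaxBatchInterval, kilobytes for CdcMinFileSize) follow our reading of the AWS DMS documentation, so verify them for your AWS DMS version.

```python
# Illustrative profiles; values are examples, not recommendations
CDC_PROFILES = {
    # Short interval and small files: fresher data, more AWS Glue job runs
    "freshness": {"CdcMaxBatchInterval": 10, "CdcMinFileSize": 1_000},
    # Longer interval and large files: fewer, larger AWS Glue job runs
    "cost": {"CdcMaxBatchInterval": 300, "CdcMinFileSize": 100_000},
}


def build_cdc_s3_settings(profile, bucket_name):
    """Return a partial S3 settings dictionary for an AWS DMS target endpoint."""
    if profile not in CDC_PROFILES:
        raise ValueError("Unknown profile: {0}".format(profile))
    settings = {"BucketName": bucket_name, "DataFormat": "csv"}
    settings.update(CDC_PROFILES[profile])
    return settings


print(build_cdc_s3_settings("freshness", "my-raw-bucket"))
```

A dictionary like this can be merged into the S3 settings you pass when creating or modifying the target endpoint; other required settings, such as the service access role, are omitted here.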

Choosing the right AWS Glue worker types for the full load and CDC jobs is also crucial for performance and cost optimization. The AWS Glue (Spark) workers range from G.1X to G.8X, which have an increasing number of data processing units (DPUs). Full load files are usually much larger than CDC files, and therefore it’s more cost- and performance-effective to select a larger worker. For CDC files, it would be more cost-effective to select a smaller worker because file sizes are smaller.

You should design the Step Functions state machine in such a way that if anything fails, the pipeline can be redeployed after repair and resume processing from where it left off. One important parameter here is the TTL (message retention period) for the messages in the SQS queue. This parameter defines how long a message stays in the queue before expiring. In case of failures, we want this parameter to be long enough for us to deploy a fix. Amazon SQS has a maximum of 14 days for a message’s TTL. We recommend setting this to a large enough value that messages don’t expire before the pipeline is repaired.

Clean up

Complete the following steps to clean up the resources you created in this post:

  1. Delete the AWS Glue jobs:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Select the full load and CDC jobs and on the Actions menu, choose Delete.
    3. Choose Delete to confirm.
  2. Delete the Iceberg tables:
    1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
    2. Choose the database in which the Iceberg tables reside.
    3. Select the tables to delete, choose Delete, and confirm the deletion.
  3. Delete the S3 bucket:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Choose the silver bucket and empty the files in the bucket.
    3. Delete the bucket.

Conclusion

In this post, we showed how to use AWS Glue jobs to load AWS DMS files into a transactional data lake framework such as Iceberg. In our setup, AWS Glue provided highly scalable and simple-to-maintain ETL jobs. Furthermore, we shared a proposed solution using Step Functions to create an ETL pipeline workflow, with Amazon S3 notifications and an SQS queue to capture newly arriving files. We also showed how to design this system to be resilient to failures and to automate one of the most time-consuming tasks in maintaining a data lake: schema evolution.

In Part 3, we will share how to process the data lake to create data marts.


About the Authors

Shaheer Mansoor is a Senior Machine Learning Engineer at AWS, where he specializes in developing cutting-edge machine learning platforms. His expertise lies in creating scalable infrastructure to support advanced AI solutions. His focus areas are MLOps, feature stores, data lakes, model hosting, and generative AI.

Anoop Kumar K M is a Data Architect at AWS with focus in the data and analytics area. He helps customers in building scalable data platforms and in their enterprise data strategy. His areas of interest are data platforms, data analytics, security, file systems and operating systems. Anoop loves to travel and enjoys reading books in the crime fiction and financial domains.

Sreenivas Nettem is a Lead Database Consultant at AWS Professional Services. He has experience working with Microsoft technologies with a specialization in SQL Server. He works closely with customers to help migrate and modernize their databases to AWS.

Control your AWS Glue Studio development interface with AWS Glue job mode API property

Post Syndicated from Shovan Kanjilal original https://aws.amazon.com/blogs/big-data/control-your-aws-glue-studio-development-interface-with-aws-glue-job-mode-api-property/

In recent years, as the importance of big data has grown, efficient data processing and analysis have become crucial factors in determining a company’s competitiveness. AWS Glue, a serverless data integration service for integrating data across multiple data sources at scale, addresses these data processing needs. Among its features, the AWS Glue Jobs API stands out as a particularly noteworthy tool.

The AWS Glue Jobs API is a robust interface that allows data engineers and developers to programmatically manage and run ETL jobs. By using this API, it becomes possible to automate, schedule, and monitor data pipelines, enabling efficient operation of large-scale data processing tasks.

To improve customer experience with the AWS Glue Jobs API, we added a new property describing the job mode corresponding to script, visual, or notebook. In this post, we explore how the updated AWS Glue Jobs API works in depth and demonstrate the new experience with the updated API.

JobMode property

A new property, JobMode, describes the mode of an AWS Glue job (script, visual, or notebook) to improve your UI experience. AWS Glue users can use the mode that best fits their preference. Some extract, transform, and load (ETL) developers prefer to use visual mode and create visual jobs using the AWS Glue Studio visual editor. Some data scientists prefer notebook jobs and use AWS Glue Studio notebooks. Some data engineers and developers prefer to implement scripts through the AWS Glue Studio script editor or their preferred integrated development environment (IDE). After a job is created with the preferred mode, you can find it by filtering on the job mode on your saved AWS Glue jobs page. Additionally, if you are migrating existing iPython notebook files to AWS Glue Studio notebook jobs, you can now set the job mode, including for multiple jobs at once, using this new API property, as demonstrated in this post.

How CreateJob API works with the new JobMode property

You can use the CreateJob API to create an AWS Glue script, visual, or notebook job. The following example creates a visual job using the AWS SDK for Python (Boto3). Replace <your-bucket-name> with your S3 bucket, and <account-id>, <region>, and <role-arn> with your own values:

CODE_GEN_JSON_STR = '''
{
  "node-1": {
    "S3ParquetSource": {
      "Name": "Amazon S3",
      "Paths": [
        "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
      ],
      "Exclusions": [],
      "Recurse": true,
      "AdditionalOptions": {
        "EnableSamplePath": false,
        "SamplePath": "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/73612da260b94159b705cf4df12364cb_0.snappy.parquet"
      },
      "OutputSchemas": [
        {
          "Columns": [
            {
              "Name": "marketplace",
              "Type": "string"
            },
            {
              "Name": "customer_id",
              "Type": "string"
            },
            {
              "Name": "review_id",
              "Type": "string"
            },
            {
              "Name": "product_id",
              "Type": "string"
            },
            {
              "Name": "product_title",
              "Type": "string"
            },
            {
              "Name": "star_rating",
              "Type": "bigint"
            },
            {
              "Name": "helpful_votes",
              "Type": "bigint"
            },
            {
              "Name": "total_votes",
              "Type": "bigint"
            },
            {
              "Name": "insight",
              "Type": "string"
            },
            {
              "Name": "review_headline",
              "Type": "string"
            },
            {
              "Name": "review_body",
              "Type": "string"
            },
            {
              "Name": "review_date",
              "Type": "timestamp"
            },
            {
              "Name": "review_year",
              "Type": "bigint"
            }
          ]
        }
      ]
    }
  },
  "node-2": {
    "DropFields": {
      "Name": "Drop Fields",
      "Inputs": [
        "node-1"
      ],
      "Paths": [
        [
          "review_headline"
        ],
        [
          "review_body"
        ],
        [
          "review_date"
        ]
      ]
    }
  },
  "node-3": {
    "S3DirectTarget": {
      "Name": "Amazon S3",
      "Inputs": [
        "node-2"
      ],
      "PartitionKeys": [],
      "Path": "s3://<your-bucket-name>/data/jobmode-blog/output/parquet/",
      "Compression": "snappy",
      "Format": "parquet",
      "SchemaChangePolicy": {
        "EnableUpdateCatalog": false
      }
    }
  }
}
'''

import json

import boto3

glue_client = boto3.client('glue')
# Parse the visual node definitions from the JSON string defined above
codeGenJson = json.loads(CODE_GEN_JSON_STR, strict=False)

# Call the create_job method
try:
    glue_client.create_job(
        Name="glue-visual-job",
        Description="Glue Visual ETL job",
        Command={'Name': 'glueetl', 'ScriptLocation': "s3://aws-glue-assets-<account-id>-<region>/scripts/glue-visual-job", 'PythonVersion': "3"},
        WorkerType="G.1X",
        NumberOfWorkers=2,  # example value; must be an integer
        Role="<role-arn>",  # replace with your IAM role ARN
        GlueVersion="4.0",
        CodeGenConfigurationNodes=codeGenJson,
        JobMode="VISUAL"
    )
    print("Successfully created Glue job")
except Exception as e:
    print(f"Error creating Glue job: {str(e)}")

CODE_GEN_JSON_STR represents the visual nodes for the AWS Glue job. There are three nodes: node-1 uses an S3 source, node-2 applies a transformation, and node-3 uses an S3 target. The script instantiates the AWS Glue Boto3 client, loads the JSON, and calls create_job with JobMode set to VISUAL.
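Before calling create_job, it can help to confirm that the node definitions form a valid chain. The following is a minimal sketch, not part of the AWS Glue API: the helper names node_inputs and execution_order are hypothetical, and the sketch assumes each node holds exactly one key naming its type, as in the JSON above. It uses the standard library graphlib module (Python 3.9 or later) to order the nodes by their Inputs.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def node_inputs(nodes: dict) -> dict:
    """Map each node ID to the upstream node IDs listed in its Inputs."""
    graph = {}
    for node_id, definition in nodes.items():
        # Assumption: one key per node naming its type, e.g. "DropFields".
        (_, config), = definition.items()
        graph[node_id] = list(config.get("Inputs", []))
    return graph


def execution_order(nodes: dict) -> list:
    """Return node IDs so that every node comes after its inputs."""
    graph = node_inputs(nodes)
    missing = {dep for deps in graph.values() for dep in deps} - graph.keys()
    if missing:
        raise ValueError(f"Inputs reference unknown nodes: {sorted(missing)}")
    # static_order() raises graphlib.CycleError if the graph has a cycle.
    return list(TopologicalSorter(graph).static_order())


# Trimmed-down version of the three nodes from CODE_GEN_JSON_STR.
nodes = {
    "node-1": {"S3ParquetSource": {"Name": "Amazon S3"}},
    "node-2": {"DropFields": {"Name": "Drop Fields", "Inputs": ["node-1"]}},
    "node-3": {"S3DirectTarget": {"Name": "Amazon S3", "Inputs": ["node-2"]}},
}
print(execution_order(nodes))  # ['node-1', 'node-2', 'node-3']
```

Passing the full dictionary returned by json.loads works the same way, because the helper reads only the Inputs field of each node.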

After you run the Python script, a new job is created. The following screenshot shows how the created job looks in the AWS Glue visual editor.

There are three nodes in the visual directed acyclic graph (DAG): node-1 sources product review data for the Books product category from the public S3 bucket, node-2 drops fields that aren’t needed by downstream systems, and node-3 persists the transformed data in your own S3 bucket.
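You can also confirm the mode programmatically by reading the job back with the GetJob API. The following sketch assumes the response carries JobMode under the Job key; the helper names and the "UNKNOWN" fallback for responses without the property are illustrative choices, not AWS Glue behavior.

```python
def mode_from_response(get_job_response: dict) -> str:
    """Extract JobMode from a GetJob response, or "UNKNOWN" if it is absent."""
    return get_job_response["Job"].get("JobMode", "UNKNOWN")


def job_mode(glue_client, job_name: str) -> str:
    """Call GetJob through a Boto3 Glue client and return the job's mode."""
    return mode_from_response(glue_client.get_job(JobName=job_name))


# Trimmed-down response shape, so the sketch runs without AWS access.
sample_response = {"Job": {"Name": "glue-visual-job", "JobMode": "VISUAL"}}
print(mode_from_response(sample_response))  # VISUAL

# With credentials configured, the live call would look like:
#   import boto3
#   print(job_mode(boto3.client("glue"), "glue-visual-job"))
```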

How CloudFormation works with the new JobMode property

You can use AWS CloudFormation to create different types of AWS Glue jobs by specifying the JobMode parameter with the AWS::Glue::Job resource. The supported job modes include:

  • SCRIPT
  • VISUAL
  • NOTEBOOK

In this example, you create an AWS Glue notebook job using AWS CloudFormation, which requires setting the JobMode parameter to NOTEBOOK.

  1. Create a Jupyter notebook file containing your logic and code, and save it with a descriptive name, such as my-glue-notebook.ipynb. Alternatively, you can download the notebook file and rename it to my-glue-notebook.ipynb.
  2. Upload the notebook file to the notebooks/ folder within the aws-glue-assets-<account-id>-<region> S3 bucket.
  3. Create a new CloudFormation template that creates an AWS Glue job, setting the NotebookJobName parameter to the same name as the notebook file. The following is a sample CloudFormation template:
    AWSTemplateFormatVersion: '2010-09-09'
    Description: CloudFormation template for creating an AWS Glue ETL job using a Jupyter Notebook
    
    Parameters:
      NotebookJobName:
        Type: String
        Description: Name of the AWS Glue ETL Notebook job
    
    Resources:
      GlueJobRole:
        Type: AWS::IAM::Role
        Properties:
          RoleName: !Sub ${AWS::StackName}-GlueJobRole
          AssumeRolePolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Principal:
                  Service:
                    - glue.amazonaws.com
                Action:
                  - sts:AssumeRole
          ManagedPolicyArns:
            - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
          Policies:
            - PolicyName: GlueJobS3Access
              PolicyDocument:
                Version: '2012-10-17'
                Statement:
                  - Effect: Allow
                    Action:
                      - iam:PassRole
                    Resource:
                      - !Sub arn:aws:iam::${AWS::AccountId}:role/${AWS::StackName}-GlueJobRole
    
      ETLNotebookJob:
        Type: AWS::Glue::Job
        Properties:
          Name: !Ref NotebookJobName
          Description: ETL job using a Jupyter Notebook
          Role: !GetAtt GlueJobRole.Arn
          Command:
            Name: glueetl
            PythonVersion: '3'
            ScriptLocation: !Sub s3://aws-glue-assets-${AWS::AccountId}-${AWS::Region}/scripts/${NotebookJobName}.py
          DefaultArguments:
            '--job-bookmark-option': job-bookmark-enable
          JobMode: NOTEBOOK
    
    Outputs:
      ETLNotebookJobName:
        Value: !Ref ETLNotebookJob
        Description: Name of the ETL Notebook job

  4. Deploy the CloudFormation template. For NotebookJobName, enter the same name as the notebook file.
  5. Verify that the AWS Glue job you created is listed and that it has the name you specified in the CloudFormation template.
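If you prefer to deploy the template from code rather than the console, the step can be sketched with the Boto3 CloudFormation client. The function names, stack name, and template path below are hypothetical. The sketch passes CAPABILITY_NAMED_IAM because the template sets an explicit RoleName on the IAM role.

```python
def create_stack_args(stack_name: str, template_body: str, notebook_job_name: str) -> dict:
    """Build the keyword arguments for CloudFormation create_stack."""
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": "NotebookJobName", "ParameterValue": notebook_job_name}
        ],
        # The template names its IAM role, which requires this capability.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def deploy(template_path: str, stack_name: str, notebook_job_name: str) -> None:
    """Create the stack and wait for completion (needs boto3 and credentials)."""
    import boto3  # imported lazily so the helper above has no dependencies

    cfn = boto3.client("cloudformation")
    with open(template_path) as template_file:
        body = template_file.read()
    cfn.create_stack(**create_stack_args(stack_name, body, notebook_job_name))
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)


# Example arguments with a placeholder template body; nothing is sent to AWS here.
args = create_stack_args("glue-notebook-stack", "<template-body>", "my-glue-notebook")
print(args["Parameters"])
```

Calling deploy("template.yaml", "glue-notebook-stack", "my-glue-notebook") would then create the stack, assuming the template is saved locally as template.yaml.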

The AWS Glue notebook shows the notebook job, which contains the existing cells from your .ipynb file. You can review the job details to confirm it’s configured correctly.

Console experience

On the AWS Glue console, in the navigation pane, choose ETL Jobs to see all your ETL jobs listed. The list has the columns Job name, Type, Created by, Last modified, and AWS Glue version, and you can sort and filter by these columns. The following screenshot shows how it looks.

We also enhanced the console experience with the introduction of JobMode. The Created by column on the console shows the JobMode of each job. You can filter for jobs created by VISUAL, NOTEBOOK, or SCRIPT, as shown in the following screenshot.

This new console experience helps you search and discover your jobs based on JobMode.
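The same grouping can be approximated outside the console by listing jobs with the GetJobs API and bucketing them by JobMode. This is a sketch under assumptions: the helper names are hypothetical, and "UNSPECIFIED" is an illustrative label for jobs whose response doesn’t include the property.

```python
from collections import defaultdict


def group_jobs_by_mode(jobs: list) -> dict:
    """Group job names by JobMode; jobs without the property go to "UNSPECIFIED"."""
    grouped = defaultdict(list)
    for job in jobs:
        grouped[job.get("JobMode", "UNSPECIFIED")].append(job["Name"])
    return dict(grouped)


def list_all_jobs(glue_client) -> list:
    """Collect every job definition using the Boto3 get_jobs paginator."""
    jobs = []
    for page in glue_client.get_paginator("get_jobs").paginate():
        jobs.extend(page["Jobs"])
    return jobs


# Sample job records (made up for illustration) so the sketch runs offline.
sample_jobs = [
    {"Name": "glue-visual-job", "JobMode": "VISUAL"},
    {"Name": "my-glue-notebook", "JobMode": "NOTEBOOK"},
    {"Name": "nightly-script", "JobMode": "SCRIPT"},
    {"Name": "legacy-job"},
]
print(group_jobs_by_mode(sample_jobs))

# Live usage, with credentials configured:
#   import boto3
#   print(group_jobs_by_mode(list_all_jobs(boto3.client("glue"))))
```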

Conclusion

This post demonstrated how the AWS Glue Jobs API works with the newly introduced job mode property. With the new property, you can explicitly choose the mode of each job. The steps showed how to use the property through the API, the AWS SDK, and CloudFormation. Additionally, the property makes it straightforward to search and discover your jobs quickly on the AWS Glue console.


About the Authors

Shovan Kanjilal is a Senior Analytics and Machine Learning Architect with Amazon Web Services. He is passionate about helping customers build scalable, secure, and high-performance data solutions in the cloud.

Manoj Shunmugam is a DevOps Consultant in Professional Services at Amazon Web Services. He works with customers to establish infrastructures using cloud-centered and/or container-based platforms in the AWS Cloud.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Gal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data products.

Demystify data sharing and collaboration patterns on AWS: Choosing the right tool for the job

Post Syndicated from Ramakant Joshi original https://aws.amazon.com/blogs/big-data/demystify-data-sharing-and-collaboration-patterns-on-aws-choosing-the-right-tool-for-the-job/

Data is the most significant asset of any organization. However, enterprises often encounter challenges with data silos, insufficient access controls, poor governance, and quality issues. Embracing data as a product is the key to address these challenges and foster a data-driven culture.

In this context, the adoption of data lakes and the data mesh framework emerges as a powerful approach. By decentralizing data ownership and distribution, enterprises can break down silos and enable seamless data sharing. Cataloging data, making the data searchable, implementing robust security and governance, and establishing effective data sharing processes are essential to this transformation. AWS offers services like AWS Data Exchange, AWS Glue, AWS Clean Rooms and Amazon DataZone to help organizations unlock the full potential of their data.

Personas

Let’s identify the various roles involved in the data sharing process.

First of all, there are data producers, which might include internal teams/systems, third-party producers, and partners. The data consumers include internal stakeholders/systems, external partners, and end-customers. At the core of this ecosystem lies the enterprise data platform. When considering enterprises, numerous personas come into play:

  • Line of business users – These personas need to classify data, add business context, collaborate effectively with other lines of business, gain enhanced visibility into business key performance indicators (KPIs) for improved outcomes, and explore opportunities for monetizing data.
  • Partners – Partners should be able to share data and collaborate with other partners and customers.
  • Data scientists and business analysts – These personas should be able to access the data, analyze it, and generate actionable business insights.
  • Data engineers – Data engineers are tasked with building the proper data pipeline and cataloging the data so that it meets the diverse needs of stakeholders, including business analysts, data scientists, partners, and line of business users.
  • Data security and governance officers – Data security involves making sure producers and consumers have appropriate access to the data, implementing the right access permissions, and maintaining compliance with industry regulations, particularly in highly regulated sectors like healthcare, life sciences, and financial services. This persona is also responsible for enhancing data governance by tracking lineage and establishing data mesh policies.

Choosing the right tool for the job

Now that you have identified the various personas, it’s important to select the appropriate tools for each role:

  • Starting with the producers, if your data source includes a software as a service (SaaS) platform, AWS Glue offers options to automate data flows between software service providers and AWS services.
  • For producers seeking collaboration with partners, AWS Clean Rooms facilitates secure collaboration and analysis of collective datasets without the need to share or duplicate underlying data.
  • When dealing with third-party data sources, AWS Data Exchange simplifies the discovery, subscription, and utilization of third-party data from a diverse range of producers or providers. As a producer, you can also monetize your data through the subscription model using AWS Data Exchange.
  • Within your organization, you can democratize data with governance, using Amazon DataZone, which offers built-in governance features.
  • For SaaS consumers, AWS Glue supports bidirectional transfer and serves both as a producer and consumer tool for various SaaS providers.

Let’s briefly describe the capabilities of the AWS services we referred to above:

AWS Glue is a fully managed, serverless, and scalable extract, transform, and load (ETL) service that simplifies the process of discovering, preparing, and loading data for analytics. It provides a data catalog, automated crawlers, and visual job creation to streamline data integration across various data sources and targets.

AWS Data Exchange enables you to find, subscribe to, and use third-party datasets in the AWS Cloud. It also provides a platform through which a data producer can make their data available for consumption by subscribers. It is a data marketplace featuring over 300 providers offering thousands of datasets accessible through files, Amazon Redshift tables, and APIs. This service supports consolidated billing and subscription management, offering you the flexibility to explore 1,000 free datasets and samples. You don’t need to set up a separate billing mechanism or payment method specifically for AWS Data Exchange subscriptions.

AWS Clean Rooms is designed to assist companies and their partners in securely analyzing and collaborating on collective datasets without revealing or sharing underlying data. You can swiftly create a secure data clean room, fostering collaboration with other entities on the AWS Cloud to derive unique insights for initiatives such as advertising campaigns or research and development. This service protects underlying data through a comprehensive set of privacy-enhancing controls and flexible analysis rules tailored to specific business needs.

Amazon DataZone is a data management service that makes it fast and straightforward to catalog, discover, share, and govern data stored across AWS, on-premises, and third-party sources. With Amazon DataZone, administrators and data stewards who oversee an organization’s data assets can manage and govern access to data using fine-grained controls. These controls are designed to grant access with the right level of privileges and context. Amazon DataZone makes it straightforward for engineers, data scientists, product managers, analysts, and business users to access data throughout an organization so they can discover, use, and collaborate to derive data-driven insights.

Use cases

Let’s review some example use cases to understand how these diverse services can be effectively applied within a business context to achieve the desired outcomes. In this particular scenario, we focus on a company named AnyHealth, which operates in the healthcare and life sciences sector. This company encompasses multiple lines of businesses, specializing in the sale of various scientific equipment. Three key requirements have been identified:

  • Sales and customer visibility by line of business – AnyHealth wants to gain insights into the sales performance and customer demands specific to each line of business. This necessitates a comprehensive view of sales activities and customer requirements tailored to individual lines of business.
  • Cross-organization supply chain and inventory visibility – The company faces challenges related to supply chain and inventory management, especially in global crisis situations like a pandemic. They want to address instances where inventory items are idle in one line of business while there is demand for the same items in another. To overcome this, they want to establish cross-organizational visibility of supply chain and inventory data, breaking down silos and achieving prompt responses to business demands.
  • Cross-sell and up-sell opportunities – AnyHealth intends to boost sales by implementing cross-selling and up-selling strategies. To achieve this, they plan to use machine learning (ML) models to extract insights from data. These insights will then be provided to sales representatives and resellers, enabling them to identify and capitalize on opportunities effectively.

In the following sections, we discuss how to address each requirement in more detail and the AWS services that best fit each solution.

Sales and customer visibility by line of business

The first requirement involves obtaining visibility into sales and customer demand by line of business. The key consumers of this data include line of business leaders, business analysts, and various other business stakeholders.

The initial step is to ingest sales and order data into the platform. Currently, this data is centralized in the ERP system, specifically SAP. The objective is to regularly retrieve this data and capture any changes that occur. The data engineers are instrumental in building this pipeline. Given that we are dealing with a SaaS integration, AWS Glue is the logical choice for seamless data ingestion.

Next, we focus on building the enterprise data platform where the accumulated data will be hosted. This platform will incorporate robust cataloging, making sure the data is easily searchable, and will enforce the necessary security and governance measures for selective sharing among business stakeholders, data engineers, analysts, security and governance officers. In this context, Amazon DataZone is the optimal choice for managing the enterprise data platform.

As stated earlier, the first step involves data ingestion. Data is ingested from a third-party vendor SaaS solution (SAP), and the data engineer uses AWS Glue. Utilizing the SAP data connector, the data engineer establishes a connection with the SAP environment, running scheduled jobs.

The data lands in Amazon Simple Storage Service (Amazon S3). Additional AWS Glue jobs are created to transform and curate the data. The curated data is placed in a designated bucket and AWS Glue crawlers are run to catalog the data. This cataloged data is then managed through Amazon DataZone.
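The cataloging step can be sketched with the Boto3 AWS Glue client. All names below (crawler name, role ARN, database, and bucket path) are hypothetical placeholders, and this is an outline of one way to create and start a crawler over the curated bucket rather than the exact setup used in this architecture.

```python
def crawler_args(name: str, role_arn: str, database: str, s3_path: str) -> dict:
    """Build keyword arguments for Glue create_crawler over one S3 path."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,  # Data Catalog database that receives the tables
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_start_crawler(glue_client, args: dict) -> None:
    """Create the crawler, then start a run (needs AWS credentials)."""
    glue_client.create_crawler(**args)
    glue_client.start_crawler(Name=args["Name"])


# Placeholder values for illustration only.
args = crawler_args(
    name="curated-sales-crawler",
    role_arn="<role-arn>",
    database="curated_sales",
    s3_path="s3://<your-curated-bucket>/sales/",
)
print(args["Targets"])

# Live usage:
#   import boto3
#   create_and_start_crawler(boto3.client("glue"), args)
```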

In Amazon DataZone, the data security officer creates the corporate domain. They create producer projects and grant access to data engineers and business analysts. Data engineers make sure sales and customer data is available from the source in the Amazon DataZone project. Business analysts enhance the data with business metadata and glossaries and publish it as data assets or data products. The data security officer sets permissions in Amazon DataZone to allow users to access the data portal. Users can search for assets in the Amazon DataZone catalog, view the metadata assigned to them, and access the assets.

Amazon Athena is used to query and explore the data. Amazon QuickSight is used to read from Amazon Athena and generate reports that are consumed by the line of business users and other stakeholders.
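For readers who want to script the exploration step, the following sketch runs one Athena query through Boto3 and polls until it finishes. The function name, database, and output location are hypothetical. The sketch returns only the first page of results, and for SELECT queries the first row holds the column headers.

```python
import time


def run_query(athena, sql: str, database: str, output_location: str,
              poll_seconds: float = 1.0) -> list:
    """Start an Athena query, wait for a terminal state, and return result rows."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)

    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} ended in state {state}")

    # First page only; each row is a list of cell values (None for NULL).
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    return [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]


# Live usage (placeholder database and bucket):
#   import boto3
#   rows = run_query(boto3.client("athena"),
#                    "SELECT COUNT(*) AS orders FROM sales_orders",
#                    "curated_sales", "s3://<your-results-bucket>/athena/")
```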

The following diagram illustrates the solution architecture using AWS services.

Cross-organization supply chain and inventory visibility

For the second requirement, the objective is to achieve visibility of supply chain and inventory across the organization. The key stakeholders remain line of business users, who want cross-organization visibility of supply chain and inventory data. The aim is to ingest supply chain and inventory information on a schedule from the ERP system (SAP) and to capture any changes in that data. The persona involved in setting up the data ingestion pipeline is a data engineer. Given that we are extracting data from SAP, AWS Glue is the suitable choice for this requirement.

The next step involves obtaining economic indicators and weather information from third-party sources. AnyHealth, with its diverse lines of business, including one that manufactures medical equipment such as inhalers for asthma treatment, recognizes the significance of collecting weather information, particularly data about pollen, because it directly impacts the patient population. Additionally, socioeconomic conditions play a crucial role in government-assisted programs related to out-of-hospital care. To incorporate this third-party data, AWS Data Exchange is the logical choice.

Finally, all the accumulated data needs to be hosted on the enterprise data platform, with cataloging, and robust security and governance measures. In this context, Amazon DataZone is the preferred solution.

The pipeline begins with the ingestion of data from SAP, facilitated by AWS Glue. The data lands in Amazon S3, where AWS Glue jobs curate the data and generate curated tables, and AWS Glue crawlers then catalog the data.

AWS Data Exchange serves as the platform for collecting economic trends and weather information. The business analyst uses AWS Data Exchange to retrieve data from various sources. In the AWS Data Exchange marketplace, they identify the dataset, subscribe to it, and subsequently consume it. Any change in the source data invokes events, which update the data object in the Amazon S3 bucket.

Amazon DataZone is used to manage and govern the data lake. Similar to the first use case, the data security officer creates a producer project. The data owner from the line of business (LoB) creates supply chain and inventory data assets in the producer project and publishes them. From the consumer perspective, the data security officer also creates a consumer project, which allows the sales and marketing teams from different LoBs to search for the supply chain and inventory data published by the producer. Consumers request access to the published supply chain and inventory data, and the producer grants the necessary access. Amazon Athena is used to query and explore the data. Amazon QuickSight is used to read from Amazon Athena and generate reports.

The following diagram illustrates this architecture.

Cross-sell and up-sell opportunities

The third requirement involves identifying cross-sell and up-sell opportunities. The key business consumers in this context are the sales representatives and resellers. AnyHealth operates globally, selling products in Europe, America, and Asia. Direct business transactions with consumers occur in America and Europe, and resellers facilitate sales in Asia, where AnyHealth lacks a direct relationship with the consumers.

The enterprise data platform is used to host and analyze the sales data and identify the customer demand. This data platform is managed by Amazon DataZone. Cross-sell and up-sell opportunities, derived through ML models, are integrated into the customer relationship management (CRM) system, which in this case is Salesforce. Sales representatives access this data from Salesforce to engage with the market and collaborate with customers. AWS Glue is used for this integration.

Typically, resellers don’t provide their partners direct access to their customer data. Although AnyHealth doesn’t have direct access, understanding customer personas and profile information is essential to equip resellers with the right offers to cross-sell and up-sell products. AWS Clean Rooms enables collaboration on collective datasets with stringent security controls, enabling insights without sharing the underlying data.

By addressing these requirements, AnyHealth can effectively identify and capitalize on cross-sell and up-sell opportunities, tailoring their approach based on the distinct dynamics of direct and reseller-based business models across various regions.

The initial step in the architecture involves a pipeline where SAP data is ingested into Amazon S3 and curated using AWS Glue jobs. The curated data is cataloged, governed, and managed using Amazon DataZone.

In this scenario, where sales and customer information are acquired, data scientists build ML models to identify cross-sell and upsell opportunities. Using Amazon DataZone, these opportunities are shared with line of business users, providing transparency regarding the opportunities presented to sales reps and resellers. The cross-sell and upsell insights are pushed to Salesforce through AWS Glue, with an event-driven workflow for timely communication to sales reps. However, for resellers, a different pipeline is needed as AnyHealth doesn’t have direct access to the customer sales data. AnyHealth uses AWS Clean Rooms for this purpose.

With AWS Clean Rooms, the collaboration is started by AnyHealth (the collaboration initiator) who invites resellers to join. Resellers participate in the collaboration, and share the customer profile and segment information, while maintaining privacy by excluding customer names and contact details. AnyHealth uses the customer profile information and order trends to identify cross-sell and upsell opportunities. These opportunities are shared with the reseller to pursue further and position products in the market.

The following diagram illustrates this architecture.

Final architecture

Let’s now examine the complete architecture, which covers all three use cases. In this architecture, purpose-built services such as AWS Data Exchange, AWS Glue, AWS Clean Rooms, and Amazon DataZone are used. These services integrate to achieve end-to-end business objectives.

The following diagram illustrates this architecture.

To strengthen the security posture of your cloud infrastructure, we recommend using AWS Identity and Access Management (IAM), which allows you to manage access to AWS resources by creating users, groups, and roles with specific permissions. Additionally, you can use AWS Key Management Service (AWS KMS), which enables you to create, manage, and control encryption keys used to protect your data, so only authorized entities can access sensitive information. To provide an audit trail for compliance, you can use AWS CloudTrail, which records API calls made within your AWS account.

Conclusion

In this post, we discussed how to choose the right tool for building an enterprise data platform and enabling data sharing, collaboration, and access within your organization and with third-party providers. We addressed three business use cases using AWS Glue, AWS Data Exchange, AWS Clean Rooms, and Amazon DataZone.

To learn more about these services, check out the AWS Blogs for Amazon DataZone, AWS Glue, AWS Clean Rooms, and AWS Data Exchange.


About the authors

Ramakant Joshi is an AWS Solutions Architect, specializing in the analytics and serverless domain. He has a background in software development and hybrid architectures, and is passionate about helping customers modernize their cloud architecture.

Debaprasun Chakraborty is an AWS Solutions Architect, specializing in the analytics domain. He has around 20 years of software development and architecture experience. He is passionate about helping customers in cloud adoption, migration and strategy.

Enriching metadata for accurate text-to-SQL generation for Amazon Athena

Post Syndicated from Naidu Rongali original https://aws.amazon.com/blogs/big-data/enriching-metadata-for-accurate-text-to-sql-generation-for-amazon-athena/

Extracting valuable insights from massive datasets is essential for businesses striving to gain a competitive edge. Enterprise data is brought into data lakes and data warehouses to carry out analytical, reporting, and data science use cases using AWS analytical services like Amazon Athena, Amazon Redshift, Amazon EMR, and so on. Amazon Athena is an interactive analytics service for analyzing the data in Amazon Simple Storage Service (Amazon S3). Amazon Redshift is used to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes. Amazon EMR provides a big data environment for data processing, interactive analysis, and machine learning using open source frameworks such as Apache Spark, Apache Hive, and Presto. These data processing and analytical services support Structured Query Language (SQL) to interact with the data.

Writing SQL queries requires not just remembering the SQL syntax rules, but also knowledge of the table metadata, which is data about table schemas, relationships among the tables, and possible column values. Large language model (LLM)-based generative AI is a new technology trend for comprehending large corpora of information and assisting with complex tasks. Can it also help write SQL queries? The answer is yes.

Generative AI models can translate natural language questions into valid SQL queries, a capability known as text-to-SQL generation. Although LLMs can generate syntactically correct SQL queries, they still need the table metadata to write accurate SQL queries. In this post, we demonstrate the critical role of metadata in text-to-SQL generation through an example implemented for Amazon Athena using Amazon Bedrock. We discuss the challenges in maintaining the metadata as well as ways to overcome those challenges and enrich the metadata.

Solution overview

This post demonstrates text-to-SQL generation for Athena using an example implemented with Amazon Bedrock. We use Anthropic’s Claude 2.1 foundation model (FM) in Amazon Bedrock as the LLM. Amazon Bedrock models are invoked from an Amazon SageMaker notebook instance. Working examples are designed to demonstrate how various details included in the metadata influence the SQL generated by the model. These examples use synthetic datasets created in AWS Glue and Amazon S3. After we review the significance of these metadata details, we delve into the challenges encountered in gathering the required level of metadata. We then explore strategies for overcoming these challenges.

The examples implemented in the workflow are illustrated in the following diagram.

Figure 1. The solution architecture and workflow.

The workflow proceeds in the following sequence:

  1. A user asks a text-based question which can be answered by querying relevant AWS Glue tables through Athena.
  2. Table metadata is fetched from AWS Glue.
  3. The tables’ metadata and SQL generating instructions are added to the prompt template. The Claude AI model is invoked by passing the prompt and the model parameters.
  4. The Claude AI model translates the user intent (question) to SQL based on the instructions and tables’ metadata.
  5. The generated Athena SQL query is run.
  6. The generated Athena SQL query and the SQL query results are returned to the user.
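Steps 2 through 4 of this sequence can be sketched in a few functions. This is a minimal illustration, not the code from the accompanying notebooks: the function names and the prompt wording are assumptions, the table dictionary mirrors the Table structure returned by the AWS Glue GetTable API, and the request body follows the Claude text completions format on Amazon Bedrock with the model ID anthropic.claude-v2:1.

```python
import json


def describe_table(table: dict) -> str:
    """Render a Glue 'Table' dict (as returned by GetTable) as prompt text."""
    lines = [f"Table: {table['DatabaseName']}.{table['Name']}"]
    if table.get("Description"):
        lines.append(f"Description: {table['Description']}")
    columns = table["StorageDescriptor"]["Columns"] + table.get("PartitionKeys", [])
    for col in columns:
        comment = col.get("Comment", "")
        lines.append(f"  {col['Name']} {col['Type']}" + (f" -- {comment}" if comment else ""))
    return "\n".join(lines)


def build_prompt(question: str, tables: list) -> str:
    """Combine instructions, table metadata, and the user question."""
    metadata = "\n\n".join(describe_table(t) for t in tables)
    return (
        "\n\nHuman: Write one Amazon Athena SQL query that answers the question. "
        "Use only the tables and columns listed in the metadata.\n\n"
        f"<metadata>\n{metadata}\n</metadata>\n\n"
        f"<question>{question}</question>\n\nAssistant:"
    )


def generate_sql(bedrock_runtime, prompt: str, max_tokens: int = 500) -> str:
    """Invoke Claude 2.1 through a Boto3 bedrock-runtime client."""
    body = json.dumps({"prompt": prompt, "max_tokens_to_sample": max_tokens, "temperature": 0})
    response = bedrock_runtime.invoke_model(modelId="anthropic.claude-v2:1", body=body)
    return json.loads(response["body"].read())["completion"].strip()


# Hypothetical table metadata, so the prompt can be inspected offline.
sample_table = {
    "DatabaseName": "sales_db",
    "Name": "orders",
    "Description": "One row per customer order",
    "StorageDescriptor": {"Columns": [
        {"Name": "order_id", "Type": "bigint", "Comment": "unique order identifier"},
        {"Name": "amount", "Type": "double"},
    ]},
}
print(build_prompt("What is the total order amount?", [sample_table]))
```

The generated SQL could then be run through the Athena StartQueryExecution API to complete steps 5 and 6.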

Prerequisites

These prerequisites apply if you want to try this example yourself. You can skip this section if you want to understand the example without implementing it. The example centers on invoking Amazon Bedrock models using SageMaker, so we need to set up a few resources in an AWS account. The relevant CloudFormation template, Jupyter notebooks, and details of launching the necessary AWS services are covered in this section. The CloudFormation template creates the SageMaker instance with the necessary S3 bucket and IAM role permissions to run AWS Glue commands, run Athena SQL, and invoke Amazon Bedrock AI models. The two Jupyter notebooks (0_create_tables_with_metadata.ipynb and 1_text-to-sql-for-athena.ipynb) provide working code snippets to create the necessary tables and generate the SQL using the Claude AI model on Amazon Bedrock.

Granting Anthropic’s Claude permissions on Amazon Bedrock 

  • Have an AWS account and sign in using the AWS Management Console.
  • Change the AWS Region to US West (Oregon).
  • Navigate to the AWS Service Catalog console and choose Amazon Bedrock.
  • On the Amazon Bedrock console, choose Model Access in the navigation pane.
  • Choose Manage model access.
  • Select the Claude
  • Choose Request model access if you’re requesting the model access for the first time. Otherwise choose Save Changes.

Deploying the CloudFormation stack

BDB-4100-CFN-Launch-Stack

After launching the CloudFormation stack:

  • On the Create stack page, choose Next
  • On the Specify stack details page, choose Next
  • On the Configure stack options page, choose Next
  • On the Review and create page, select I acknowledge that AWS CloudFormation might create IAM resources
  • Choose Submit

Downloading Jupyter Notebooks to SageMaker

  • In the AWS Management Console, choose the name of the currently displayed Region and change it to US West (Oregon).
  • Navigate to the AWS Service Catalog console and choose Amazon SageMaker.
  • On the Amazon SageMaker console, choose Notebook in the navigation pane.
  • Choose Notebook instances.
  • Select the SageMakerNotebookInstance created by the texttosqlmetadata CloudFormation stack.
  • Under Actions, choose Open Jupyter
  • Navigate to Jupyter console, select New, and then choose Console.
  • Run the following Shell script commands in the console to copy the Jupyter Notebooks.
    cd /home/ec2-user/SageMaker
    BASE_S3_PATH="s3://aws-blogs-artifacts-public/artifacts/BDB-4265"
    aws s3 cp "${BASE_S3_PATH}/0_create_tables_with_metadata.ipynb" ./
    aws s3 cp "${BASE_S3_PATH}/1_text_to_sql_for_athena.ipynb" ./
    

  • Open each downloaded Notebook and update the values of the athena_results_bucket, aws_region, and athena_workgroup variables based on the outputs from the texttosqlmetadata CloudFormation stack.

Solution implementation

If you want to try this example yourself, use the CloudFormation template provided in the previous section. In the following sections, we illustrate how each element of the metadata included in the prompt influences the SQL query generated by the model.

  1. The steps in the 0_create_tables_with_metadata.ipynb Jupyter Notebook create Amazon S3 files with synthetic data for the employee and department datasets, create the employee_dtls and department_dtls AWS Glue tables pointing to those S3 buckets, and extract the following metadata for these two tables.
    CREATE EXTERNAL TABLE employee_dtls (
    	id int COMMENT 'Employee id',
    	name string COMMENT 'Employee name',
    	age int COMMENT 'Employee age',
    	dept_id int COMMENT 'Employee Departments ID',
    	emp_category string COMMENT 'Employee category. Contains TEMP For temporary, PERM for permanent, CONTR for contractors ',
    	location_id int COMMENT 'Location identifier of the Employee',
    	joining_date date COMMENT 'Joining date of the Employee',
    	CONSTRAINT pk_1 PRIMARY KEY  (id) ,
    	CONSTRAINT FK_1 FOREIGN KEY (dept_id) REFERENCES department_dtls(id)
    ) 
    PARTITIONED BY (
    	region_id string COMMENT 'Region identifier. Contains AMER for Americas, EMEA for Europe, the Middle East, and Africa, APAC for Asia Pacific countries'
    );
    
    CREATE EXTERNAL TABLE department_dtls (
    	id int COMMENT 'Department id',
    	name string COMMENT 'Department name',
    	location_id int COMMENT 'Location identifier of the Department'
    )

  2. The metadata extracted in the previous step provides column descriptions. For the region_id partition column and emp_category column, the description provides possible values along with their meaning. The metadata also has foreign key constraint details. AWS Glue doesn’t provide a way to specify the primary key and foreign key constraints, so use custom keys in the AWS Glue table-level parameters as an alternative to gather primary key and foreign keys while creating the AWS Glue table.
    # Define the table schema
    employee_table_input = {
        'Name': employee_table_name,
        'PartitionKeys': [
            {'Name': 'region_id', 'Type': 'string', 'Comment': 'Region identifier. Contains AMER for Americas, EMEA for Europe, the Middle East, and Africa, APAC for Asia Pacific countries'}
        ],
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'id', 'Type': 'int', 'Comment': 'Employee id'},
           …
            ],
            'Location': employee_s3_path,
         …
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {
            'classification': 'csv',
            'primary_key': 'CONSTRAINT pk_1 PRIMARY KEY  (id)',
            'foreign_key_1': 'CONSTRAINT FK_1 FOREIGN KEY (dept_id) REFERENCES department_dtls(id)'          
        }
    }
    
    # Create the table
    response = glue_client.create_table(DatabaseName=database_name, TableInput=employee_table_input)
    

  3. The steps in the 1_text-to-sql-for-athena.ipynb Jupyter notebook create the following wrapper function to interact with Claude FM on Amazon Bedrock to generate SQL based on user-provided text wrapped up in a prompt. This function hard codes the model’s parameters and model ID for demonstrating the basic functionality.
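    import json

    # bedrock_client is assumed to be a boto3 "bedrock-runtime" client created earlier in the notebook.
    def interactWithClaude(prompt):
        # Request body for the Claude text-completion API; parameters are hard-coded for the demo
        body = json.dumps(
            {
                "prompt": prompt,
                "max_tokens_to_sample": 2048,
                "temperature": 1,
                "top_k": 250,
                "top_p": 0.999,
                "stop_sequences": [],
            }
        )
        modelId = "anthropic.claude-v2"
        accept = "application/json"
        contentType = "application/json"
        response = bedrock_client.invoke_model(
            body=body, modelId=modelId, accept=accept, contentType=contentType
        )
        # The generated text is returned in the "completion" field of the response body
        response_body = json.loads(response.get("body").read())
        response_text_claude = response_body.get("completion")
        return response_text_claude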

  4. Define the following set of instructions for generating the Athena SQL query. These SQL generating instructions specify which compute engine the SQL query should run on and give other guidance to the model for generating the SQL query. These instructions are included in the prompt sent to the Bedrock model.
    athena_sql_generating_instructions = """
    Read database schema inside the <database_schema></database_schema> tags which contains a list of table names and their schemas to do the following:
        1. Create a syntactically correct AWS Athena query to answer the question.
        2. For tables with partitions, include the filters on the relevant partition columns.
        3. Include only relevant columns for the given question.
        4. Use only the column names that are listed in the schema description. 
        5. Qualify column names with the table name.
        6. Avoid joins to a table if there is no column required from the table.
        7. Convert Strings to Date type while filtering on Date type columns
        8. Return the sql query inside the <SQL></SQL> tags.
    """

  5. Define different prompt templates for demonstrating the importance of metadata in text-to-SQL generation. These templates have placeholders for the SQL query generating instructions and the table metadata.
    athena_prompt1 = """
    Human:  You are an AWS Athena query expert whose output is a valid sql query. You are given the following Instructions for building the AWS Athena query.
    <Instructions>
    {instruction_dtls}
    </Instructions>
            
    Only use the following tables defined within the database_schema and table_schema XML-style tags:
    
    <database_schema>
    <table_schema>
    CREATE EXTERNAL TABLE employee_dtls (
      id int,
      name string,
      age int ,
      dept_id int,
      emp_category string ,
      location_id int ,
      joining_date date
    ) PARTITIONED BY (
      region_id string
      )
    </table_schema>
    
    <table_schema>
    CREATE EXTERNAL TABLE department_dtls (
      id int,
      name string ,
      location_id int 
    )
    </table_schema>
    </database_schema>
    
    Question: {question}
    
    Assistant: 
    """

  6. Generate the final prompt by passing the question and instruction details as arguments to the prompt template. Then, invoke the model.
    question_asked = "List of permanent employees who work in North America and  joined after Jan 1 2024"
    prompt_template_for_query_generate = PromptTemplate.from_template(athena_prompt1)
    prompt_data_for_query_generate = prompt_template_for_query_generate.format(question=question_asked,instruction_dtls=athena_sql_generating_instructions)
    llm_generated_response = interactWithClaude(prompt_data_for_query_generate)
    # The instructions ask for <SQL></SQL> tags, so strip them regardless of case
    import re
    print(re.sub(r"</?sql>", " ", llm_generated_response, flags=re.IGNORECASE).strip())
    

  7. The model generates the SQL query for the user question by using the instructions and table details provided in the prompt.
    SELECT employee_dtls.id, employee_dtls.name, employee_dtls.age, employee_dtls.dept_id, employee_dtls.emp_category
    FROM employee_dtls 
    WHERE employee_dtls.region_id = 'NA' 
      AND employee_dtls.emp_category = 'permanent'
      AND employee_dtls.joining_date > CAST('2024-01-01' AS DATE)
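
The metadata extraction described in steps 1 and 2 can be sketched as a function that turns an AWS Glue table definition into the DDL text used in the prompt. This is a minimal sketch under stated assumptions: the input dictionary follows the shape of the `Table` element returned by the boto3 `get_table` call, the constraints are read from the custom `primary_key` and `foreign_key_*` table parameters shown in step 2, and the function name `glue_table_to_ddl` is hypothetical rather than taken from the notebook. Quotes inside comments are not escaped.

```python
from typing import Dict, List


def _column_lines(columns: List[Dict[str, str]]) -> List[str]:
    """Format Glue column dicts as 'name type COMMENT ...' lines."""
    lines = []
    for col in columns:
        line = f"{col['Name']} {col['Type']}"
        if col.get("Comment"):
            line += f" COMMENT '{col['Comment']}'"
        lines.append(line)
    return lines


def glue_table_to_ddl(table: Dict) -> str:
    """Build prompt-ready DDL text from a Glue table definition."""
    body = _column_lines(table["StorageDescriptor"]["Columns"])

    # Custom table parameters carry the constraints, because Glue has no
    # native primary key or foreign key fields.
    params = table.get("Parameters", {})
    if "primary_key" in params:
        body.append(params["primary_key"])
    body += [params[key] for key in sorted(params) if key.startswith("foreign_key")]

    ddl = f"CREATE EXTERNAL TABLE {table['Name']} (\n  " + ",\n  ".join(body) + "\n)"

    partitions = _column_lines(table.get("PartitionKeys", []))
    if partitions:
        ddl += "\nPARTITIONED BY (\n  " + ",\n  ".join(partitions) + "\n)"
    return ddl
```

In the notebook setting, the input would come from something like `glue_client.get_table(DatabaseName=..., Name=...)["Table"]`; other table parameters such as `classification` are deliberately left out of the DDL text.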
    

Significance of prompts and metadata in text-to-SQL generation

Understanding the details of tables and the data they contain is essential for both human SQL experts and generative AI-based text-to-SQL generation. These details, collectively known as metadata, provide crucial context for writing SQL queries. For the text-to-SQL example implemented in the previous section, we used prompts to convey specific instructions and table metadata to the model, enabling it to perform user tasks effectively. This raises the question of how much detail the table metadata needs to include. To answer it, we asked the model to generate a SQL query for the same question three times, using a different prompt each time.

Prompt with no metadata

For the first test, we used a basic prompt containing just the SQL generating instructions and no table metadata. The model generated a SQL query for the given question, but the query isn’t usable because the model had to guess the table names, column names, and literal values used in the filter expressions.

Question: List of permanent employees who work in North America and joined after January 1, 2024.

Prompt definition:

Human: You are an Amazon Athena query expert whose output is a valid sql query. You are given the following Instructions for building the Amazon Athena query.
<Instructions>
{instruction_dtls}
</Instructions>

Question: {question}
Assistant:

SQL query generated:

SELECT emp.employee_id, emp.first_name, emp.last_name, emp.department_id
FROM employee emp
WHERE emp.contract = 'Permanent'
AND emp.region = 'North America'
AND CAST(emp.start_date AS  DATE) > CAST('2024-01-01' AS DATE)
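
Guessed table names like the one above can be caught before a query is run. The following is a minimal sketch, assuming a simple regular-expression scan of the FROM and JOIN clauses; the function name `find_unknown_tables` is hypothetical, and the scan does not handle subqueries, common table expressions, or comma-separated table lists.

```python
import re
from typing import Iterable, Set


def find_unknown_tables(sql: str, known_tables: Iterable[str]) -> Set[str]:
    """Return table names referenced after FROM or JOIN that are not in known_tables."""
    known = {name.lower() for name in known_tables}
    referenced = re.findall(
        r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", sql, flags=re.IGNORECASE
    )
    return {name.lower() for name in referenced} - known


generated_sql = """
SELECT emp.employee_id, emp.first_name, emp.last_name, emp.department_id
FROM employee emp
WHERE emp.contract = 'Permanent'
"""
print(find_unknown_tables(generated_sql, ["employee_dtls", "department_dtls"]))
```

For the query generated without metadata, the check flags `employee`, because only employee_dtls and department_dtls exist in the catalog.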

Prompt with basic metadata

To solve the problem of assumed table names and column names, we added table metadata in DDL format to the second prompt. As a result, the model used the correct column names and data types and restricted the DATE casting to a literal string value. The SQL query is now syntactically correct, but one issue remains: the model still assumed the literal values used in the filter expressions.

Question: List of permanent employees who work in North America and joined after January 1, 2024.

Prompt definition:

Human: You are an Amazon Athena query expert whose output is a valid sql query. You are given the following Instructions for building the Amazon Athena query.
<Instructions>
{instruction_dtls}
</Instructions>

Only use the following tables defined within the database_schema and table_schema XML-style tags:

<database_schema>
<table_schema>
CREATE EXTERNAL TABLE employee_dtls (
  id int,
  name string,
  age int ,
  dept_id int,
  emp_category string ,
  location_id int ,
  joining_date date
) PARTITIONED BY (
  region_id string
  )
</table_schema>

<table_schema>
CREATE EXTERNAL TABLE department_dtls (
  id int,
  name string ,
  location_id int 
)
</table_schema>
</database_schema>

Question: {question}
Assistant:

SQL query generated:

SELECT employee_dtls.id, employee_dtls.name, employee_dtls.age, employee_dtls.dept_id, employee_dtls.emp_category
FROM employee_dtls 
WHERE employee_dtls.region_id = 'NA' 
  AND employee_dtls.emp_category = 'permanent'
  AND employee_dtls.joining_date > CAST('2024-01-01' AS DATE)
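
The assumed literal values can also be detected mechanically when the set of valid values for a column is known. This is a minimal sketch, assuming the valid values are kept in a dictionary (here copied from the column descriptions of employee_dtls) and that filters are simple `column = 'literal'` comparisons; the names `ALLOWED_VALUES` and `find_invalid_literals` are hypothetical.

```python
import re
from typing import Dict, List, Set, Tuple

# Valid values taken from the emp_category and region_id column descriptions.
ALLOWED_VALUES: Dict[str, Set[str]] = {
    "emp_category": {"TEMP", "PERM", "CONTR"},
    "region_id": {"AMER", "EMEA", "APAC"},
}


def find_invalid_literals(
    sql: str, allowed: Dict[str, Set[str]] = ALLOWED_VALUES
) -> List[Tuple[str, str]]:
    """Return (column, literal) pairs whose literal is not a known value of the column."""
    # Matches "column = 'value'" with an optional "table." qualifier on the column.
    pattern = r"(?:\w+\.)?(\w+)\s*=\s*'([^']*)'"
    problems = []
    for column, literal in re.findall(pattern, sql):
        values = allowed.get(column.lower())
        if values is not None and literal not in values:
            problems.append((column, literal))
    return problems


basic_metadata_sql = """
SELECT employee_dtls.id, employee_dtls.name
FROM employee_dtls
WHERE employee_dtls.region_id = 'NA'
  AND employee_dtls.emp_category = 'permanent'
  AND employee_dtls.joining_date > CAST('2024-01-01' AS DATE)
"""
print(find_invalid_literals(basic_metadata_sql))
```

Applied to the query above, the check reports both `'NA'` and `'permanent'`, which are exactly the values the model had to guess.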

Prompt with enriched metadata

Next, we need a way to tell the model which values a column can contain. For low-cardinality columns, one option is to list the possible values in the column description. In the third prompt, we added column descriptions along with the possible values. As a result, the model used the correct literal values in the filter expressions and produced an accurate SQL query.

Question: List of permanent employees who work in North America and joined after Jan 1, 2024.

Prompt definition:

Human: You are an Amazon Athena query expert whose output is a valid sql query. You are given the following Instructions for building the Amazon Athena query.
<Instructions>
{instruction_dtls}
</Instructions>

Only use the following tables defined within the database_schema and table_schema XML-style tags:

<database_schema>
<table_schema>
CREATE EXTERNAL TABLE employee_dtls (
id int COMMENT 'Employee id',
name string COMMENT 'Employee name',
age int COMMENT 'Employee age',
dept_id int COMMENT 'Employee Departments ID',
emp_category string COMMENT 'Employee category. Contains TEMP For temporary, PERM for permanent, CONTR for contractors ',
location_id int COMMENT 'Location identifier of the Employee',
joining_date date  COMMENT 'Employee joining date',
CONSTRAINT pk_1 PRIMARY KEY  (id) ,
CONSTRAINT FK_1 FOREIGN KEY (dept_id) REFERENCES department_dtls(id)
) PARTITIONED BY (
region_id string COMMENT 'Region identifier. Contains AMER for Americas, EMEA for Europe, the Middle East, and Africa, APAC for Asia Pacific countries'
)
</table_schema>

<table_schema>
CREATE EXTERNAL TABLE department_dtls (
id int COMMENT 'Department id',
name string COMMENT 'Department name',
location_id int COMMENT 'Location identifier of the Department'
)
</table_schema>
</database_schema>

Question: {question}
Assistant:

SQL query generated:

SELECT employee_dtls.id, 
employee_dtls.name
FROM employee_dtls 
WHERE employee_dtls.emp_category = 'PERM'
  AND employee_dtls.region_id = 'AMER' 
  AND employee_dtls.joining_date > CAST('2024-01-01' AS DATE)

Prompt with foreign key constraints in the metadata

Note that when we added the finer details to the metadata of the third prompt, we included foreign key constraints as well. This is done to help the model generate SQL for advanced queries that require joins. Adding foreign key constraints to the metadata helps the model identify the correct columns to be used in the join conditions. To demonstrate this point, we asked the model to write SQL for showing department details along with the employee details. For showing the department details, we need the department_dtls table. The model added department_dtls table to the SQL query and identified the right columns for the join condition based on foreign key constraint details included in the metadata.

Question: List of permanent employees who work in North America and joined after Jan 1, 2024.

SQL query generated:

SELECT
  employee_dtls.name AS employee_name,
  employee_dtls.age,
  department_dtls.name AS department_name
FROM employee_dtls 
JOIN department_dtls 
  ON employee_dtls.dept_id = department_dtls.id
WHERE 
  employee_dtls.emp_category = 'PERM'
  AND employee_dtls.region_id = 'AMER' 
  AND employee_dtls.joining_date > CAST('2024-01-01' AS DATE)
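
To make the link between the constraint text and the join condition concrete, the following minimal sketch parses a foreign key constraint string of the form used in this post and derives the corresponding join condition. It only illustrates the information the constraint carries; it is not how the model works internally, and the names `FK_PATTERN` and `join_condition` are hypothetical.

```python
import re
from typing import Optional

# Matches "FOREIGN KEY (col) REFERENCES other_table(other_col)" for single-column keys.
FK_PATTERN = re.compile(
    r"FOREIGN\s+KEY\s*\(\s*(\w+)\s*\)\s*REFERENCES\s+(\w+)\s*\(\s*(\w+)\s*\)",
    flags=re.IGNORECASE,
)


def join_condition(table_name: str, fk_constraint: str) -> Optional[str]:
    """Derive 'table.col = ref_table.ref_col' from a foreign key constraint string."""
    match = FK_PATTERN.search(fk_constraint)
    if match is None:
        return None
    column, ref_table, ref_column = match.groups()
    return f"{table_name}.{column} = {ref_table}.{ref_column}"


constraint = "CONSTRAINT FK_1 FOREIGN KEY (dept_id) REFERENCES department_dtls(id)"
print(join_condition("employee_dtls", constraint))
```

The derived condition matches the ON clause in the generated query above.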

Additional observations

Though the model included relevant employee attributes in the SELECT clause, the exact list of attributes it included varied each time. Even for the same prompt definition, the model provided a varying list of attributes. The model randomly used one of the two approaches for casting the string literal value to date type. The first approach uses CAST('2024-01-01' AS DATE) and the second approach uses DATE '2024-01-01'.

Challenges in maintaining the metadata

Now that you understand how maintaining detailed metadata along with foreign key constraints helps the model in generating accurate SQL queries, let’s discuss how you can gather the necessary details of table metadata. The data lake and database catalogs support gathering and querying metadata, including table and column descriptions. However, making sure that these descriptions are accurate and up-to-date poses several practical challenges, such as:

  1. Creating database objects with useful descriptions requires collaboration between technical and business teams to write detailed and meaningful descriptions. As tables undergo schema changes, updating metadata for each change can be time-consuming and requires effort.
  2. Maintaining lists of possible values for the columns requires continuous updates.
  3. Adding data transformation details to metadata can be challenging because of the dispersed nature of this information across data processing pipelines, making it difficult to extract and incorporate into table-level metadata.
  4. Adding data lineage details to metadata faces challenges because of the fragmented nature of this information across data processing pipelines, making extraction and integration into table-level metadata complex.

Specific to the AWS Glue Data Catalog, more challenges arise, such as the following:

  1. Creating AWS Glue tables through crawlers doesn’t automatically generate table or column descriptions, requiring manual updates to table definitions from the AWS Glue console.
  2. Unlike traditional relational databases, AWS Glue tables don’t explicitly define or enforce primary keys or foreign keys. AWS Glue tables operate on a schema-on-read basis, where the schema is inferred from the data when querying. Therefore, there’s no direct support for specifying primary keys or foreign keys in AWS Glue tables like there is in traditional databases. Column descriptions can be stored as column comments, but they have to be supplied explicitly.

Enriching the metadata

Listed here are some ways that you can overcome the previously mentioned challenges in maintaining the metadata.

  • Enhance the table and column descriptions: Documenting table and column descriptions requires a good understanding of the business process, terminology, acronyms, and domain knowledge. The following are the different methods you can use to get these table and column descriptions into the AWS Glue Data Catalog.
    • Use generative AI to generate better documentation: Enterprises often document their business processes, terminologies, and acronyms and make them accessible through company-specific portals. Following naming conventions for tables and columns keeps object names consistent and makes them easier to relate to business terminology and acronyms. Using generative AI models on Amazon Bedrock, you can enhance table and column descriptions by feeding the models with business terminology and acronym definitions along with the database schema objects. This approach reduces the time and effort required to generate detailed descriptions. A recently released Amazon DataZone feature, AI recommendations for descriptions in Amazon DataZone, follows the same principles. After you generate the descriptions, you can update the column descriptions using any of the following options.
      • From the AWS Glue catalog UI
      • Using the AWS Glue SDK, similar to Step 3a: Create employee_dtls Glue table for querying from Athena in the 0_create_tables_with_metadata.ipynb Jupyter Notebook
      • By adding COMMENT clauses to the DDL script of the table.
        CREATE EXTERNAL TABLE <table_name> 
        ( column1 string COMMENT '<column_description>' ) 
        PARTITIONED BY ( column2 string COMMENT '<column_description>' )

  • For AWS Glue tables cataloged from other databases:
    • You can add table and column descriptions from the source databases using the crawler in AWS Glue.
    • You can configure the EnableAdditionalMetadata Crawler option to crawl metadata such as comments and raw data types from the underlying data sources. The AWS Glue crawler will then populate the additional metadata in AWS Glue Data Catalog. This provides a way to document your tables and columns directly from the metadata defined in the underlying database.
  • Enhance the metadata with data profiling: As demonstrated in the previous section, providing the list of values in the employee category column and their meaning helped in generating the SQL query with more accurate filter conditions. We can provide such a list of values or data characteristics in the column descriptions with the help of data profiling. Data profiling is the process of analyzing the data to understand its characteristics, such as the distinct values in a column. By using data profiling insights, we can enhance column descriptions.
  • Enhance the metadata with details of partitions and a range of partition values: As demonstrated in the previous section, providing the list of partition values and their meaning in the partition column description helped in generating the SQL with more accurate filter conditions. For list partitions, we can add the list of the partition values and their meanings to the partition column description. For range partitions, we can add more context on the grain of the values like daily, monthly, and a specific range of values to the column description.
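
The data profiling idea above can be sketched as a helper that appends the distinct values of a low-cardinality column to its description. This is a minimal sketch under stated assumptions: the column values are already available in memory (for example, from a `SELECT DISTINCT` query), the cutoff of 10 distinct values is an arbitrary illustration, and the name `describe_with_values` is hypothetical.

```python
from typing import Dict, Iterable, Optional


def describe_with_values(
    description: str,
    values: Iterable,
    meanings: Optional[Dict] = None,
    max_distinct: int = 10,
) -> str:
    """Append the distinct values (and their meanings, if known) to a column description."""
    distinct = sorted({v for v in values if v is not None})
    # Leave high-cardinality or empty columns untouched.
    if not distinct or len(distinct) > max_distinct:
        return description
    meanings = meanings or {}
    parts = [f"{v} for {meanings[v]}" if v in meanings else str(v) for v in distinct]
    return f"{description}. Contains " + ", ".join(parts)


print(
    describe_with_values(
        "Employee category",
        ["PERM", "TEMP", "PERM", "CONTR", None],
        {"PERM": "permanent", "TEMP": "temporary", "CONTR": "contractors"},
    )
)
```

The meanings still have to come from people or documentation who know the business terms; profiling only supplies the list of values that actually occur in the data.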

Enriching the prompt

You can enhance the prompts with query optimization rules like partition pruning. In the athena_sql_generating_instructions, defined as part of the 1_text-to-sql-for-athena.ipynb Jupyter Notebook, we added the instruction “For tables with partitions, include the filters on the relevant partition columns”. This instruction guides the model on how to handle partition pruning. In the example, we observed that the model added the relevant partition filter on the region_id partition column. Partition filters speed up SQL query execution, and partition pruning is one of the top query optimization techniques. You can add more such query optimization rules to the instructions, and you can enhance the instructions with relevant SQL examples.
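
Adding rules and SQL examples to the instructions can be sketched as follows. This is a minimal sketch, assuming the instructions are a plain string with numbered rules like athena_sql_generating_instructions; the function name `enrich_instructions` and the `<example>` tag convention are hypothetical choices for illustration, not part of the notebook.

```python
import re
from typing import Iterable, Tuple


def enrich_instructions(
    base_instructions: str,
    extra_rules: Iterable[str] = (),
    examples: Iterable[Tuple[str, str]] = (),
) -> str:
    """Append numbered rules and question/SQL example pairs to an instruction string."""
    lines = [base_instructions.rstrip()]

    # Continue numbering after the rules that already exist.
    existing = len(re.findall(r"^[ \t]*\d+\.", base_instructions, flags=re.MULTILINE))
    for number, rule in enumerate(extra_rules, start=existing + 1):
        lines.append(f"    {number}. {rule}")

    for question, sql in examples:
        lines.append("<example>")
        lines.append(f"Question: {question}")
        lines.append(f"<SQL>{sql}</SQL>")
        lines.append("</example>")
    return "\n".join(lines) + "\n"


base = (
    "Read database schema to do the following:\n"
    "    1. Create a syntactically correct AWS Athena query to answer the question.\n"
    "    2. For tables with partitions, include the filters on the relevant partition columns.\n"
)
print(
    enrich_instructions(
        base,
        extra_rules=["Avoid SELECT *; list only the columns needed."],
        examples=[(
            "Count of employees in the Americas",
            "SELECT COUNT(*) FROM employee_dtls WHERE employee_dtls.region_id = 'AMER'",
        )],
    )
)
```

Keeping the rules and examples as data makes it straightforward to compare how individual additions change the generated SQL.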

Cleanup

To clean up the resources, start by cleaning up the S3 bucket that was created by the CloudFormation stack. Then delete the CloudFormation stack using the following steps.

  • In the AWS Management Console, choose the name of the currently displayed Region and change it to US West (Oregon).
  • Navigate to AWS CloudFormation.
  • Choose Stacks.
  • Select texttosqlmetadata
  • Choose Delete.

Conclusion

The example presented in this post highlights the importance of enriched metadata in generating accurate SQL queries using the text-to-SQL capabilities of Anthropic’s Claude model on Amazon Bedrock, and discusses multiple ways to enrich the metadata. Amazon Bedrock is at the center of this text-to-SQL generation. Amazon Bedrock can help you build various generative AI applications, including the metadata generation use case mentioned in the previous section. To get started with Amazon Bedrock, we recommend following the quick start in the GitHub repo and familiarizing yourself with building generative AI applications. After getting familiar with generative AI applications, see the GitHub Text-to-SQL workshop to learn more text-to-SQL techniques. See Build a robust Text-to-SQL solution and Best practices for Text-to-SQL for the recommended architecture and best practices to follow while implementing text-to-SQL generation.


About the author

Naidu Rongali is a Big Data and ML engineer at Amazon. He designs and develops data processing solutions for data intensive analytical systems supporting Amazon retail business. He has been working on integrating generative AI capabilities into the data lake and data warehouse systems using Amazon Bedrock AI models. Naidu has a PG diploma in Applied Statistics from the Indian Statistical Institute, Calcutta and BTech in Electrical and Electronics from NIT, Warangal. Outside of his work, Naidu practices yoga and goes trekking often.

Unleash deeper insights with Amazon Redshift data sharing for data lake tables

Post Syndicated from Mohammed Alkateb original https://aws.amazon.com/blogs/big-data/unleash-deeper-insights-with-amazon-redshift-data-sharing-for-data-lake-tables/

Amazon Redshift has established itself as a highly scalable, fully managed cloud data warehouse trusted by tens of thousands of customers for its superior price-performance and advanced data analytics capabilities. Driven primarily by customer feedback, the product roadmap for Amazon Redshift is designed to make sure the service continuously evolves to meet the ever-changing needs of its users.

Over the years, this customer-centric approach has led to the introduction of groundbreaking features such as zero-ETL, data sharing, streaming ingestion, data lake integration, Amazon Redshift ML, Amazon Q generative SQL, and transactional data lake capabilities. The latest innovation in Amazon Redshift data sharing capabilities further enhances the service’s flexibility and collaboration potential.

Amazon Redshift now enables the secure sharing of data lake tables—also known as external tables or Amazon Redshift Spectrum tables—that are managed in the AWS Glue Data Catalog, as well as Redshift views referencing those data lake tables. This breakthrough empowers data analytics to span the full breadth of shareable data, allowing you to seamlessly share local tables and data lake tables across warehouses, accounts, and AWS Regions—without the overhead of physical data movement or recreating security policies for data lake tables and Redshift views on each warehouse.

By using granular access controls, data sharing in Amazon Redshift helps data owners maintain tight governance over who can access the shared information. In this post, we explore powerful use cases that demonstrate how you can enhance cross-team and cross-organizational collaboration, reduce overhead, and unlock new insights by using this innovative data sharing functionality.

Overview of Amazon Redshift data sharing

Amazon Redshift data sharing allows you to securely share your data with other Redshift warehouses, without having to copy or move the data.

Data shared between warehouses doesn’t require the data to be physically copied or moved—instead, data remains in the original Redshift warehouse, and access is granted to other authorized users as part of a one-time setup. Data sharing provides granular access control, allowing you to control which specific tables or views are shared, and which users or services can access the shared data.

Since consumers access the shared data in-place, they always access the latest state of the shared data. Data sharing even allows for the automatic sharing of new tables created after that datashare was established.

You can share data across different Redshift warehouses within or across AWS accounts, and you can also do cross-region data sharing. This allows you to share data with partners, subsidiaries, or other parts of your organization, and enables the powerful workload isolation use case, as shown in the following diagram. With the seamless integration of Amazon Redshift with AWS Data Exchange, data can also be monetized and shared publicly, and public datasets such as census data can be added to a Redshift warehouse with just a few steps.

Figure 1: Amazon Redshift data sharing between producer and consumer warehouses

The data sharing capabilities in Amazon Redshift also enable the implementation of a data mesh architecture, as shown in the following diagram. This helps democratize data within the organization by reducing barriers to accessing and using data across different business units and teams. For datasets with multiple authors, Amazon Redshift data sharing supports both read and write use cases (write in preview at the time of writing). This enables the creation of 360-degree datasets, such as a customer dataset that receives contributions from multiple Redshift warehouses across different business units in the organization.

Figure 2: Data mesh architecture using Amazon Redshift data sharing

Overview of Redshift Spectrum and data lake tables

In the modern data organization, the data lake has emerged as a centralized repository—a single source of truth where all data within the organization ultimately resides at some point in its lifecycle. Redshift Spectrum enables seamless integration between the Redshift data warehouse and customers’ data lakes, as shown in the following diagram. With Redshift Spectrum, you can run SQL queries directly against data stored in Amazon Simple Storage Service (Amazon S3), without the need to first load that data into a Redshift warehouse. This allows you to maintain a comprehensive view of your data while optimizing for cost-efficiency.

Figure 3: Amazon Redshift bridges the data warehouse and data lake by enabling querying of data lake tables in-place

Redshift Spectrum supports a variety of open file formats, including Parquet, ORC, JSON, and CSV, as well as open table formats such as Apache Iceberg, all stored in Amazon S3. It runs these queries using a dedicated fleet of high-performance servers with low-latency connections to the S3 data lake. Data lake tables can be added to a Redshift warehouse either automatically through the Data Catalog, in the Amazon Redshift Query Editor, or manually using SQL commands.

From a user experience standpoint, there is little difference between querying a local Redshift table vs. a data lake table. SQL queries can be reused verbatim to perform the same aggregations and transformations on data residing in the data lake, as shown in the following examples. Additionally, by using columnar file formats like Parquet and pushing down query predicates, you can achieve further performance enhancements.

The following SQL is for a sample query against local Redshift tables:

SELECT top 10 mylocal_schema.sales.eventid, sum(mylocal_schema.sales.pricepaid) FROM mylocal_schema.sales, event
WHERE mylocal_schema.sales.eventid = event.eventid
AND mylocal_schema.sales.pricepaid > 30
GROUP BY mylocal_schema.sales.eventid
ORDER BY 2 DESC;

The following SQL is for the same query, but against data lake tables:

SELECT top 10 myspectrum_schema.sales.eventid, sum(myspectrum_schema.sales.pricepaid) FROM myspectrum_schema.sales, event
WHERE myspectrum_schema.sales.eventid = event.eventid
AND myspectrum_schema.sales.pricepaid > 30
GROUP BY myspectrum_schema.sales.eventid
ORDER BY 2 desc;
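
Because only the schema name differs between the two queries, the reuse can be sketched with a single query template. This is a minimal sketch: the names `QUERY_TEMPLATE` and `top_events_query` are hypothetical, and the schema name is validated as a plain identifier before it is substituted into the SQL text.

```python
import re

# The same query text as above, with the schema name left as a placeholder.
QUERY_TEMPLATE = """SELECT top 10 {schema}.sales.eventid, sum({schema}.sales.pricepaid) FROM {schema}.sales, event
WHERE {schema}.sales.eventid = event.eventid
AND {schema}.sales.pricepaid > 30
GROUP BY {schema}.sales.eventid
ORDER BY 2 DESC;"""


def top_events_query(schema: str) -> str:
    """Return the sample query for a local schema or a data lake (external) schema."""
    # Reject anything that is not a simple identifier, because the name is
    # interpolated directly into the SQL text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", schema):
        raise ValueError(f"Invalid schema name: {schema!r}")
    return QUERY_TEMPLATE.format(schema=schema)


local_query = top_events_query("mylocal_schema")
lake_query = top_events_query("myspectrum_schema")
print(lake_query)
```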

To maintain robust data governance, Redshift Spectrum integrates with AWS Lake Formation, enabling the consistent application of security policies and access controls across both the Redshift data warehouse and S3 data lake. When Lake Formation is used, Redshift producer warehouses first share their data with Lake Formation rather than directly with other Redshift consumer warehouses, and the data lake administrator grants fine-grained permissions for Redshift consumer warehouses to access the shared data. For more information, see Centrally manage access and permissions for Amazon Redshift data sharing with AWS Lake Formation.

In the past, however, sharing data lake tables across Redshift warehouses presented challenges. It wasn’t possible to do so without having to mount the data lake tables on each individual Redshift warehouse and then recreate the related security policies.

This barrier has now been addressed with the introduction of data sharing support for data lake tables. You can now share data lake tables just like any other table, using the built-in data sharing capabilities of Amazon Redshift. By combining the power of Redshift Spectrum data lake integration with the flexibility of Amazon Redshift data sharing, organizations can unlock new levels of cross-team collaboration and insights, while maintaining robust data governance and security controls.

For more information about Redshift Spectrum, see Getting started with Amazon Redshift Spectrum.

Solution overview

In this post, we describe how to add data lake tables or views to a Redshift datashare, covering two key use cases:

  • Adding a late-binding view or materialized view to a producer datashare that references a data lake table
  • Adding a data lake table directly to a producer datashare

The first use case provides greater flexibility and convenience. Consumers can query the shared view without having to configure fine-grained permissions. The configuration, such as defining permissions on data stored in Amazon S3 with Lake Formation, is already handled on the producer side. You only need to add the view to the producer datashare one time, making it a convenient option for both the producer and the consumer.

An additional benefit of this approach is that you can add views to a datashare that join data lake tables with local Redshift tables. When these views are shared, you can relegate the trusted business logic to just the producer side.

Alternatively, you can add data lake tables directly to a datashare. In this case, consumers can query the data lake tables directly or join them with their own local tables, allowing them to add their own conditional logic as needed.

Add a view that references a data lake table to a Redshift datashare

When you create data lake tables that you intend to add to a datashare, the recommended and most common way to do this is to add a view to the datashare that references a data lake table or tables. There are three high-level steps involved:

  1. Add the Redshift view’s schema (the local schema) to the Redshift datashare.
  2. Add the Redshift view (the local view) to the Redshift datashare.
  3. Add the Redshift external schemas (for the tables referenced by the Redshift view) to the Redshift datashare.

The following diagram illustrates the full workflow.

Figure 4: Sharing data lake tables via Amazon Redshift views


The workflow consists of the following steps:

  1. Create a data lake table on the datashare producer. For more information on creating Redshift Spectrum objects, see External schemas for Amazon Redshift Spectrum. Data lake tables to be shared can include Lake Formation registered tables and Data Catalog tables, and if using the Redshift Query Editor, these tables are automatically mounted.
  2. Create a view on the producer that references the data lake table that you created.
  3. Create a datashare, if one doesn’t already exist, and add objects to your datashare, including the view you created that references the data lake table. For more information, see Creating datashares and adding objects (preview).
  4. Add the schema of each base table that the view references to the datashare (this applies to both local base tables and data lake tables). You don’t have to add the data lake table itself to the datashare.
  5. On the consumer, the administrator makes the view available to consumer database users.
  6. Database consumer users can write queries to retrieve data from the shared view and join it with other tables and views on the consumer.

After these steps are complete, database consumer users with access to the datashare views can reference them in their SQL queries. The following SQL queries are examples for achieving the preceding steps.

Create a data lake table on the producer warehouse:

CREATE EXTERNAL TABLE myspectrum_db.myspectrum_schema.test (c1 INT)
stored AS parquet
location 's3://amzn-s3-demo-bucket/myfolder/';

Create a view on the producer warehouse:

CREATE VIEW mylocal_db.mylocal_schema.myspectrumview AS SELECT c1 FROM myspectrum_db.myspectrum_schema.test
WITH no schema binding;

Add a view to the datashare on the producer warehouse:

ALTER datashare mydatashare ADD SCHEMA mylocal_db.mylocal_schema;
ALTER datashare mydatashare ADD VIEW myspectrumview;
ALTER datashare mydatashare ADD SCHEMA myspectrum_db.myspectrum_schema;

Create a database from the datashare and grant usage on it in the consumer warehouse:

CREATE database myspectrum_db FROM datashare myspectrumproducer OF account '123456789012' namespace 'p1234567-8765-4321-p10987654321';
GRANT usage ON database myspectrum_db TO usernames;

Add a data lake table directly to a Redshift datashare

Adding a data lake table to a datashare is similar to adding a view. This process works well for a case where the consumers want the raw data from the data lake table and they want to write queries and join it to tables in their own data warehouse. There are two high-level steps involved:

  1. Add the Redshift external schemas (of the data lake tables to be shared) to the Redshift datashare.
  2. Add the data lake table (the Redshift external table) to the Redshift datashare.

The following diagram illustrates the full workflow.

Figure 5: Sharing data lake tables directly in an Amazon Redshift datashare


The workflow consists of the following steps:

  1. Create a data lake table on the datashare producer.
  2. Add objects to your datashare, including the data lake table you created. In this case, you don’t have any abstraction over the table.
  3. On the consumer, the administrator makes the table available.
  4. Database consumer users can write queries to retrieve data from the shared table and join it with other tables and views on the consumer.

The following SQL queries are examples for achieving the preceding producer steps.

Create a data lake table on the producer warehouse:

CREATE EXTERNAL TABLE myspectrum_db.myspectrum_schema.test (c1 INT)
stored AS parquet
location 's3://amzn-s3-demo-bucket/myfolder/';

Add a data lake schema and table directly to the datashare on the producer warehouse:

ALTER datashare mydatashare ADD SCHEMA myspectrum_db.myspectrum_schema;
ALTER datashare mydatashare ADD TABLE myspectrum_db.myspectrum_schema.test;

Create a database from the datashare and grant usage on it, so that users can query the shared table, in the consumer warehouse:

CREATE database myspectrum_db FROM datashare myspectrumproducer OF account '123456789012' namespace 'p1234567-8765-4321-p10987654321';
GRANT usage ON database myspectrum_db TO usernames;

Security considerations for sharing data lake tables and views

Data lake tables are stored outside of Amazon Redshift, in the data lake, and may not be owned by the Redshift warehouse, but are still referenced within Amazon Redshift. This setup requires special security considerations. Data lake tables operate under the security and governance of both Amazon Redshift and the data lake. For Lake Formation registered tables specifically, the Amazon S3 resources are secured by Lake Formation and made available to consumers using the provided credentials.

The data owner of the data in the data lake tables may want to impose restrictions on which external objects can be added to a datashare. To give data owners more control over whether warehouse users can share data lake tables, you can use session tags in AWS Identity and Access Management (IAM). These tags provide additional context about the user running the queries. For more details on tagging resources, refer to Tags for AWS Identity and Access Management resources.

Audit considerations for sharing data lake tables and views

When sharing data lake objects through a datashare, there are special logging considerations to keep in mind:

  • Access controls – You can use CloudTrail log data in conjunction with IAM policies to control access to shared tables for both Redshift datashare producers and consumers. The CloudTrail logs record details about who accesses shared tables. The identifiers in the log data are available in the ExternalId field under the AssumeRole CloudTrail logs. The data owner can configure additional limitations on data access in an IAM policy by means of actions. For more information about defining data access through policies, see Access to AWS accounts owned by third parties.
  • Centralized access – Amazon S3 resources such as data lake tables can be registered and centrally managed with Lake Formation. After they’re registered with Lake Formation, Amazon S3 resources are secured and governed by the associated Lake Formation policies and made available using the credentials provided by Lake Formation.

Billing considerations for sharing data lake tables and views

The billing model for Redshift Spectrum differs for Amazon Redshift provisioned and serverless warehouses. For provisioned warehouses, Redshift Spectrum queries (queries involving data lake tables) are billed based on the amount of data scanned during query execution. For serverless warehouses, data lake queries are billed the same as non-data-lake queries. Storage for data lake tables is always billed to the AWS account associated with the Amazon S3 data.

In the case of datashares involving data lake tables, costs are attributed for storing and scanning data lake objects in a datashare as follows:

  • When a consumer queries shared objects from a data lake, the cost of scanning is billed to the consumer:
    • When the consumer is a provisioned warehouse, Amazon Redshift uses Redshift Spectrum to scan the Amazon S3 data. Therefore, the Redshift Spectrum cost is billed to the consumer account.
    • When the consumer is an Amazon Redshift Serverless workgroup, there is no separate charge for data lake queries.
  • Amazon S3 costs for storage and operations, such as listing buckets, are billed to the account that owns each S3 bucket.
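The attribution rules above can be sketched as a small function. This is only an illustration: the function name is hypothetical, and the per-terabyte price is a placeholder argument rather than an actual rate (refer to Amazon Redshift pricing for current values).

```python
def consumer_spectrum_charge(tb_scanned: float, consumer_type: str,
                             price_per_tb: float) -> float:
    """Separate Redshift Spectrum scan charge billed to the consumer account.

    price_per_tb is a caller-supplied placeholder, not an official rate.
    """
    if tb_scanned < 0 or price_per_tb < 0:
        raise ValueError("tb_scanned and price_per_tb must be non-negative")
    if consumer_type == "provisioned":
        # Provisioned consumers pay for the data scanned by Redshift Spectrum.
        return tb_scanned * price_per_tb
    if consumer_type == "serverless":
        # Serverless consumers have no separate charge for data lake queries;
        # the query is billed like any other serverless query.
        return 0.0
    raise ValueError(f"unknown consumer type: {consumer_type!r}")


# Illustrative numbers only: 2 TB scanned at a placeholder price of 5.0 per TB.
provisioned_charge = consumer_spectrum_charge(2.0, "provisioned", price_per_tb=5.0)
serverless_charge = consumer_spectrum_charge(2.0, "serverless", price_per_tb=5.0)
```

Amazon S3 storage and request costs are outside this sketch because they are billed to the bucket owner regardless of who runs the query.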

For detailed information on Redshift Spectrum billing, refer to Amazon Redshift pricing and Billing for storage.

Conclusion

In this post, we explored how the enhanced data sharing capabilities of Amazon Redshift, including support for sharing data lake tables and Redshift views that reference those data lake tables, empower organizations to unlock the full potential of their data by bringing the full breadth of data assets in scope for advanced analytics. Organizations are now able to seamlessly share local tables and data lake tables across warehouses, accounts, and Regions.

We outlined the steps to securely share data lake tables and views that reference those data lake tables across Redshift warehouses, even those in separate AWS accounts or Regions. Additionally, we covered some considerations and best practices to keep in mind when using this innovative feature.

Sharing data lake tables and views through Amazon Redshift data sharing champions the modern, data-driven organization’s goal to democratize data access in a secure, scalable, and efficient manner. By eliminating the need for physical data movement or duplication, this capability reduces overhead and enables seamless cross-team and cross-organizational collaboration. Unleashing the full potential of your data analytics to span the full breadth of your local tables and data lake tables is just a few steps away.

For more information on Amazon Redshift data sharing and how it can benefit your organization, refer to the following resources:

Please also reach out to your AWS technical account manager or AWS account Solutions Architect. They will be happy to provide additional guidance and support.


About the Authors

Mohammed Alkateb is an Engineering Manager at Amazon Redshift. Prior to joining Amazon, Mohammed had 12 years of industry experience in query optimization and database internals as an individual contributor and engineering manager. Mohammed has 18 US patents, and he has publications in research and industrial tracks of premier database conferences including EDBT, ICDE, SIGMOD and VLDB. Mohammed holds a PhD in Computer Science from The University of Vermont, and MSc and BSc degrees in Information Systems from Cairo University.

Ramchandra Anil Kulkarni is a software development engineer who has been with Amazon Redshift for over 4 years. He is driven to develop database innovations that serve AWS customers globally. Kulkarni’s long-standing tenure and dedication to the Amazon Redshift service demonstrate his deep expertise and commitment to delivering cutting-edge database solutions that empower AWS customers worldwide.

Mark Lyons is a Principal Product Manager on the Amazon Redshift team. He works on the intersection of data lakes and data warehouses. Prior to joining AWS, Mark held product leadership roles with Dremio and Vertica. He is passionate about data analytics and empowering customers to change the world with their data.

Asser Moustafa is a Principal Worldwide Specialist Solutions Architect at AWS, based in Dallas, Texas. He partners with customers worldwide, advising them on all aspects of their data architectures, migrations, and strategic data visions to help organizations adopt cloud-based solutions, maximize the value of their data assets, modernize legacy infrastructures, and implement cutting-edge capabilities like machine learning and advanced analytics. Prior to joining AWS, Asser held various data and analytics leadership roles, completing an MBA from New York University and an MS in Computer Science from Columbia University in New York. He is passionate about empowering organizations to become truly data-driven and unlock the transformative potential of their data.

Perform data parity at scale for data modernization programs using AWS Glue Data Quality

Post Syndicated from Himanshu Sahni original https://aws.amazon.com/blogs/big-data/perform-data-parity-at-scale-for-data-modernization-programs-using-aws-glue-data-quality/

Today, customers are embarking on data modernization programs by migrating on-premises data warehouses and data lakes to the AWS Cloud to take advantage of the scale and advanced analytical capabilities of the cloud. Customers are migrating their on-premises data warehouse solutions built on databases like Netezza, PostgreSQL, Greenplum, and Teradata to AWS based modern data platforms using services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. AWS based modern data platforms help you break down data silos and enable analytics and machine learning (ML) use cases at scale.

During migration, you might want to establish data parity checks between on-premises databases and AWS data platform services. Data parity is a process to validate that data was migrated successfully from source to target without any errors or failures. A successful data parity check means that data in the target platform has the equivalent content, values, and completeness as that of the source platform.

Data parity can help build confidence and trust with business users on the quality of migrated data. Additionally, it can help you identify errors in the new cloud-based extract, transform, and load (ETL) process.

Some customers build custom in-house data parity frameworks to validate data during migration. Others use open source data quality products for data parity use cases. These options involve a lot of custom code, configuration, and installation, and have scalability challenges. They also divert valuable person-hours from the actual migration effort to building and maintaining a data parity framework.

In this post, we show you how to use AWS Glue Data Quality, a feature of AWS Glue, to establish data parity during data modernization and migration programs with minimal configuration and infrastructure setup. AWS Glue Data Quality enables you to automatically measure and monitor the quality of your data in data repositories and AWS Glue ETL pipelines.

Overview of solution

In large data modernization projects of migrating from an on-premises database to an Amazon S3 based data lake, it’s common to have the following requirements for data parity:

  • Compare one-time historical data from the source on-premises database to the target S3 data lake.
  • Compare ongoing data that is replicated from the source on-premises database to the target S3 data lake.
  • Compare the output of the cloud-based new ETL process with the existing on-premises ETL process. You can plan a period of parallel runs, where the legacy and new systems run in parallel, and the data is compared daily.
  • Use functional queries to compare high-level aggregated business metrics between the source on-premises database and the target data lake.

In this post, we use the example of migrating an on-premises PostgreSQL database to an S3 data lake and validating the migrated data with AWS Glue Data Quality.

The following diagram illustrates this use case’s historical data migration architecture.

The architecture shows a common pattern for on-premises databases (like PostgreSQL) to Amazon S3 based data lake migration. The workflow includes the following steps:

  1. Schemas and tables are stored in an on-premises database (PostgreSQL), and you want to migrate to Amazon S3 for storage and AWS Glue for compute.
  2. Use AWS Database Migration Service (AWS DMS) to migrate historical data from the source database to an S3 staging bucket.
  3. Use AWS Glue ETL to curate data from the S3 staging bucket to an S3 curated bucket. In the curated bucket, AWS Glue tables are created using AWS Glue crawlers or an AWS Glue ETL job.
  4. Use an AWS Glue connection to connect AWS Glue with the on-premises PostgreSQL database.
  5. Use AWS Glue Data Quality to compare historical data from the source database to the target S3 bucket and write results to a separate S3 bucket.

The following diagram illustrates the incremental data migration architecture.

After historical data is migrated and validated, the workflow proceeds to the following steps:

  1. Ingest incremental data from the source systems to the S3 staging bucket. This is done using an ETL ingestion tool like AWS Glue.
  2. Curate incremental data from the S3 staging bucket to the S3 curated bucket using AWS Glue ETL.
  3. Compare the incremental data using AWS Glue Data Quality.

In the next sections, we demonstrate how to use AWS Glue Data Quality to establish data parity between source (PostgreSQL) and target (Amazon S3). We cover the following scenarios:

  • Establish data parity for historical data migration – Historical data migration is defined as a one-time bulk data migration of historical data from legacy on-premises databases to the AWS Cloud. The data parity process maintains the validity of migrated historical data.
  • Establish data parity for incremental data – After the historical data migration, incremental data is loaded to Amazon S3 using the new cloud-based ETL process. The incremental data is compared between the legacy on-premises database and the AWS Cloud.
  • Establish data parity using functional queries – We perform business- and functional-level checks using SQL queries on migrated data.

Prerequisites

You need to set up the following prerequisite resources:

Establish data parity for historical data migration

For historical data migration and parity, we’re assuming that setting up a PostgreSQL database, migrating data to Amazon S3 using AWS DMS, and data curation have been completed as a prerequisite to perform data parity using AWS Glue Data Quality. For this use case, we use an on-premises PostgreSQL database with historical data loaded on Amazon S3 and AWS Glue. Our objective is to compare historical data between the on-premises database and the AWS Cloud.

We use the following tables in the on-premises PostgreSQL database. These have been migrated to the AWS Cloud using AWS DMS. As part of data curation, the following three additional columns have been added to the test_schema.sample_data table in the curated layer: id, etl_create_ts, and etl_user_id.

  1. Create sample_data with the following code:
create table test_schema.sample_data
(
    job              text,
    company          text,
    ssn              text,
    residence        text,
    current_location text,
    website          text,
    username         text,
    name             text,
    gender_id        integer,
    blood_group_id   integer,
    address          text,
    mail             text,
    birthdate        date,
    insert_ts        timestamp with time zone,
    update_ts        timestamp with time zone
);
  2. Create gender with the following code (contains gender details for lookup):
create table test_schema.gender
(
    gender_id integer,
    value     varchar(1)
);
  3. Create blood_group with the following code (contains blood group information for lookup):
create table test_schema.blood_group
(
    blood_group_id integer,
    value          varchar(10)
);

We’re assuming that the preceding tables have been migrated to the S3 staging bucket using AWS DMS and curated using AWS Glue. For detailed instructions on how to set up AWS DMS to replicate data, refer to the appendix at the end of this post.

In the following sections, we showcase how to configure an AWS Glue Data Quality job for comparison.

Create an AWS Glue connection

AWS Glue Data Quality uses an AWS Glue connection to connect to the source PostgreSQL database. Complete the following steps to create the connection:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Connections.
  2. Choose Create connection.
  3. Set Connector type as JDBC.
  4. Add connection details like the connection URL, credentials, and networking details.

Refer to AWS Glue connection properties for additional details.

AWS Glue 4.0 uses PostgreSQL JDBC driver 42.3.6. If your PostgreSQL database requires a different JDBC driver version, download the driver that corresponds to your PostgreSQL version.

Create an AWS Glue data parity job for historical data comparison

As part of the preceding steps, you used AWS DMS to pull historical data from PostgreSQL to the S3 staging bucket. You then used an AWS Glue notebook to curate data from the staging bucket to the curated bucket and created AWS Glue tables. As part of this step, you use AWS Glue Data Quality to compare data between PostgreSQL and Amazon S3 to confirm the data is valid. Complete the following steps to create an AWS Glue job using the AWS Glue visual editor to compare data between PostgreSQL and Amazon S3:

  1. Set the source as the PostgreSQL table sample_data.
  2. Set the target as the AWS Glue table sample_data.

  3. In the curated layer, we added a few additional columns: id, etl_create_ts, and etl_user_id. Because these columns are newly created, we use a transformation to drop these columns for comparison.
  4. Additionally, the birthdate column is a timestamp in AWS Glue, so we change it to date format prior to comparison.

  5. Choose Evaluate Data Quality in Transformations.
  6. Specify the AWS Glue Data Quality rule as DatasetMatch, which checks if the data in the primary dataset matches the data in a reference dataset.
  7. Provide the unique key (primary key) information for source and target. In this example, the primary key is a combination of columns job and username.

  8. For Data quality transform output, specify your data to output:
    1. Original data – This output includes all rows and columns in original data. In addition, you can select Add new columns to indicate data quality errors. This option adds metadata columns for each row that can be used to identify valid and invalid rows and the rules that failed validation. You can further customize row-level output to select only valid rows or convert the table format based on the use case.
    2. Data quality results – This is a summary output grouped by a rule. For our data parity example, this output will have one row with a summary of the match percentage.

  9. Configure the Amazon S3 targets for ruleOutcomes and rowLevelOutcomes to write AWS Glue Data Quality output in the Amazon S3 location in Parquet format.

  10. Save and run the AWS Glue job.
  11. When the AWS Glue job is complete, you can run an AWS Glue crawler to automatically create rulesummary and row_level_output tables and view the output in Amazon Athena.

The following is an example of rule-level output. The screenshot shows the DatasetMatch value as 1.0, which implies all rows between the source PostgreSQL database and target data lake matched.

The following is an example of row-level output. The screenshot shows all source rows along with additional columns that confirm if a row has passed or failed validation.

Let’s update a few records in PostgreSQL to simulate a data issue during the data migration process:

update test_schema.sample_data set residence = null where blood_group_id = 8
Records updated 1,272

You can rerun the AWS Glue job and observe the output in Athena. In the following screenshot, the new match percentage is 87.27%. With this example, you were able to capture the simulated data issue with AWS Glue Data Quality successfully.

If you run the following query, the returned count matches the number of failed records shown in the preceding screenshot:

SELECT count(*) FROM "gluedqblog"."rowleveloutput" where dataqualityevaluationresult='Failed'
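To make the match percentage more concrete, the following is a simplified approximation of a key-based dataset comparison in plain Python. It is not the AWS Glue Data Quality implementation; the function name, the sample rows, and the way the ratio is computed are assumptions for illustration. It drops the curated-layer columns, joins on the key columns job and username, and reports the fraction of primary rows that have an identical counterpart in the reference dataset.

```python
from typing import Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]


def dataset_match_ratio(primary: Iterable[Row], reference: Iterable[Row],
                        key_columns: Sequence[str],
                        drop_columns: Sequence[str] = ()) -> float:
    """Fraction of primary rows with an identical row (same key) in reference."""

    def normalize(row: Row) -> Row:
        # Ignore columns that exist only in the curated layer.
        return {k: v for k, v in row.items() if k not in drop_columns}

    def key_of(row: Row) -> Tuple[object, ...]:
        return tuple(row[c] for c in key_columns)

    reference_by_key = {key_of(r): normalize(r) for r in reference}
    primary_rows: List[Row] = list(primary)
    if not primary_rows:
        return 1.0  # nothing to compare, treated as a full match in this sketch
    matched = sum(
        1 for r in primary_rows if reference_by_key.get(key_of(r)) == normalize(r)
    )
    return matched / len(primary_rows)


# Hypothetical sample rows: the second source row simulates the null residence.
source_rows = [
    {"job": "Engineer", "username": "alice", "residence": "Austin"},
    {"job": "Analyst", "username": "bob", "residence": None},
]
target_rows = [
    {"job": "Engineer", "username": "alice", "residence": "Austin", "etl_user_id": "glue"},
    {"job": "Analyst", "username": "bob", "residence": "Boston", "etl_user_id": "glue"},
]
ratio = dataset_match_ratio(
    source_rows, target_rows,
    key_columns=("job", "username"),
    drop_columns=("id", "etl_create_ts", "etl_user_id"),
)
```

In this sketch, one of the two source rows differs from its target counterpart, so the ratio is 0.5. A ratio of 1.0 corresponds to the fully matching case described earlier.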

Establish data parity for incremental data

After the initial historical migration, the next step is to implement a process to validate incremental data between the legacy on-premises database and the AWS Cloud. For incremental data validation, data output from the existing ETL process and the new cloud-based ETL process is compared daily. You can add a filter to the preceding AWS Glue data parity job to select data that has been modified for a given day using a timestamp column.
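The daily filter can be sketched as follows. This is a minimal illustration, assuming the update_ts column from the sample_data table is used as the modification timestamp and that day boundaries are evaluated in UTC; the function name and sample rows are hypothetical.

```python
from datetime import date, datetime, timezone
from typing import Dict, List


def rows_modified_on(rows: List[Dict], day: date,
                     ts_column: str = "update_ts") -> List[Dict]:
    """Keep rows whose timestamp column falls on the given UTC calendar day."""
    selected = []
    for row in rows:
        ts = row.get(ts_column)
        if ts is None:
            continue  # rows without a timestamp are skipped in this sketch
        if ts.astimezone(timezone.utc).date() == day:
            selected.append(row)
    return selected


# Hypothetical sample rows with timezone-aware timestamps.
rows = [
    {"username": "alice", "update_ts": datetime(2024, 1, 15, 23, 30, tzinfo=timezone.utc)},
    {"username": "bob", "update_ts": datetime(2024, 1, 16, 0, 5, tzinfo=timezone.utc)},
]
daily_batch = rows_modified_on(rows, date(2024, 1, 15))
```

Applying the same filter to both the source and target datasets before the comparison keeps each daily parity run limited to that day's changes.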

Establish data parity using functional queries

Functional queries are SQL statements that business analysts can run in the legacy system (for this post, an on-premises database) and the new AWS Cloud-based data lake to compare data metrics and output. To make sure the consumer applications work correctly with migrated data, it’s imperative to validate data functionally. The previous examples are primarily technical validation to make sure there is no data loss in the target data lake after data ingestion from both historical migration and change data capture (CDC) context. In a typical data warehouse migration use case, the historical migration pipeline often pulls data from a data warehouse, and the incremental or CDC pipeline integrates the actual source systems, which feed the data warehouse.

Functional data parity is the third step in the overall data validation framework, where you have the flexibility to continue similar business metrics validation driven by an aggregated SQL query. You can construct your own business metrics validation query, preferably working with subject matter experts (SMEs) from the business side. In our experience, both agility and perfection matter for a successful data warehouse migration. Reusing the time-tested, SME-approved aggregated SQL queries from the legacy data warehouse system with minimal changes can therefore fast-track the implementation and maintain business confidence. In this section, we demonstrate how to implement a sample functional parity for a given dataset.

In this example, we use a set of source PostgreSQL tables and target S3 data lake tables for comparison. We use an AWS Glue crawler to create Data Catalog tables for the source tables, as described in the first example.

The sample functional validation compares the distribution count of gender and blood group for each company. This could be any functional query that joins facts and dimension tables and performs aggregations.

You can use a SQL transformation to generate an aggregated dataset for both the source and target query. In this example, the source query uses multiple tables. Apply SQL functions on the columns and required filter criteria.
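As a rough illustration of such a functional check, the following sketch runs one aggregated query against two in-memory SQLite databases that stand in for the source and target. The query text, the function name, and the sample rows are assumptions for illustration and are not the queries used in the AWS Glue job; only the table and column names come from the table definitions shown earlier.

```python
import sqlite3
from typing import List, Tuple

# Hypothetical functional query: gender and blood group counts per company.
FUNCTIONAL_QUERY = """
SELECT s.company, g.value AS gender, b.value AS blood_group, COUNT(*) AS cnt
FROM sample_data s
JOIN gender g ON g.gender_id = s.gender_id
JOIN blood_group b ON b.blood_group_id = s.blood_group_id
GROUP BY s.company, g.value, b.value
ORDER BY s.company, g.value, b.value
"""


def run_functional_query(sample_rows: List[Tuple[str, int, int]]) -> List[tuple]:
    """Load rows into an in-memory database and return the aggregated metrics."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE sample_data (company TEXT, gender_id INTEGER, blood_group_id INTEGER);
        CREATE TABLE gender (gender_id INTEGER, value TEXT);
        CREATE TABLE blood_group (blood_group_id INTEGER, value TEXT);
        INSERT INTO gender VALUES (1, 'F'), (2, 'M');
        INSERT INTO blood_group VALUES (1, 'A+'), (2, 'O-');
    """)
    conn.executemany("INSERT INTO sample_data VALUES (?, ?, ?)", sample_rows)
    result = conn.execute(FUNCTIONAL_QUERY).fetchall()
    conn.close()
    return result


# Hypothetical rows standing in for the source and target datasets.
source = [("Spencer LLC", 1, 1), ("Spencer LLC", 2, 2), ("Spencer LLC", 1, 1)]
target = list(source)
source_metrics = run_functional_query(source)
target_metrics = run_functional_query(target)
metrics_match = source_metrics == target_metrics
```

Comparing the two aggregated result sets plays the same role as applying a dataset comparison rule to the outputs of the source and target SQL transforms.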

The following screenshot illustrates the Source Functional Query transform.

The following screenshot illustrates the Target Functional Query transform.

The following screenshot illustrates the Evaluate Data Quality transform. You can apply the DatasetMatch rule to achieve a 100% match.

After the job runs, you can find the job run status on the AWS Glue console.

The Data quality tab displays the data quality results.

AWS Glue Data Quality provides row- and rule-level outputs, as described in the previous examples.

Check the rule-level output in the Athena table. The outcome of the DatasetMatch rule shows a 100% match between the PostgreSQL source dataset and target data lake.

Check the row-level output in the Athena table. The following screenshot displays the row-level output with data quality evaluation results and rule status.

Let’s change the company value for Spencer LLC to Spencer LLC – New to simulate the impact on the data quality rule and overall results. This creates a gap in the count of records for the given company name while comparing source and target.

By rerunning the job and checking the AWS Glue Data Quality results, you will discover that the data quality rule has failed. This is due to the difference in company name between the source and target dataset because the data quality rule evaluation is tracking a 100% match. You can reduce the match percentage in the data quality expression based on the required threshold.

Next, revert the changes made for the data quality rule failure simulation.


If you rerun the job and validate the AWS Glue Data Quality results, you can find the data quality score is back to 100%.


Clean up

If you no longer want to keep the resources you created as part of this post in your AWS account, complete the following steps:

  1. Delete the AWS Glue notebook and visual ETL jobs.
  2. Remove all data and delete the staging and curated S3 buckets.
  3. Delete the AWS Glue connection to the PostgreSQL database.
  4. Delete the AWS DMS replication task and instance.
  5. Delete the Data Catalog.

Conclusion

In this post, we discussed how you can use AWS Glue Data Quality to build a scalable data parity pipeline for data modernization programs. AWS Glue Data Quality enables you to maintain the quality of your data by automating many of the manual tasks involved in data quality monitoring and management. This helps prevent bad data from entering your data lakes and data warehouses. The examples in this post provided an overview on how to set up historical, incremental, and functional data parity jobs using AWS Glue Data Quality.

To learn more about AWS Glue Data Quality, refer to Evaluating data quality with AWS Glue Studio and AWS Glue Data Quality. To dive into the AWS Glue Data Quality APIs, see Data Quality API.

Appendix

In this section, we demonstrate how to set up AWS DMS and replicate data. You can use AWS DMS to copy one-time historical data from the PostgreSQL database to the S3 staging bucket. Complete the following steps:

  1. On the AWS DMS console, under Migrate data in the navigation pane, choose Replication instances.
  2. Choose Create a replication instance.
  3. Choose a VPC that has connectivity to the PostgreSQL instance.

After the instance is created, it should appear with the status as Available on the AWS DMS console.

  1. 4. Based on our solution architecture, you now create an S3 staging bucket for AWS DMS to write replicated output. For this post, the staging bucket name is gluedq-blog-dms-staging-bucket.
  2. Under Migrate data in the navigation pane, choose Endpoints.
  3. Create a source endpoint for the PostgreSQL connection.

  1. After you create the source endpoint, choose Test endpoint to make sure it’s connecting successfully to the PostgreSQL Instance.
  2. Similarly, create a target endpoint with the S3 staging bucket as a target and test the target endpoint.

  1. We’ll be writing replicated output from PostgreSQL in CSV format. Add the addColumnName=true; property in the AWS DMS configuration to make sure the schema information is written as headers in the CSV output.
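
If you script the target endpoint instead of using the console, the same option can be expressed as endpoint settings. The following is a minimal sketch that only builds a settings dictionary in the shape accepted by the S3Settings parameter of the boto3 DMS create_endpoint call; it makes no AWS call. The helper name and the role ARN are hypothetical placeholders, and the folder name is assumed from the S3 paths listed later in this appendix.

```python
def build_s3_target_settings(bucket_name, bucket_folder, role_arn):
    """Return S3 target settings that write CSV files with a header row."""
    return {
        "BucketName": bucket_name,
        "BucketFolder": bucket_folder,
        "ServiceAccessRoleArn": role_arn,
        "DataFormat": "csv",
        # Settings-level equivalent of the addColumnName=true; attribute.
        "AddColumnName": True,
    }


settings = build_s3_target_settings(
    "gluedq-blog-dms-staging-bucket",
    "staging_layer",  # assumed from the S3 paths in this appendix
    "arn:aws:iam::111122223333:role/example-dms-s3-role",  # hypothetical
)
```

The resulting dictionary could then be passed as S3Settings when creating the target endpoint with your own credentials and role.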

Now you’re ready to create the migration task.

  1. Under Migrate data in the navigation pane, choose Database migration tasks.
  2. Create a new replication task.
  3. Specify the source and target endpoints you created and choose the table that needs to be replicated.

After the replication task is created, it will start replicating data automatically.

When the status shows as Load complete, data should appear in the following S3 locations (the bucket name in this example is a placeholder):

  • s3://<gluedq-blog-dms-staging-bucket>/staging_layer/test_schema/sample_data/
  • s3://<gluedq-blog-dms-staging-bucket>/staging_layer/test_schema/gender/
  • s3://<gluedq-blog-dms-staging-bucket>/staging_layer/test_schema/blood_group/

About the Authors

Himanshu Sahni is a Senior Data Architect in AWS Professional Services. Himanshu specializes in building Data and Analytics solutions for enterprise customers using AWS tools and services. He is an expert in AI/ML and Big Data tools like Spark, AWS Glue, and Amazon EMR. Outside of work, Himanshu likes playing chess and tennis.

Arunabha Datta is a Senior Data Architect at AWS Professional Services. He collaborates with customers and partners to create and execute modern data architecture using AWS Analytics services. Arunabha’s passion lies in assisting customers with digital transformation, particularly in the areas of data lakes, databases, and AI/ML technologies. Besides work, his hobbies include photography and he likes to spend quality time with his family.

Charishma Ravoori is an Associate Data & ML Engineer at AWS Professional Services. She focuses on developing solutions for customers that include building out data pipelines, developing predictive models, and generating AI chatbots using AWS/Amazon tools. Outside of work, Charishma likes to experiment with new recipes and play the guitar.

Accelerate Amazon Redshift Data Lake queries with AWS Glue Data Catalog Column Statistics

Post Syndicated from Kalaiselvi Kamaraj original https://aws.amazon.com/blogs/big-data/accelerate-amazon-redshift-data-lake-queries-with-column-level-statistics/

Amazon Redshift enables you to efficiently query and retrieve structured and semi-structured data from open format files in an Amazon S3 data lake without having to load the data into Amazon Redshift tables. Amazon Redshift extends SQL capabilities to your data lake, enabling you to run analytical queries. Amazon Redshift supports a wide variety of tabular data formats like CSV, JSON, Parquet, and ORC, and open table formats like Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg.

You create Redshift external tables by defining the structure for your files, S3 location of the files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue Data Catalog, the data catalog that comes with Amazon Athena, or your own Apache Hive metastore.

Over the last year, Amazon Redshift added several performance optimizations for data lake queries across multiple areas of the query engine, such as rewrite, planning, scan execution, and consumption of AWS Glue Data Catalog column statistics. To get the best performance on data lake queries with Amazon Redshift, you can use the AWS Glue Data Catalog column statistics feature to collect statistics on data lake tables. For Amazon Redshift Serverless instances, you will see improved scan performance through increased parallel processing of S3 files; this happens automatically based on the RPUs used.

In this post, we highlight the performance improvements we observed using the industry-standard TPC-DS benchmark. The overall execution time of the TPC-DS 3 TB benchmark improved by 3x. Some of the queries in our benchmark saw up to a 12x speedup.

Performance Improvements

Several performance optimizations were done over the last year to improve performance of data lake queries including the following.

  • Consume AWS Glue Data Catalog column statistics and tuning of Redshift optimizer to improve quality of query plans
  • Utilize bloom filters for partition columns
  • Improved scan efficiency for Amazon Redshift Serverless instances through increased parallel processing of files
  • Novel query rewrite rules to merge similar scans
  • Faster retrieval of metadata from AWS Glue Data Catalog

To understand the performance gains, we tested the performance on the industry-standard TPC-DS benchmark using 3 TB data sets and queries that represent different customer use cases. Performance was tested on an Amazon Redshift Serverless data warehouse with 128 RPUs. In our testing, the dataset was stored in Amazon S3 in Parquet format and AWS Glue Data Catalog was used to manage external databases and tables. Fact tables were partitioned on the date column, and each fact table consisted of approximately 2,000 partitions. All of the tables had their row count table property, numRows, set as per the Amazon Redshift Spectrum query performance guidelines.

We did a baseline run on Redshift patch version (patch 172) from last year. Later, we ran all TPC-DS queries on latest patch version (patch 180) that includes all performance optimizations added over last year. Then we used AWS Glue Data Catalog’s column statistics feature to compute statistics for all the tables and measured improvements with the presence of AWS Glue Data Catalog column statistics.

Our analysis revealed that the TPC-DS 3TB Parquet benchmark saw substantial performance gains with these optimizations. Specifically, partitioned Parquet with our latest optimizations achieved 2x faster runtimes compared to the previous implementation. Enabling AWS Glue Data Catalog column statistics further improved performance by 3x versus last year. The following graph illustrates these runtime improvements for the full benchmark (all TPC-DS queries) over the past year, including the additional boost from using AWS Glue Data Catalog column statistics.

Figure 1: Improvement in total runtime of TPC-DS 3T workload

The following graph presents the top queries from the TPC-DS benchmark with the greatest performance improvement over the last year, with and without AWS Glue Data Catalog column statistics. Performance improves substantially when statistics exist in AWS Glue Data Catalog (for details on how to get statistics for your data lake tables, refer to optimizing query performance using AWS Glue Data Catalog column statistics). Multi-join queries benefit the most from AWS Glue Data Catalog column statistics because the optimizer uses statistics to choose the right join order and distribution strategy.

Figure 2: Speed-up in TPC-DS queries

Let’s discuss some of the optimizations that contributed to improved query performance.

Optimizing with table-level statistics

Amazon Redshift’s design enables it to handle large-scale data challenges with superior speed and cost-efficiency. Its massively parallel processing (MPP) query engine, AI-powered query optimizer, auto-scaling capabilities, and other advanced features allow Redshift to excel at searching, aggregating, and transforming petabytes of data.

However, even the most powerful systems can experience performance degradation if they encounter anti-patterns like grossly inaccurate table statistics, such as the row count metadata.

Without this crucial metadata, Redshift’s query optimizer may be limited in the number of possible optimizations, especially those related to data distribution during query execution. This can have a significant impact on overall query performance.

To illustrate this, consider the following simple query involving an inner join between a large table with billions of rows and a small table with only a few hundred thousand rows.

select small_table.sellerid, sum(large_table.qtysold)
from large_table, small_table
where large_table.salesid = small_table.listid
 and small_table.listtime > '2023-12-01'
 and large_table.saletime > '2023-12-01'
group by 1 order by 1

If executed as-is, with the large table on the right-hand side of the join, the query will lead to sub-optimal performance. This is because the large table will need to be distributed (broadcast) to all Redshift compute nodes to perform the inner join with the small table, as shown in the following diagram.

Figure 3: Inaccurate table statistics lead to limited optimizations and large amounts of data broadcast among compute nodes for a simple inner join

Now, consider a scenario where the table statistics, such as the row count, are accurate. This allows the Amazon Redshift query optimizer to make more informed decisions, such as determining the optimal join order. In this case, the optimizer would immediately rewrite the query to have the large table on the left-hand side of the inner join, so that it is the small table that is broadcast across the Redshift compute nodes, as illustrated in the following diagram.

Figure 4: Accurate table statistics lead to high degree of optimizations and very little data broadcast among compute nodes for a simple inner join
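
The effect of row counts on the broadcast decision can be sketched with a toy cost model. This is an illustrative simplification, not the Amazon Redshift optimizer: it assumes each table is spread evenly across the compute nodes and counts only the rows that must cross the network. The function names, row counts, and node count are hypothetical.

```python
def rows_moved_by_broadcast(table_rows, num_nodes):
    """Rows sent over the network when one table is copied to every node.

    Each node already holds 1/num_nodes of the table and must receive the
    rest, so the total across all nodes is table_rows * (num_nodes - 1).
    """
    return table_rows * (num_nodes - 1)


def choose_broadcast_side(left_rows, right_rows):
    """Pick the side of the join that is cheaper to broadcast."""
    return "left" if left_rows <= right_rows else "right"


LARGE_ROWS = 2_000_000_000  # hypothetical: billions of rows
SMALL_ROWS = 300_000        # hypothetical: a few hundred thousand rows
NUM_NODES = 4               # hypothetical cluster size

cost_broadcast_large = rows_moved_by_broadcast(LARGE_ROWS, NUM_NODES)
cost_broadcast_small = rows_moved_by_broadcast(SMALL_ROWS, NUM_NODES)
best_side = choose_broadcast_side(LARGE_ROWS, SMALL_ROWS)
```

With accurate row counts the model picks the small table; with missing or wrong counts it has no basis for the choice, which is the situation shown in Figure 3.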

Fortunately, Amazon Redshift automatically maintains accurate table statistics for local tables by running the ANALYZE command in the background. For external tables (data lake tables), however, AWS Glue Data Catalog column statistics are recommended for use with Amazon Redshift as we will discuss in the next section. For more general information on optimizing queries in Amazon Redshift, please refer to the documentation on factors affecting query performance, data redistribution, and Amazon Redshift best practices for designing queries.

Improvements with AWS Glue Data Catalog column statistics

AWS Glue Data Catalog has a feature to compute column level statistics for Amazon S3 backed external tables. AWS Glue Data Catalog can compute column level statistics such as NDV, Number of Nulls, Min/Max and Avg. column width for the columns without the need for additional data pipelines. Amazon Redshift cost-based optimizer utilizes these statistics to come up with better quality query plans. In addition to consuming statistics, we also made several improvements in cardinality estimations and cost tuning to get high quality query plans thereby improving query performance.
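
To make these statistics concrete, the following sketch computes them for an in-memory column and applies a common textbook join-cardinality estimate (row counts multiplied, divided by the larger NDV). It is an assumption-laden illustration: it is neither how AWS Glue computes statistics nor the formula the Amazon Redshift optimizer uses, and the average width here is simply the average string length of the values.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class ColumnStats:
    ndv: int                    # number of distinct non-null values
    num_nulls: int
    min_value: Optional[Any]
    max_value: Optional[Any]
    avg_width: float            # simplified: average length of str(value)


def compute_column_stats(values: Sequence[Any]) -> ColumnStats:
    """Compute basic column-level statistics over a list of values."""
    non_null = [v for v in values if v is not None]
    widths = [len(str(v)) for v in non_null]
    return ColumnStats(
        ndv=len(set(non_null)),
        num_nulls=len(values) - len(non_null),
        min_value=min(non_null) if non_null else None,
        max_value=max(non_null) if non_null else None,
        avg_width=sum(widths) / len(widths) if widths else 0.0,
    )


def estimate_join_rows(left_rows, left_ndv, right_rows, right_ndv):
    """Textbook equi-join estimate: |L| * |R| / max(NDV_L, NDV_R)."""
    return (left_rows * right_rows) // max(left_ndv, right_ndv, 1)


stats = compute_column_stats([1, 2, 2, None, 5])
```

Without an NDV, an optimizer has to guess the denominator in the estimate, which is one way join cardinalities can end up far from the true values.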

TPC-DS 3TB dataset showed 40% improvement in total query execution time when these AWS Glue Data Catalog column statistics were provided. Individual TPC-DS queries showed up to 5x improvements in query execution time. Some of the queries that had greater impact in execution time are Q85, Q64, Q75, Q78, Q94, Q16, Q04, Q24 and Q11.

We will go through an example where cost-based optimizer generated a better query plan with statistics and how it improved the execution time.

Let’s consider the following simplified version of TPC-DS Q64 to showcase the query plan differences with statistics.

select i_product_name product_name
,i_item_sk item_sk
,ad1.ca_street_number b_street_number
,ad1.ca_street_name b_street_name
,ad1.ca_city b_city
,ad1.ca_zip b_zip
,d1.d_year as syear
,count(*) cnt
,sum(ss_wholesale_cost) s1
,sum(ss_list_price) s2
,sum(ss_coupon_amt) s3
FROM   tpcds_3t_alls3_pp_ext.store_sales
,tpcds_3t_alls3_pp_ext.store_returns
,tpcds_3t_alls3_pp_ext.date_dim d1
,tpcds_3t_alls3_pp_ext.customer
,tpcds_3t_alls3_pp_ext.customer_address ad1
,tpcds_3t_alls3_pp_ext.item
WHERE
ss_sold_date_sk = d1.d_date_sk AND
ss_customer_sk = c_customer_sk AND
ss_addr_sk = ad1.ca_address_sk and
ss_item_sk = i_item_sk and
ss_item_sk = sr_item_sk and
ss_ticket_number = sr_ticket_number and
i_color in ('firebrick','papaya','orange','cream','turquoise','deep') and
i_current_price between 42 and 42 + 10 and
i_current_price between 42 + 1 and 42 + 15
group by i_product_name
,i_item_sk
,ad1.ca_street_number
,ad1.ca_street_name
,ad1.ca_city
,ad1.ca_zip
,d1.d_year

Without Statistics

Figure 5 shows the logical query plan of Q64 without statistics. The cardinality estimates for the joins are inaccurate. With inaccurate cardinalities, the optimizer produces a sub-optimal query plan, leading to a higher execution time.

With Statistics

Figure 6 shows the logical query plan after consuming AWS Glue Data Catalog column statistics. As the highlighted changes show, the join cardinality estimates improved by many orders of magnitude, helping the optimizer choose a better join order and join strategy (broadcast DS_BCAST_INNER versus distribute DS_DIST_BOTH). Switching the customer_address and customer tables from inner to outer tables and changing the join strategy to distribute has a major impact because it reduces data movement between the nodes and avoids spilling from the hash table.

Figure 5: Logical query plan of Q64 without statistics

Figure 6: Logical query plan of Q64 after consuming AWS Glue Data Catalog column statistics

This change in query plan improved the query execution time of Q64 from 383s to 81s.

Given how much the optimizer benefits from AWS Glue Data Catalog column statistics, you should consider collecting statistics for your data lake using AWS Glue. JOIN-heavy workloads will see the greatest improvement from collecting statistics. Refer to generating AWS Glue Data Catalog column statistics for instructions on how to collect statistics in AWS Glue Data Catalog.

Query rewrite optimization

We introduced a new query rewrite rule which combines scalar aggregates over the same common expression using slightly different predicates. This rewrite resulted in performance improvements on TPC-DS queries Q09, Q28, and Q88. Let’s focus on Q09 as a representative of these queries, given by the following fragment:

SELECT CASE
WHEN (SELECT COUNT(*)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20) > 48409437
THEN (SELECT AVG(ss_ext_discount_amt)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20)
ELSE (SELECT AVG(ss_net_profit)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20) END
AS bucket1,
<<4 more variations of the CASE expression above>>
FROM reason
WHERE r_reason_sk = 1

In total, there are 15 scans of the fact table store_sales, each one returning various aggregates over different subsets of data. The engine first performs subquery removal and transforms the various expressions in the CASE statements into relational subtrees connected via cross products, and then they are fused into one subquery handling all scalar aggregates. The resulting plan for Q09, described below using SQL for clarity, is given by:

SELECT CASE WHEN v1 > 48409437 THEN t1 ELSE e1 END,
<4 more variations>
FROM (SELECT COUNT(CASE WHEN b1 THEN 1 END) AS v1,
AVG(CASE WHEN b1 THEN ss_ext_discount_amt END) AS t1,
AVG(CASE WHEN b1 THEN ss_net_profit END) AS e1,
<4 more variations>
FROM reason,
(SELECT *,
ss_quantity BETWEEN 1 AND 20 AS b1,
<4 more variations>
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20 OR
<4 more variations>))
WHERE r_reason_sk = 1)

In general, this rewrite rule results in the largest improvements both in latency (from 3x to 8x improvements) and bytes read from Amazon S3 (from 6x to 8x reduction in scanned bytes and, consequently, cost).
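
The equivalence behind this rewrite can be checked on a small scale. The following sketch uses Python's built-in sqlite3 module (not Amazon Redshift) with a tiny synthetic store_sales table, two buckets instead of five, and made-up thresholds, and confirms that the multi-scan form and the fused single-scan form return the same result. All data values and thresholds are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store_sales ("
    "ss_quantity INTEGER, ss_ext_discount_amt REAL, ss_net_profit REAL)"
)
conn.execute("CREATE TABLE reason (r_reason_sk INTEGER)")
conn.execute("INSERT INTO reason VALUES (1)")
# Synthetic rows: quantities 1..40 with simple derived measures.
conn.executemany(
    "INSERT INTO store_sales VALUES (?, ?, ?)",
    [(q, q * 0.5, 100.0 - q) for q in range(1, 41)],
)

# Original shape: every scalar subquery scans store_sales separately.
ORIGINAL = """
SELECT
  CASE WHEN (SELECT COUNT(*) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20) > 10
       THEN (SELECT AVG(ss_ext_discount_amt) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20)
       ELSE (SELECT AVG(ss_net_profit) FROM store_sales
             WHERE ss_quantity BETWEEN 1 AND 20) END AS bucket1,
  CASE WHEN (SELECT COUNT(*) FROM store_sales
             WHERE ss_quantity BETWEEN 21 AND 40) > 100
       THEN (SELECT AVG(ss_ext_discount_amt) FROM store_sales
             WHERE ss_quantity BETWEEN 21 AND 40)
       ELSE (SELECT AVG(ss_net_profit) FROM store_sales
             WHERE ss_quantity BETWEEN 21 AND 40) END AS bucket2
FROM reason
WHERE r_reason_sk = 1
"""

# Fused shape: one scan computes all conditional aggregates.
FUSED = """
SELECT CASE WHEN v1 > 10 THEN t1 ELSE e1 END AS bucket1,
       CASE WHEN v2 > 100 THEN t2 ELSE e2 END AS bucket2
FROM (SELECT COUNT(CASE WHEN b1 THEN 1 END) AS v1,
             AVG(CASE WHEN b1 THEN ss_ext_discount_amt END) AS t1,
             AVG(CASE WHEN b1 THEN ss_net_profit END) AS e1,
             COUNT(CASE WHEN b2 THEN 1 END) AS v2,
             AVG(CASE WHEN b2 THEN ss_ext_discount_amt END) AS t2,
             AVG(CASE WHEN b2 THEN ss_net_profit END) AS e2
      FROM reason,
           (SELECT *,
                   ss_quantity BETWEEN 1 AND 20 AS b1,
                   ss_quantity BETWEEN 21 AND 40 AS b2
            FROM store_sales
            WHERE ss_quantity BETWEEN 1 AND 20
               OR ss_quantity BETWEEN 21 AND 40) AS s
      WHERE r_reason_sk = 1) AS agg
"""

original_row = conn.execute(ORIGINAL).fetchone()
fused_row = conn.execute(FUSED).fetchone()
```

The fused form reads store_sales once regardless of how many buckets there are, which is where the reduction in scanned bytes comes from.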

Bloom filter for partition columns

Amazon Redshift already uses Bloom filters on data columns of external tables in Amazon S3 to enable early and effective data filtering. Last year, we extended this support for partition columns as well. A Bloom filter is a probabilistic, memory-efficient data structure that accelerates join queries at scale by filtering rows that do not match the join relation, significantly reducing the amount of data transferred over the network. Amazon Redshift automatically determines what queries are suitable for leveraging Bloom filters at query runtime.
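
The idea can be sketched with a small, self-contained Bloom filter. This is an illustrative implementation, not the one inside Amazon Redshift: the bit-array size, number of hash functions, hashing scheme, and the prune_partitions helper are all assumptions. The filter is built from the join keys of the selective side and then used to skip partitions whose key cannot match.

```python
import hashlib


class BloomFilter:
    """Probabilistic set: no false negatives, occasional false positives."""

    def __init__(self, num_bits=4096, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        data = str(key).encode("utf-8")
        for seed in range(self.num_hashes):
            # A different salt per seed gives independent hash functions.
            digest = hashlib.blake2b(
                data, digest_size=8, salt=seed.to_bytes(8, "little")
            ).digest()
            yield int.from_bytes(digest, "little") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(key)
        )


def prune_partitions(partition_keys, bloom):
    """Keep only partitions whose key may appear on the build side."""
    return [k for k in partition_keys if bloom.might_contain(k)]


# Hypothetical data: 15 qualifying date keys, 2,000 fact-table partitions.
build_keys = list(range(2450000, 2450015))
bloom = BloomFilter()
for key in build_keys:
    bloom.add(key)

partitions = list(range(2449000, 2451000))
kept = prune_partitions(partitions, bloom)
```

Because a Bloom filter never reports a false negative, every partition that truly matches is kept; a false positive only costs an unnecessary partition scan and never changes the query result.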

This optimization improved performance on TPC-DS queries Q05, Q17, and Q54, with large improvements in both latency (from 2x to 3x) and bytes read from S3 (from 9x to 15x reduction in scanned bytes and, consequently, cost).

The following is the subquery of Q05 that showed improvements with the runtime filter.

select s_store_id,
sum(sales_price) as sales,
sum(profit) as profit,
sum(return_amt) as returns,
sum(net_loss) as profit_loss
from
( select  ss_store_sk as store_sk,
ss_sold_date_sk  as date_sk,
ss_ext_sales_price as sales_price,
ss_net_profit as profit,
cast(0 as decimal(7,2)) as return_amt,
cast(0 as decimal(7,2)) as net_loss
from tpcds_3t_alls3_pp_ext.store_sales
union all
select sr_store_sk as store_sk,
sr_returned_date_sk as date_sk,
cast(0 as decimal(7,2)) as sales_price,
cast(0 as decimal(7,2)) as profit,
sr_return_amt as return_amt,
sr_net_loss as net_loss
from tpcds_3t_alls3_pp_ext.store_returns
) salesreturnss,
tpcds_3t_alls3_pp_ext.date_dim,
tpcds_3t_alls3_pp_ext.store
where date_sk = d_date_sk
and d_date between cast('1998-08-13' as date)
and (cast('1998-08-13' as date) +  14)
and store_sk = s_store_sk
group by s_store_id

Without bloom filter support on partition columns

Figure 7 shows the logical query plan for the subquery of Q05. The plan appends two large fact tables, store_sales (8B rows) and store_returns (863M rows), and then joins the result with the very selective dimension table date_dim and then with the dimension table store. The join with the date_dim table reduces the number of rows from 9B to 93M.

With bloom filter support on partition columns

With Bloom filter support on partition columns, we now create a Bloom filter for the d_date_sk column of the date_dim table and push it down to the store_sales and store_returns tables, as shown in Figure 8. Because the join happens on the partition column, these Bloom filters filter out partitions in both store_sales and store_returns (the number of partitions processed drops by 10x).

Figure 7: Logical query plan for sub-query of Q05 without bloom filter support on partition columns

Figure 8: Logical query plan for sub-query of Q05 with bloom filter support on partition columns

Overall, a Bloom filter on the partition column reduces the number of partitions processed, resulting in fewer S3 listing calls and fewer data files to read (a reduction in scanned bytes). Because of the Bloom filter, we scan only 89M rows from store_sales and 4M rows from store_returns. This reduced the number of rows to process at the JOIN level, improving overall query performance by 2x and reducing scanned bytes by 9x.

Conclusion

In this post, we covered new performance optimizations in Amazon Redshift data lake query processing and how AWS Glue Data Catalog statistics help enhance the quality of query plans for data lake queries in Amazon Redshift. Together, these optimizations improved the TPC-DS 3 TB benchmark by 3x. Some of the queries in our benchmark saw up to a 12x speedup.

In summary, Amazon Redshift now offers enhanced query performance with optimizations such as AWS Glue Data Catalog column statistics, bloom filters on partition columns, new query rewrite rules and faster retrieval of metadata. These optimizations are enabled by default and Amazon Redshift users will benefit with better query response times for their workloads. For more information, please reach out to your AWS technical account manager or AWS account solutions architect. They will be happy to provide additional guidance and support.


About the authors

Kalaiselvi Kamaraj is a Sr. Software Development Engineer with Amazon. She has worked on several projects within the Redshift query processing team and is currently focusing on performance-related projects for Redshift Data Lake.

Mark Lyons is a Principal Product Manager on the Amazon Redshift team. He works on the intersection of data lakes and data warehouses. Prior to joining AWS, Mark held product leadership roles with Dremio and Vertica. He is passionate about data analytics and empowering customers to change the world with their data.

Asser Moustafa is a Principal Worldwide Specialist Solutions Architect at AWS, based in Dallas, Texas, USA. He partners with customers worldwide, advising them on all aspects of their data architectures, migrations, and strategic data visions to help organizations adopt cloud-based solutions, maximize the value of their data assets, modernize legacy infrastructures, and implement cutting-edge capabilities like machine learning and advanced analytics. Prior to joining AWS, Asser held various data and analytics leadership roles, completing an MBA from New York University and an MS in Computer Science from Columbia University in New York. He is passionate about empowering organizations to become truly data-driven and unlock the transformative potential of their data.