Tag Archives: AWS Glue

How Open Universities Australia modernized their data platform and significantly reduced their ETL costs with AWS Cloud Development Kit and AWS Step Functions

Post Syndicated from Michael Davies original https://aws.amazon.com/blogs/big-data/how-open-universities-australia-modernized-their-data-platform-and-significantly-reduced-their-etl-costs-with-aws-cloud-development-kit-and-aws-step-functions/

This is a guest post co-authored by Michael Davies from Open Universities Australia.

At Open Universities Australia (OUA), we empower students to explore a vast array of degrees from renowned Australian universities, all delivered through online learning. We offer students alternative pathways to achieve their educational aspirations, providing them with the flexibility and accessibility to reach their academic goals. Since our founding in 1993, we have supported over 500,000 students to achieve their goals by providing pathways to over 2,600 subjects at 25 universities across Australia.

As a not-for-profit organization, cost is a crucial consideration for OUA. While reviewing our contract for the third-party tool we had been using for our extract, transform, and load (ETL) pipelines, we realized that we could replicate much of the same functionality using Amazon Web Services (AWS) services such as AWS Glue, Amazon AppFlow, and AWS Step Functions. We also recognized that we could consolidate our source code (much of which was stored in the ETL tool itself) into a code repository that could be deployed using the AWS Cloud Development Kit (AWS CDK). By doing so, we had an opportunity to not only reduce costs but also to enhance the visibility and maintainability of our data pipelines.

In this post, we show you how we used AWS services to replace our existing third-party ETL tool, improving the team’s productivity and producing a significant reduction in our ETL operational costs.

Our approach

The migration initiative consisted of two main parts: building the new architecture and migrating data pipelines from the existing tool to the new architecture. Often, we would work on both in parallel, testing one component of the architecture while developing another at the same time.

From early in our migration journey, we began to define a few guiding principles that we would apply throughout the development process. These were:

  • Simple and modular – Use simple, reusable design patterns with as few moving parts as possible. Structure the code base to prioritize ease of use for developers.
  • Cost-effective – Use resources in an efficient, cost-effective way. Aim to minimize situations where resources are running idly while waiting for other processes to be completed.
  • Business continuity – As much as possible, make use of existing code rather than reinventing the wheel. Roll out updates in stages to minimize potential disruption to existing business processes.

Architecture overview

Diagram 1 shows the high-level architecture for the solution.

Diagram 1: Overall architecture of the solution, using AWS Step Functions, Amazon Redshift and Amazon S3

The following AWS services were used to shape our new ETL architecture:

  • Amazon Redshift – A fully managed, petabyte-scale data warehouse service in the cloud. Amazon Redshift served as our central data repository, where we would store data, apply transformations, and make data available for use in analytics and business intelligence (BI). Note: The provisioned cluster itself was deployed separately from the ETL architecture and remained unchanged throughout the migration process.
  • AWS Cloud Development Kit (AWS CDK) – The AWS Cloud Development Kit (AWS CDK) is an open-source software development framework for defining cloud infrastructure in code and provisioning it through AWS CloudFormation. Our infrastructure was defined as code using the AWS CDK. As a result, we simplified the way we defined the resources we wanted to deploy while using our preferred coding language for development.
  • AWS Step Functions – With AWS Step Functions, you can create workflows, also called state machines, to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning pipelines. AWS Step Functions can call over 200 AWS services including AWS Glue, AWS Lambda, and Amazon Redshift. We used AWS Step Functions state machines to define, orchestrate, and run our data pipelines.
  • Amazon EventBridge – We used Amazon EventBridge, the serverless event bus service, to define the event-based rules and schedules that would trigger our AWS Step Functions state machines.
  • AWS Glue – A data integration service, AWS Glue consolidates major data integration capabilities into a single service. These include data discovery, modern ETL, cleansing, transforming, and centralized cataloging. It’s also serverless, which means there’s no infrastructure to manage, and it includes the ability to run Python scripts. We used it for executing long-running scripts, such as for ingesting data from an external API.
  • AWS Lambda – AWS Lambda is a highly scalable, serverless compute service. We used it for executing simple scripts, such as for parsing a single text file.
  • Amazon AppFlow – Amazon AppFlow enables simple integration with software as a service (SaaS) applications. We used it to define flows that would periodically load data from selected operational systems into our data warehouse.
  • Amazon Simple Storage Service (Amazon S3) – An object storage service offering industry-leading scalability, data availability, security, and performance. Amazon S3 served as our staging area, where we would store raw data prior to loading it into other services such as Amazon Redshift. We also used it as a repository for storing code that could be retrieved and used by other services.

Where practical, we made use of the file structure of our code base for defining resources. We set up our AWS CDK to refer to the contents of a specific directory and define a resource (for example, an AWS Step Functions state machine or an AWS Glue job) for each file it found in that directory. We also made use of configuration files so we could customize the attributes of specific resources as required.
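As a rough illustration of the file-per-resource idea (not OUA’s actual code), the following Python sketch scans a directory and produces one resource specification per file, merged with optional overrides from a configuration mapping. The function name, the `timeout_minutes` attribute, and the shape of the configuration are all assumptions made for the example.

```python
from pathlib import Path


def discover_resources(directory, config=None):
    """Return one resource spec per file found in `directory`.

    `config` maps a file stem to attribute overrides (hypothetical format).
    """
    config = config or {}
    defaults = {"timeout_minutes": 60}  # hypothetical default attribute
    specs = []
    # Sort for a stable order so repeated deployments see the same sequence.
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file():
            continue
        spec = {"name": path.stem, "source": str(path), **defaults}
        # Per-resource overrides win over the defaults.
        spec.update(config.get(path.stem, {}))
        specs.append(spec)
    return specs
```

In an AWS CDK app, a stack could loop over these specifications and create one construct (for example, an AWS Glue job or a state machine) per entry, so adding a pipeline becomes a matter of adding a file.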

Details on specific patterns

Diagram 1 shows multiple flows by which data could be ingested into or unloaded from our Amazon Redshift data warehouse. In this section, we describe in more detail four specific patterns that were used in the final solution.

Pattern 1: Data transformation, load, and unload

Several of our data pipelines included significant data transformation steps, which were primarily performed through SQL statements executed by Amazon Redshift. Others required ingestion or unloading of data from the data warehouse, which could be performed efficiently using COPY or UNLOAD statements executed by Amazon Redshift.

In keeping with our aim of using resources efficiently, we sought to avoid running these statements from within the context of an AWS Glue job or AWS Lambda function because these processes would remain idle while waiting for the SQL statement to be completed. Instead, we opted for an approach where SQL execution tasks would be orchestrated by an AWS Step Functions state machine, which would send the statements to Amazon Redshift and periodically check their progress before marking them as either successful or failed. The following Diagram 2 shows this workflow.

Diagram 2: Data transformation, load, and unload pattern using AWS Lambda and Amazon Redshift within an AWS Step Functions state machine
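The submit-then-poll idea can be sketched as an Amazon States Language definition built in Python. Note the swap: the diagram caption indicates that AWS Lambda was used inside the state machine, whereas this sketch calls the Amazon Redshift Data API through Step Functions AWS SDK integrations instead. The state names, the 30-second poll interval, and the use of a database user for authentication are assumptions for illustration.

```python
def sql_state_machine(cluster_id, database, db_user, sql, poll_seconds=30):
    """Build a state machine definition that runs one SQL statement.

    The statement is submitted asynchronously, then its status is polled
    until the Redshift Data API reports FINISHED, FAILED, or ABORTED.
    """
    return {
        "StartAt": "SubmitStatement",
        "States": {
            "SubmitStatement": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
                "Parameters": {
                    "ClusterIdentifier": cluster_id,
                    "Database": database,
                    "DbUser": db_user,
                    "Sql": sql,
                },
                "ResultPath": "$.statement",
                "Next": "Wait",
            },
            # No compute is billed while the state machine waits.
            "Wait": {"Type": "Wait", "Seconds": poll_seconds, "Next": "CheckStatus"},
            "CheckStatus": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "Parameters": {"Id.$": "$.statement.Id"},
                "ResultPath": "$.status",
                "Next": "IsFinished",
            },
            "IsFinished": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": "$.status.Status", "StringEquals": "FINISHED", "Next": "Succeeded"},
                    {"Variable": "$.status.Status", "StringEquals": "FAILED", "Next": "Failed"},
                    {"Variable": "$.status.Status", "StringEquals": "ABORTED", "Next": "Failed"},
                ],
                # Any other status means the statement is still in progress.
                "Default": "Wait",
            },
            "Succeeded": {"Type": "Succeed"},
            "Failed": {"Type": "Fail", "Error": "SqlStatementFailed"},
        },
    }
```

Serializing the returned dictionary with `json.dumps` gives a definition string that can be supplied when creating a state machine. Because the waiting happens in a Wait state, no AWS Glue job or Lambda function sits idle while the SQL runs.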

Pattern 2: Data replication using AWS Glue

In cases where we needed to replicate data from a third-party source, we used AWS Glue to run a script that would query the relevant API, parse the response, and store the relevant data in Amazon S3. From here, we used Amazon Redshift to ingest the data using a COPY statement. The following Diagram 3 shows this workflow.

Diagram 3: Copying from external API to Redshift with AWS Glue

Note: Another option for this step would be to use Amazon Redshift auto-copy, but this wasn’t available at time of development.
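The two halves of this pattern (stage parsed records in Amazon S3, then load them with COPY) might look like the following minimal sketch. It assumes the API response has already been parsed into a list of dictionaries and that the data is staged as newline-delimited JSON; the function names, table name, bucket path, and IAM role are placeholders, not details from the original pipeline.

```python
import json


def to_json_lines(records):
    """Serialize parsed API records as newline-delimited JSON for staging in S3."""
    return "".join(json.dumps(record, sort_keys=True) + "\n" for record in records)


def build_copy_statement(table, s3_uri, iam_role_arn):
    """Return a Redshift COPY statement that loads JSON files from S3."""
    return (
        f"COPY {table} FROM '{s3_uri}' "
        f"IAM_ROLE '{iam_role_arn}' "
        "FORMAT AS JSON 'auto';"
    )
```

The AWS Glue script would write the output of `to_json_lines` to the staging location, and the resulting COPY statement would then be run by Amazon Redshift rather than from inside the AWS Glue job.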

Pattern 3: Data replication using Amazon AppFlow

For certain applications, we were able to use Amazon AppFlow flows in place of AWS Glue jobs. As a result, we could abstract some of the complexity of querying external APIs directly. We configured our Amazon AppFlow flows to store the output data in Amazon S3, then used an EventBridge rule based on an End Flow Run Report event (published when a flow run is complete) to trigger a load into Amazon Redshift using a COPY statement. The following Diagram 4 shows this workflow.

By using Amazon S3 as an intermediate data store, we gave ourselves greater control over how the data was processed when it was loaded into Amazon Redshift, when compared with loading the data directly to the data warehouse using Amazon AppFlow.

Diagram 4: Using Amazon AppFlow to integrate external data to Amazon S3 and copy to Amazon Redshift
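An EventBridge rule for this pattern needs an event pattern that selects the end-of-run event for a given flow. The sketch below builds such a pattern and includes a deliberately simplified local matcher for trying it out. The `source`, `detail-type`, and `flow-name` values reflect our understanding of the events Amazon AppFlow publishes and should be checked against the AppFlow documentation; the flow name is hypothetical, and the matcher only supports exact-value lists, not the full EventBridge pattern syntax.

```python
def appflow_end_run_pattern(flow_name):
    """Event pattern selecting the end-of-run report for one AppFlow flow."""
    return {
        "source": ["aws.appflow"],
        "detail-type": ["AppFlow End Flow Run Report"],
        "detail": {"flow-name": [flow_name]},
    }


def matches(pattern, event):
    """Simplified matcher: each pattern key must be present in the event,
    and the event value must be one of the listed values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            # Nested patterns (such as "detail") are matched recursively.
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True
```

The rule’s target would be the state machine that runs the COPY statement, so a load starts only after the flow has finished writing to Amazon S3.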

Pattern 4: Reverse ETL

Although most of our workflows involve data being brought into the data warehouse from external sources, in some cases we needed the data to be exported to external systems instead. This way, we could run SQL queries with complex logic drawing on multiple data sources and use this logic to support operational requirements, such as identifying which groups of students should receive specific communications.

In this flow, shown in the following Diagram 5, we start by running an UNLOAD statement in Amazon Redshift to unload the relevant data to files in Amazon S3. From here, each file is processed by an AWS Lambda function, which performs any necessary transformations and sends the data to the external application through one or more API calls.

Diagram 5: Reverse ETL workflow, sending data back out to external data sources
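The per-file processing step could be sketched as follows. This is an illustration only: it assumes the UNLOAD statement writes CSV files with a header row, and it takes the API call as an injected `send_batch` function so the logic can be exercised without any external system. The batch size and column names are hypothetical.

```python
import csv
import io


def rows_from_csv(text):
    """Parse one unloaded file (CSV with a header row) into dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


def batches(rows, size):
    """Yield successive lists of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def process_file(text, send_batch, batch_size=100):
    """Send the rows of one file to `send_batch`, one call per batch.

    Returns the number of rows sent.
    """
    sent = 0
    for batch in batches(rows_from_csv(text), batch_size):
        send_batch(batch)  # in a real function this would be an API call
        sent += len(batch)
    return sent
```

In an AWS Lambda function, the handler would read the file named in the incoming event from Amazon S3 and pass its contents to `process_file`, with `send_batch` wrapping the external application’s API.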

Outcomes

The re-architecture and migration process took 5 months to complete, from the initial concept to the successful decommissioning of the previous third-party tool. Most of the architectural effort was completed by a single full-time employee, with others on the team primarily assisting with the migration of pipelines to the new architecture.

We achieved significant cost reductions, with final expenses on AWS native services representing only a small percentage of projected costs compared to continuing with the third-party ETL tool. Moving to a code-based approach also gave us greater visibility of our pipelines and made the process of maintaining them quicker and easier. Overall, the transition was seamless for our end users, who were able to view the same data and dashboards both during and after the migration, with minimal disruption along the way.

Conclusion

By using the scalability and cost-effectiveness of AWS services, we were able to optimize our data pipelines, reduce our operational costs, and improve our agility.

Pete Allen, an analytics engineer from Open Universities Australia, says, “Modernizing our data architecture with AWS has been transformative. Transitioning from an external platform to an in-house, code-based analytics stack has vastly improved our scalability, flexibility, and performance. With AWS, we can now process and analyze data with much faster turnaround, lower costs, and higher availability, enabling rapid development and deployment of data solutions, leading to deeper insights and better business decisions.”

About the Authors

Michael Davies is a Data Engineer at OUA. He has extensive experience within the education industry, with a particular focus on building robust and efficient data architecture and pipelines.

Emma Arrigo is a Solutions Architect at AWS, focusing on education customers across Australia. She specializes in leveraging cloud technology and machine learning to address complex business challenges in the education sector. Emma’s passion for data extends beyond her professional life, as evidenced by her dog named Data.

Hybrid big data analytics with Amazon EMR on AWS Outposts

Post Syndicated from Shoukat Ghouse original https://aws.amazon.com/blogs/big-data/hybrid-big-data-analytics-with-amazon-emr-on-aws-outposts/

Businesses require powerful and flexible tools to manage and analyze vast amounts of information. Amazon EMR has long been a leading solution for processing big data in the cloud, supporting petabyte-scale data processing, interactive analytics, and machine learning with over 20 open source frameworks such as Apache Hadoop, Apache Hive, and Apache Spark. However, data residency requirements, latency issues, and hybrid architecture needs often challenge purely cloud-based solutions.

Enter Amazon EMR on AWS Outposts—a groundbreaking extension that brings the power of Amazon EMR directly to your on-premises environments. This innovative service merges the scalability, performance (the Amazon EMR runtime for Apache Spark is 4.5 times more performant than Apache Spark 3.5.1), and ease of Amazon EMR with the control and proximity of your data center, empowering enterprises to meet stringent regulatory and operational requirements while unlocking new data processing possibilities.

In this post, we dive into the transformative features of EMR on Outposts, showcasing its flexibility as a native hybrid data analytics service that allows seamless data access and processing both on premises and in the cloud. We also explore how it integrates smoothly with your existing IT infrastructure, providing the flexibility to keep your data where it best fits your needs while performing computations entirely on premises. We examine a hybrid setup where sensitive data remains local in Amazon S3 on Outposts and public data resides in a Regional Amazon Simple Storage Service (Amazon S3) bucket. This configuration allows you to augment your sensitive on-premises data with cloud data while making sure all data processing and compute runs on premises in AWS Outposts racks.

Solution overview

Consider a fictional company named Oktank Finance. Oktank aims to build a centralized data lake to store vast amounts of structured and unstructured data, enabling unified access and supporting advanced analytics and big data processing for data-driven insights and innovation. Additionally, Oktank must comply with data residency requirements, making sure that confidential data is stored and processed strictly on premises. Oktank also needs to enrich their datasets with non-confidential and public market data stored in the cloud on Amazon S3, which means they should be able to join datasets across their on-premises and cloud data stores.

Traditionally, Oktank’s big data platforms tightly coupled compute and storage resources, creating an inflexible system where decommissioning compute nodes could lead to data loss. To avoid this situation, Oktank aims to decouple compute from storage, allowing them to scale down compute nodes and repurpose them for other workloads without compromising data integrity and accessibility.

To meet these requirements, Oktank decides to adopt Amazon EMR on Outposts as their big data analytics platform and Amazon S3 on Outposts as their on-premises data store for their data lake. With EMR on Outposts, Oktank can make sure that all compute occurs on premises within their Outposts rack while still being able to query and join the public data stored in Amazon S3 with their confidential data stored in S3 on Outposts, using the same unified data APIs. For data processing, Oktank can choose from a wide variety of applications available on Amazon EMR. In this post, we use Spark as the data processing framework.

This approach makes sure that all data processing and analytics are performed locally within their on-premises environment, allowing Oktank to maintain compliance with data privacy and regulatory requirements. Simultaneously, by avoiding the need to replicate public data to their on-premises data centers, Oktank reduces storage costs and simplifies their end-to-end data pipelines by eliminating additional data movement jobs.

The following diagram illustrates the high-level solution architecture.

As explained earlier, the S3 on Outposts bucket in the architecture holds Oktank’s sensitive data, which stays on the Outpost in Oktank’s data center while the Regional S3 bucket holds the non-sensitive data.

In this post, to achieve high network performance from the Outpost to the Regional S3 bucket and vice-versa, we also use AWS Direct Connect with a virtual private gateway. This is especially beneficial when you need higher query throughput to the Regional S3 bucket by making sure the traffic is routed through your own dedicated network channel to AWS.

The solution involves deploying an EMR cluster on an Outposts rack. A service link connects AWS Outposts to a Region. The service link is a necessary connection between your Outposts and the Region (or home Region). It allows for the management of the Outposts and the exchange of traffic to and from the Region.

You can also access Regional S3 buckets using this service link. However, in this post, we employ an alternate option to enable the EMR cluster to privately access the Regional S3 bucket through the local gateway. This helps optimize data access from the Regional S3 bucket as traffic is routed through Direct Connect.

To enable the EMR cluster to access Amazon S3 privately over Direct Connect, a route is configured in the Outposts subnet (marked as 2 in the architecture diagram) to direct Amazon S3 traffic through the local gateway. Upon reaching the local gateway, the traffic is routed over Direct Connect (private virtual interface) to a virtual private gateway in the Region. The second VPC (5 in diagram), which includes the S3 interface endpoint, is connected to this virtual private gateway. A route is then added to make sure that traffic can return to the EMR cluster. This setup provides more efficient, higher-bandwidth communication between the EMR cluster and Regional S3 buckets.

For big data processing, we use Amazon EMR. From Amazon EMR version 7.0.0 onwards, Amazon EMR supports access to local S3 on Outposts buckets with the Apache Hadoop S3A connector; EMR File System (EMRFS) is not supported with S3 on Outposts. We use EMR Studio notebooks for running interactive queries on the data, and we submit Spark jobs as steps on the EMR cluster. We use the AWS Glue Data Catalog as the external Hive-compatible metastore, which serves as the central technical metadata catalog. The Data Catalog is a centralized metadata repository for all your data assets across various data sources. It provides a unified interface to store and query information about data formats, schemas, and sources. Additionally, we use AWS Lake Formation for access controls on the AWS Glue tables. At the time of writing, Lake Formation can’t directly manage access to data in the S3 on Outposts bucket, so in this architecture access to the raw data files stored there is controlled with AWS Identity and Access Management (IAM) permissions.

In the following sections, you will implement this architecture for Oktank. We focus on a specific use case for Oktank Finance, where they maintain sensitive customer stockholding data in a local S3 on Outposts bucket. Additionally, they have publicly available stock details stored in a Regional S3 bucket. Their goal is to explore both the datasets within their on-premises Outpost setup. Additionally, they need to enrich the customer stock holdings data by combining it with the publicly available stock details data.

First, we explore how to access both datasets using an EMR cluster. Then, we demonstrate the process of performing joins between the local and public data. We also demonstrate how to use Lake Formation to effectively manage permissions for these tables. We explore two primary scenarios throughout this walkthrough. In the interactive use case, we demonstrate how users can connect to the EMR cluster and run queries interactively using EMR Studio notebooks. This approach allows for real-time data exploration and analysis. Additionally, we show you how to submit batch jobs to Amazon EMR using EMR steps for automated, scheduled data processing. This method is ideal for recurring tasks or large-scale data transformations.

Prerequisites

Complete the following prerequisite steps:

  1. Have an AWS account and a role with administrator access. If you don’t have an account, you can create one.
  2. Have an Outposts rack installed and running.
  3. Create an EC2 key pair. This allows you to connect to the EMR cluster nodes even if Regional connectivity is lost.
  4. Set up Direct Connect. This is required only if you want to deploy the second AWS CloudFormation template as explained in the following section.

Deploy the CloudFormation stacks

In this post, we’ve divided the setup into four CloudFormation templates, each responsible for provisioning a specific component of the architecture. The templates come with default parameters, which you may need to adjust based on your specific configuration requirements.

Stack1 provisions the network infrastructure on Outposts. It also creates the S3 on Outposts bucket and Regional S3 bucket. It copies the sample data to the buckets to simulate the data setup for Oktank. Confidential data for customer stock holdings is copied to the S3 on Outposts bucket, and non-confidential data for stock details is copied to the Regional S3 bucket.

Stack2 provisions the infrastructure to connect to the Regional S3 bucket privately using Direct Connect. It establishes a VPC with private connectivity to both the regional S3 bucket and the Outposts subnet. It also creates an Amazon S3 VPC interface endpoint to allow private access to Amazon S3. It establishes a virtual private gateway for connectivity between the VPC and Outposts subnet. Lastly, it configures a private Amazon Route 53 hosted zone for Amazon S3, enabling private DNS resolution for S3 endpoints within the VPC. You can skip deploying this stack if you don’t need to route traffic using Direct Connect.

Stack3 provisions the EMR cluster infrastructure, AWS Glue database, and AWS Glue tables. The stack creates an AWS Glue database named oktank_outpostblog_temp and three tables under it: stock_details, stockholdings_info, and stockholdings_info_detailed. The table stock_details contains public information for the stocks, and the data location of this table points to the Regional S3 bucket. The tables stockholdings_info and stockholdings_info_detailed contain confidential information, and their data location is in the S3 on Outposts bucket. It also creates a runtime role named outpostblog-runtimeRole1. A runtime role is an IAM role that you associate with an EMR step, and jobs use this role to access AWS resources. With runtime roles for EMR steps, you can specify different IAM roles for the Spark and the Hive jobs, thereby scoping down access at a job level. This allows you to simplify access controls on a single EMR cluster that is shared between multiple tenants, wherein each tenant can be isolated using IAM roles. This stack also grants the required permissions on the runtime role to grant access on the Regional S3 bucket and the S3 on Outposts bucket. The EMR cluster uses a bootstrap action that runs a script to copy sample data to the S3 on Outposts bucket and the Regional S3 bucket for the two tables.

Stack4 provisions the EMR Studio. We will connect to EMR Studio notebook and interact with the data stored across S3 on Outposts and the Regional S3 bucket. This stack outputs the EMR Studio URL, which you can use to connect to EMR Studio.

Run the preceding CloudFormation stacks in sequence with an admin role to create the solution resources.

Access the data and join tables

To verify the solution, complete the following steps:

  1. On the AWS CloudFormation console, navigate to the Outputs tab of Stack4, which deployed the EMR Studio, and choose the EMR Studio URL.

This will open EMR Studio in a new window.

  2. Create a workspace and use the default options.

The workspace will launch in a new tab.

  3. Connect to the EMR cluster using the runtime role (outpostblog-runtimeRole1).

You are now connected to the EMR cluster.

  4. Choose the File Browser tab and open the notebook while choosing the kernel as PySpark.
  5. Run the following query in the notebook to read from the stock details table. This table points to public data stored in the Regional S3 bucket.
    spark.sql("select * from oktank_outpostblog_temp.stock_details").show(5)

  6. Run the following query to read from the confidential data stored in the local S3 on Outposts bucket:
    spark.sql("select * from oktank_outpostblog_temp.stockholdings_info").show(5)

As highlighted earlier, one of the requirements for Oktank is to enrich the preceding data with data from the Regional S3 bucket.

  7. Run the following query to join the preceding two tables:
    spark.sql("select customerid,sharesheld,purchasedate, a.stockid, b.stockname,b.category,b.currentprice from oktank_outpostblog_temp.stockholdings_info a inner join oktank_outpostblog_temp.stock_details b on a.stockid=b.stockid order by customerid").show(10)

Control access to tables using Lake Formation

In this post, we also showcase how you can control access to the tables using Lake Formation. To demonstrate, let’s block access for the runtime role (outpostblog-runtimeRole1) on the stockholdings_info table.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the table stockholdings_info and on the Actions menu, choose View to view the current access permissions on this table.
  3. Select IAMAllowedPrincipals from the list of principals and choose Revoke to revoke the permission.
  4. Go back to the EMR Studio notebook and rerun the earlier query.

Oktank’s data access query fails because Lake Formation has denied permission to the runtime role; you will need to adjust the permissions.

  5. To resolve this issue, return to the Lake Formation console, select the stockholdings_info table, and on the Actions menu, choose Grant.
  6. Assign the necessary permissions to the runtime role to make sure it can access the table.
  7. Select IAM users and roles and choose the runtime role (outpostblog-runtimeRole1).
  8. Choose the table stockholdings_info from the list of tables and for Table permissions, select Select.
  9. Select All data access and choose Grant.
  10. Go back to the notebook and rerun the query.

The query now succeeds because we granted access to the runtime role connected to the EMR cluster through the EMR Studio notebook. This demonstrates how Lake Formation allows you to manage permissions on your Data Catalog tables.
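The console grant above can also be expressed programmatically. The following sketch builds the request for the Lake Formation GrantPermissions API for a SELECT grant on one table; the helper name is ours, and the account ID in the usage example is a placeholder.

```python
def select_grant_request(role_arn, database, table):
    """Build keyword arguments for the Lake Formation GrantPermissions API."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": ["SELECT"],
    }


# Example request for the table used in this walkthrough.
# The account ID is a placeholder; substitute your own.
request = select_grant_request(
    "arn:aws:iam::111122223333:role/outpostblog-runtimeRole1",
    "oktank_outpostblog_temp",
    "stockholdings_info",
)
```

With credentials configured, the request could be sent using boto3, for example `boto3.client("lakeformation").grant_permissions(**request)`.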

The previous steps only restrict access to the table in the catalog, not to the actual data files stored in the S3 on Outposts bucket. To control access to these data files, you need to use IAM permissions. As mentioned earlier, Stack3 in this post handles the IAM permissions for the data. For access control on the Regional S3 bucket with Lake Formation, you don’t need to specifically provide IAM permissions on the specific S3 bucket to the roles. Lake Formation manages the Regional S3 bucket access controls for runtime roles. Refer to Introducing runtime roles for Amazon EMR steps: Use IAM roles and AWS Lake Formation for access control with Amazon EMR for detailed guidance on managing access to a Regional S3 bucket with Lake Formation and EMR runtime roles.

Submit a batch job

Next, let’s submit a batch job as an EMR step on the EMR cluster. Before we do that, let’s confirm there is currently no data in the table stockholdings_info_detailed. Run the following query in the notebook:

spark.sql("select * from oktank_outpostblog_temp.stockholdings_info_detailed").show(10)

You will not see any data in this table. You can now detach the notebook from the cluster. Next, you insert data into this table using a batch job submitted as an EMR step.

  1. On the EMR console, navigate to the cluster EMROutpostBlog and submit a step.
  2. Choose Spark Application for Type.
  3. Select the py script from the scripts folder in your S3 bucket created by the CloudFormation template.
  4. For Permissions, choose the runtime role (outpostblog-runtimeRole1).
  5. Choose Add step to submit the job.

Wait for the job to complete. The job inserted data into the stockholdings_info_detailed table. You can rerun the earlier query in the notebook to verify the data:

spark.sql("select * from oktank_outpostblog_temp.stockholdings_info_detailed").show(10)
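The step submission performed on the console earlier could also be scripted. The sketch below builds a request for the Amazon EMR AddJobFlowSteps API that runs a Spark script with a runtime role. The step name, the `cluster` deploy mode, and the `CONTINUE` failure action are assumptions, and the cluster ID, script location, and role ARN must be supplied from your own deployment.

```python
def spark_step_request(cluster_id, script_uri, runtime_role_arn, name="insert-stockholdings"):
    """Build keyword arguments for the EMR AddJobFlowSteps API.

    The step runs `spark-submit` through command-runner.jar and uses the
    given runtime role to access AWS resources.
    """
    return {
        "JobFlowId": cluster_id,
        "ExecutionRoleArn": runtime_role_arn,
        "Steps": [
            {
                "Name": name,  # hypothetical step name
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", "--deploy-mode", "cluster", script_uri],
                },
            }
        ],
    }
```

With boto3, the request could be sent as `boto3.client("emr").add_job_flow_steps(**request)`, which makes the batch job easy to schedule for recurring runs.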

Clean up

To avoid incurring further charges, delete the CloudFormation stacks.

  1. Before deleting Stack4, run the following shell command (with the %%sh magic command) in the EMR Studio notebook to delete the objects from the S3 on Outposts bucket:
    aws s3api delete-objects --bucket <replace with value of key S3OutpostBucketAccessPointAlias1 from stack 3 output> --delete "$(aws s3api list-object-versions --bucket <replace with value of key S3OutpostBucketAccessPointAlias1 from stack 3 output> --output=json | jq '{Objects: [.Versions[]|{Key:.Key,VersionId:.VersionId}], Quiet: true}')"

  2. Next, manually delete the EMR workspace from the EMR Studio.
  3. You can now delete the stacks in reverse order: Stack4, then Stack3, Stack2, and finally Stack1.
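For readers less familiar with jq, the transformation inside the delete command in step 1 can be sketched in Python. The function takes the parsed JSON output of `list-object-versions` and returns the payload expected by `delete-objects`. Unlike the jq expression, this sketch tolerates an empty listing and can optionally include delete markers; both behaviors are additions for illustration, not part of the original command.

```python
def build_delete_payload(listing, include_delete_markers=False):
    """Convert a list-object-versions response into a delete-objects payload."""
    sections = ["Versions"]
    if include_delete_markers:
        sections.append("DeleteMarkers")
    objects = [
        {"Key": item["Key"], "VersionId": item["VersionId"]}
        for section in sections
        # A missing or null section is treated as empty.
        for item in listing.get(section) or []
    ]
    return {"Objects": objects, "Quiet": True}
```

A single DeleteObjects request accepts at most 1,000 keys, so a bucket holding more object versions than that would need the payload split across several requests.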

Conclusion

In this post, we demonstrated how to use Amazon EMR on Outposts as a managed big data processing service in your on-premises setup. We explored how you can set up the cluster to access data stored in an S3 on Outposts bucket on premises and also efficiently access data in the Regional S3 bucket with private networking. We also explored the AWS Glue Data Catalog as a serverless external Hive metastore and managed access control to the catalog tables using Lake Formation. We accessed the data interactively using EMR Studio notebooks and processed it as a batch job using EMR steps.

To learn more, visit Amazon EMR on AWS Outposts.

About the Authors

Shoukat Ghouse is a Senior Big Data Specialist Solutions Architect at AWS. He helps customers around the world build robust, efficient and scalable data platforms on AWS leveraging AWS analytics services like AWS Glue, AWS Lake Formation, Amazon Athena and Amazon EMR.

Fernando Galves is an Outpost Solutions Architect at AWS, specializing in networking, security, and hybrid cloud architectures. He helps customers design and implement secure hybrid environments using AWS Outposts, focusing on complex networking solutions and seamless integration between on-premises and cloud infrastructure.

How MuleSoft achieved cloud excellence through an event-driven Amazon Redshift lakehouse architecture

Post Syndicated from Sean Zou original https://aws.amazon.com/blogs/big-data/how-mulesoft-achieved-cloud-excellence-through-an-event-driven-amazon-redshift-lakehouse-architecture/

This post is cowritten with Sean Zou, Terry Quan and Audrey Yuan from MuleSoft.

In our previous thought leadership blog post, Why a Cloud Operating Model, we defined a COE Framework and showed why MuleSoft implemented it and the benefits they received from it. In this post, we dive into the technical implementation, describing how MuleSoft used Amazon EventBridge, Amazon Redshift, Amazon Redshift Spectrum, Amazon S3, and AWS Glue to implement it.

Solution overview

MuleSoft’s solution was to build a lakehouse on top of AWS services, illustrated in the following diagram, supporting a portal. To provide near real-time analytics, we used an event-driven strategy that triggers AWS Glue jobs and refreshes materialized views. We also implemented a layered approach that included collection, preparation, and enrichment, making it straightforward to identify areas that affect data accuracy.

For MuleSoft’s lakehouse end-to-end solution, the following phases are key:

  • Preparation phase
  • Enrichment phase
  • Action phase

In the following sections, we discuss these phases in more detail.

Preparation phase

Using the COE Framework, we engaged with the stakeholders in the preparation phase to determine the business goals and identify the data sources to ingest. Examples of data sources were cloud assets inventory, AWS Cost and Usage Reports, and AWS Trusted Advisor data. The ingested data is processed in the lakehouse to implement the Well-Architected pillars, utilization, security, and compliance status checks and measures.

How do you configure the CUR data and the Trusted Advisor data to land in Amazon S3?

The configuration process involves multiple components for both CUR and Trusted Advisor data storage. For CUR setup, customers need to configure an S3 bucket where the CUR report will be delivered, either by selecting an existing bucket or creating a new one. The S3 bucket requires a bucket policy, and customers must specify an S3 path prefix, which creates a subfolder for CUR file delivery.

Trusted Advisor data is configured to use Kinesis Firehose to deliver customer summary data to the Support Data lake S3 bucket. The data ingestion process uses Firehose buffer parameters (1 MB buffer size and 60-second buffer time) to manage data flow to the S3 bucket.

The Trusted Advisor data is stored in JSON and GZIP format, following a specific folder structure with hourly partitions using the “YYYY-MM-DD-HH” format.

The S3 partition structure for Trusted Advisor customer summary data includes separate paths for success and error data, and the data is encrypted using a KMS key specific to Trusted Advisor data.
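To make the hourly partition format concrete, the following minimal Python sketch derives a "YYYY-MM-DD-HH" key from a timestamp and builds an object prefix from it. The helper names and the literal `success/` and `error/` path segments are assumptions for illustration only; the actual prefixes used in the bucket are not given here.

```python
from datetime import datetime, timezone


def hourly_partition(ts: datetime) -> str:
    """Format a timezone-aware timestamp as an hourly partition key in UTC."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%d-%H")


def partition_prefix(ts: datetime, succeeded: bool = True) -> str:
    """Build a hypothetical S3 prefix; 'success' and 'error' are assumed names."""
    outcome = "success" if succeeded else "error"
    return f"{outcome}/{hourly_partition(ts)}/"


if __name__ == "__main__":
    sample = datetime(2024, 3, 5, 14, 30, tzinfo=timezone.utc)
    print(partition_prefix(sample))
    print(partition_prefix(sample, succeeded=False))
```

Keeping the hour in the key means a query or crawler that targets one hour only has to look under a single prefix.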

MuleSoft used AWS managed services and data ingestion tools that can pull from multiple data sources and support customizations. CloudQuery is the tool used to gather cloud infrastructure information; it can connect to many infrastructure data sources out of the box and land the data in an Amazon S3 bucket. The MuleSoft Anypoint Platform provides an integration layer to integrate infrastructure tools, accommodating many data sources such as on-premises, SaaS, and commercial off-the-shelf (COTS) software. Cloud Custodian was used for its ability to manage cloud resources and perform auto-remediation with customizations.

Enrichment phase

The enrichment phase includes ingesting raw data aligning with our business goals into the lakehouse through our pipelines, and consolidating the data to create a single pane of glass.

The pipelines adopt the event-driven architecture consisting of EventBridge, Amazon Simple Queue Service (Amazon SQS), and Amazon S3 Event Notifications to provide near real-time data for analysis. When new data arrives in the source bucket, new object creation is captured by the EventBridge rule, which invokes the AWS Glue workflow, consisting of an AWS Glue crawler and AWS Glue extract, transform, and load (ETL) jobs. We also configured S3 Event Notifications to send messages to the SQS queue to make sure the pipeline will only process the new data.
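As an illustration of the rule described above, the following Python sketch builds an EventBridge event pattern that matches S3 "Object Created" events for one bucket. The bucket name and key prefix are hypothetical, and the sketch assumes the bucket has EventBridge notifications turned on; it is not MuleSoft's actual rule definition.

```python
import json


def s3_object_created_pattern(bucket: str, key_prefix: str = "") -> dict:
    """Return an event pattern matching new objects in the given bucket."""
    pattern = {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {"bucket": {"name": [bucket]}},
    }
    if key_prefix:
        # Optional prefix filter so only objects under one folder match
        pattern["detail"]["object"] = {"key": [{"prefix": key_prefix}]}
    return pattern


if __name__ == "__main__":
    # "example-source-bucket" and "inventory/" are placeholder values
    print(json.dumps(s3_object_created_pattern("example-source-bucket", "inventory/"), indent=2))
```

The resulting JSON could be supplied as the event pattern of an EventBridge rule whose target is the AWS Glue workflow.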

The AWS Glue ETL job cleanses and standardizes the data, so that it’s ready to be analyzed using Amazon Redshift. To tackle data with complex structures, additional processing is performed to flatten the nested data formats into a relational model. The flattening step also extracts the tags of AWS assets out of the nested JSON objects and pivots them into individual columns, enabling tagging enforcement controls and ownership attribution. The ownership attribution of the infrastructure data provides accountability and holds teams responsible for the costs, utilization, security, compliance, and remediation of their cloud assets. One important tag is asset ownership: using the tags extracted in the flattening step, SQL scripts attribute the data to the corresponding owners.
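The tag pivot can be sketched in plain Python as follows. This is not the AWS Glue job itself, which runs on Spark; the record layout, the `tags` field name, and the `tag_` column prefix are assumptions chosen for illustration.

```python
from typing import Any, Dict, List


def flatten_asset(asset: Dict[str, Any], tag_keys: List[str]) -> Dict[str, Any]:
    """Flatten one nested asset record and pivot selected tags into columns."""
    # Copy every top-level attribute except the nested tags
    row = {k: v for k, v in asset.items() if k != "tags"}
    tags = asset.get("tags") or {}
    # Some inventories return tags as a list of {"Key": ..., "Value": ...} pairs
    if isinstance(tags, list):
        tags = {t["Key"]: t["Value"] for t in tags}
    # One column per expected tag; a missing tag becomes None so it can be flagged
    for key in tag_keys:
        row[f"tag_{key.lower()}"] = tags.get(key)
    return row


if __name__ == "__main__":
    record = {
        "arn": "arn:aws:s3:::example-bucket",
        "region": "us-east-1",
        "tags": [{"Key": "Owner", "Value": "team-a"}],
    }
    print(flatten_asset(record, ["Owner", "CostCenter"]))
```

A `None` in a mandatory tag column is what a tagging enforcement check would look for, and the `tag_owner` column is what an ownership join would use.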

When the workflow is complete, the raw data from different sources and with various structures is centralized in the data warehouse. From there, disjointed data with different purposes is ready to be consolidated and translated into actionable intelligence in the Well-Architected pillars by coding out the business logic.

Solutions for the enrichment phase

In the enrichment phase, we faced a number of storage, efficiency, and scalability challenges given the sheer volume of data. We used three techniques (file partitioning, Redshift Spectrum, and materialized views) to address these issues and scale without compromising performance.

File partitioning

MuleSoft’s infrastructure data is stored in an S3 bucket using a folder structure of year, month, day, hour, account, and Region, so AWS Glue crawlers can automatically identify and add partitions to the tables in the AWS Glue Data Catalog. Partitioning helps improve query performance significantly because it optimizes parallel processing for queries. The amount of data scanned by each query is restricted based on the partition keys, helping reduce overall data transfers, processing time, and computation costs. Although partitioning is an optimization technique that helps improve query efficiency, it’s important to keep two key points in mind when using this technique:

  • The Data Catalog has a maximum cap of 10 million partitions per table
  • Query performance gets compromised as partitions grow rapidly

Therefore, balancing the number of partitions in the Data Catalog tables and query efficiency is essential. We decided on a data retention policy of 3 months and configured a lifecycle rule to expire any data older than that.
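A retention rule of this kind can be expressed as an S3 lifecycle configuration. The sketch below builds one in Python, approximating 3 months as 90 days; the prefix, the rule ID, and the 90-day figure are assumptions, not MuleSoft's actual settings.

```python
def retention_lifecycle(prefix: str, days: int = 90) -> dict:
    """Build a lifecycle configuration that expires objects under a prefix."""
    return {
        "Rules": [
            {
                "ID": f"expire-after-{days}-days",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                # Objects are expired once they are older than this many days
                "Expiration": {"Days": days},
            }
        ]
    }


if __name__ == "__main__":
    # "infrastructure/" is a placeholder prefix
    print(retention_lifecycle("infrastructure/"))
```

The dictionary has the shape that the boto3 S3 client's `put_bucket_lifecycle_configuration` call accepts for its `LifecycleConfiguration` parameter.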

In our event-driven architecture, an Amazon EventBridge event is emitted when objects are put into or removed from an S3 bucket. Event messages are published to the SQS queue using S3 Event Notifications, and an AWS Glue crawler is invoked to either add new partitions to the Data Catalog or remove old partitions from it based on those messages, which takes care of the partition cleanup.
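To show how such messages distinguish additions from removals, here is a small Python sketch that reads an S3 Event Notification message body and reports which partition folder each record affects. In the solution described here the crawler consumes the queue itself, so this function is purely illustrative; the Hive-style `year=.../month=...` keys in the example are an assumption.

```python
import json
from urllib.parse import unquote_plus


def partition_changes(message_body: str) -> list:
    """Map S3 event records to ("add" | "remove", partition_folder) tuples."""
    changes = []
    for record in json.loads(message_body).get("Records", []):
        event_name = record["eventName"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record["s3"]["object"]["key"])
        partition = key.rsplit("/", 1)[0] if "/" in key else ""
        if event_name.startswith("ObjectCreated:"):
            changes.append(("add", partition))
        elif event_name.startswith(("ObjectRemoved:", "LifecycleExpiration:")):
            changes.append(("remove", partition))
    return changes


if __name__ == "__main__":
    body = json.dumps({"Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {"bucket": {"name": "example-bucket"},
               "object": {"key": "year%3D2024/month%3D03/part-0.parquet"}},
    }]})
    print(partition_changes(body))
```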

Amazon Redshift and concurrency scaling

MuleSoft uses Amazon Redshift to query the data in Amazon S3 because it provides large-scale compute and minimizes data redundancy. MuleSoft also uses Amazon Redshift concurrency scaling to run concurrent queries with consistently fast query performance. Amazon Redshift automatically adds query processing power in seconds to process a high number of concurrent queries without delays.

Materialized views

Another technique we used is Amazon Redshift materialized views. Materialized views store precomputed query results that future similar queries can use, so many computation steps can be skipped and relevant data can be accessed efficiently. Additionally, materialized views can be automatically and incrementally refreshed. As a result, we can achieve a single pane of glass for our cloud infrastructure that gives our organization the most up-to-date projections, trends, and actionable insights with improved query performance.

Amazon Redshift Materialized Views (MVs) are used extensively for reporting in MuleSoft’s Cloud Central portal, but if users needed to drill down into a granular view they could reference external tables.

MuleSoft currently refreshes the materialized views manually through the event-driven architecture, but is evaluating a switch to automatic refresh.
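A manual refresh comes down to issuing a `REFRESH MATERIALIZED VIEW` statement after new data lands. The following Python sketch only builds that statement, with a basic identifier check; the schema and view names are hypothetical, and how the statement is submitted (for example, from an AWS Glue job or a SQL client) is left open.

```python
import re

# Conservative identifier check so names can be interpolated safely
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def refresh_statement(schema: str, view: str) -> str:
    """Return the SQL statement that refreshes one materialized view."""
    for name in (schema, view):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    return f"REFRESH MATERIALIZED VIEW {schema}.{view};"


if __name__ == "__main__":
    # "reporting" and "mv_cost_by_owner" are placeholder names
    print(refresh_statement("reporting", "mv_cost_by_owner"))
```

With automatic refresh, the view would instead be created or altered with the `AUTO REFRESH YES` option, and no explicit statement would be needed in the pipeline.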

Action phase

Using materialized views in Amazon Redshift, we developed a self-serve Cloud Central portal in Tableau to provide a display portal for each team, engineer, and manager offering guidance and recommendations to help them operate in a way that aligns with the organization’s requirements, standards, and budget. Managers are empowered with monitoring and decision-making information for their teams. Engineers are able to identify and tag assets with missing mandatory tagging information, as well as remediate non-compliant resources. A key feature of the portal is personalization, meaning that the portal is enabled to populate visualizations and analysis based on the relevant data associated with a manager’s or engineer’s login information.

Cloud Central also helps engineering teams improve their cloud maturity in the six Well-Architected pillars: operational excellence, security, reliability, performance efficiency, cost optimization, and sustainability. The team proved out the “art of the possible” by building a proof of concept with Amazon Q to assist with 100 and 200 level Well-Architected pillar inquiries and how-tos. The following screenshot illustrates the MuleSoft implementation of the portal, Cloud Central. Other companies will design portals that are more bespoke to their own use cases and requirements.

Conclusion

The technical and business impact of MuleSoft’s COE Framework enables an optimization strategy and a cloud usage showback approach, which helps MuleSoft continue to grow with a scalable and sustainable cloud infrastructure. The framework also drives continual maturity and benefits in cloud infrastructure centered around the six Well-Architected pillars shown in the following figure.

The framework helps organizations with expanded public cloud infrastructure achieve their business goals guided by the Well-Architected benefits powered by an event-driven architecture.

The event-driven Amazon Redshift lakehouse architecture solution offers near real-time actionable insights on decision-making, control, and accountability. The event-driven architecture can be distilled into modules that can be added or removed depending on your technical and business goals.

The team is exploring new ways to lower the total cost of ownership. They are evaluating Amazon Redshift Serverless for transient database workloads as well as exploring Amazon DataZone to aggregate and correlate data sources into a data catalog to share among teams, applications, and lines of business in a democratized way. We can increase visibility, productivity, and scalability with a well-thought-out lakehouse solution.

We invite organizations and enterprises to take a holistic approach to understand their cloud resources, infrastructure, and applications. You can enable and educate your teams through a single pane of glass, while running on a data modernization lakehouse applying Well-Architected concepts, best practices, and cloud-centric principles. This solution can ultimately enable near real-time streaming, leveling up a COE Framework well into the future.


About the Authors

Sean Zou is a Cloud Operations leader with MuleSoft at Salesforce. Sean has been involved in many aspects of MuleSoft’s Cloud Operations, and helped drive MuleSoft’s cloud infrastructure to scale more than tenfold in 7 years. He built the Oversight Engineering function at MuleSoft from scratch.

Terry Quan focuses on FinOps issues. He works with MuleSoft Engineering on cloud computing budgets and forecasting, cost reduction efforts, and costs-to-serve, and coordinates with Salesforce Finance. Terry is a certified FinOps Practitioner and Professional.

Audrey Yuan is a Software Engineer with MuleSoft at Salesforce. Audrey works on data lakehouse solutions to help drive cloud maturity across the six pillars of the Well-Architected Framework.

Rueben Jimenez is a Senior Solutions Architect at AWS, designing and implementing complex data analytics, AI/ML, and cloud infrastructure solutions.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open source solutions. Outside of his work, Avijit likes to travel, hike, watch sports, and listen to music.

Batch data ingestion into Amazon OpenSearch Service using AWS Glue

Post Syndicated from Ravikiran Rao original https://aws.amazon.com/blogs/big-data/batch-data-ingestion-into-amazon-opensearch-service-using-aws-glue/

Organizations constantly work to process and analyze vast volumes of data to derive actionable insights. Effective data ingestion and search capabilities have become essential for use cases like log analytics, application search, and enterprise search. These use cases demand a robust pipeline that can handle high data volumes and enable efficient data exploration.

Apache Spark, an open source powerhouse for large-scale data processing, is widely recognized for its speed, scalability, and ease of use. Its ability to process and transform massive datasets has made it an indispensable tool in modern data engineering. Amazon OpenSearch Service—a community-driven search and analytics solution—empowers organizations to search, aggregate, visualize, and analyze data seamlessly. Together, Spark and OpenSearch Service offer a compelling solution for building powerful data pipelines. However, ingesting data from Spark into OpenSearch Service can present challenges, especially with diverse data sources.

This post showcases how to use Spark on AWS Glue to seamlessly ingest data into OpenSearch Service. We cover batch ingestion methods, share practical examples, and discuss best practices to help you build optimized and scalable data pipelines on AWS.

Overview of solution

AWS Glue is a serverless data integration service that simplifies data preparation and integration tasks for analytics, machine learning, and application development. In this post, we focus on batch data ingestion into OpenSearch Service using Spark on AWS Glue.

AWS Glue offers multiple integration options with OpenSearch Service using various open source and AWS managed libraries, including:

  • OpenSearch Spark library
  • Elasticsearch Hadoop library
  • AWS Glue OpenSearch Service connection

In the following sections, we explore each integration method in detail, guiding you through the setup and implementation. As we progress, we incrementally build the architecture diagram shown in the following figure, providing a clear path for creating robust data pipelines on AWS. Each implementation is independent of the others. We chose to showcase them separately, because in a real-world scenario, only one of the three integration methods is likely to be used.

Image showing the high level architecture diagram

You can find the code base in the accompanying GitHub repo. In the following sections, we walk through the steps to implement the solution.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Clone the repository to your local machine

Clone the repository to your local machine and set the BLOG_DIR environment variable. All the relative paths assume BLOG_DIR is set to the repository location on your machine. If you are not using BLOG_DIR, adjust the paths accordingly.

git clone git@github.com:aws-samples/opensearch-glue-integration-patterns.git
cd opensearch-glue-integration-patterns
export BLOG_DIR=$(pwd)

Deploy the AWS CloudFormation template to create the necessary infrastructure

The main focus of this post is to demonstrate how to use the mentioned libraries in Spark on AWS Glue to ingest data into OpenSearch Service. Though we center on this core topic, several key AWS components need to be pre-provisioned for the integration examples, such as an Amazon Virtual Private Cloud (Amazon VPC), multiple subnets, an AWS Key Management Service (AWS KMS) key, an Amazon Simple Storage Service (Amazon S3) bucket, an AWS Glue role, and an OpenSearch Service cluster with domains for OpenSearch Service and Elasticsearch. To simplify the setup, we’ve automated the provisioning of this core infrastructure using the cloudformation/opensearch-glue-infrastructure.yaml AWS CloudFormation template.

  1. Run the following commands

The CloudFormation template deploys the necessary networking components (such as VPC and subnets), Amazon CloudWatch logging, AWS Glue role, and OpenSearch Service and Elasticsearch domains required to implement the proposed architecture. For ESMasterUserPassword and OSMasterUserPassword in the following command, use a strong password (8–128 characters, containing at least three of the following: lowercase letters, uppercase letters, numbers, or special characters, and no /, ", or spaces) that adheres to your organization’s security standards:

cd ${BLOG_DIR}/cloudformation/
aws cloudformation deploy \
--template-file ${BLOG_DIR}/cloudformation/opensearch-glue-infrastructure.yaml \
--stack-name GlueOpenSearchStack \
--capabilities CAPABILITY_NAMED_IAM \
--region <AWS_REGION> \
--parameter-overrides \
ESMasterUserPassword=<ES_MASTER_USER_PASSWORD> \
OSMasterUserPassword=<OS_MASTER_USER_PASSWORD>

You should see a success message such as "Successfully created/updated stack – GlueOpenSearchStack" after the resources have been provisioned successfully. Provisioning this CloudFormation stack typically takes approximately 30 minutes to complete.

  2. On the AWS CloudFormation console, locate the GlueOpenSearchStack stack, and confirm that its status is CREATE_COMPLETE.

Image showing the "CREATE_COMPLETE" status of cloudformation template

You can review the deployed resources on the Resources tab, as shown in the following screenshot. The screenshot does not display all the created resources.

Image showing the "Resources" tab of cloudformation template

Additional setup steps

In this section, we collect essential information, including the S3 bucket name and the OpenSearch Service and Elasticsearch domain endpoints. These details are required for executing the code in subsequent sections.

Capture the details of the provisioned resources

Use the following AWS CLI command to extract and save the output values from the CloudFormation stack to a file named GlueOpenSearchStack_outputs.txt. We refer to the values in this file in upcoming steps.

aws cloudformation describe-stacks \
--stack-name GlueOpenSearchStack \
--query 'sort_by(Stacks[0].Outputs[], &OutputKey)[].{Key:OutputKey,Value:OutputValue}' \
--output table \
--no-cli-pager \
--region <AWS_REGION> > ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Download NY Green Taxi December 2022 dataset and copy to S3 bucket

The purpose of this post is to demonstrate the technical implementation of ingesting data into OpenSearch Service using AWS Glue. Understanding the dataset itself is not essential, aside from its data format, which we discuss in AWS Glue notebooks in later sections. To learn more about the dataset, you can find additional information on the NYC Taxi and Limousine Commission website.

We specifically request that you download the December 2022 dataset, because we have tested the solution using this particular dataset:

S3_BUCKET_NAME=$(awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt)
mkdir -p ${BLOG_DIR}/datasets && cd ${BLOG_DIR}/datasets
curl -O https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-12.parquet
aws s3 cp green_tripdata_2022-12.parquet s3://${S3_BUCKET_NAME}/datasets/green_tripdata_2022-12.parquet
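The awk one-liner above reads a value out of the table that the AWS CLI wrote to GlueOpenSearchStack_outputs.txt. If you prefer to do the same lookup from Python, a minimal sketch might look like the following. It assumes the two-column Key/Value table layout produced by the describe-stacks command with `--output table`, and the function name is hypothetical. Unlike the awk pattern, which matches substrings, it looks keys up exactly.

```python
def parse_outputs_table(text: str) -> dict:
    """Parse a two-column AWS CLI table (Key | Value) into a dictionary."""
    outputs = {}
    for line in text.splitlines():
        # Drop the outer pipes, then split the row into its cells
        cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
        # Keep only data rows: two cells, and not the "Key | Value" header
        if len(cells) == 2 and cells[0] and cells[0] != "Key":
            outputs[cells[0]] = cells[1]
    return outputs


if __name__ == "__main__":
    import os

    path = os.path.join(os.environ.get("BLOG_DIR", "."), "GlueOpenSearchStack_outputs.txt")
    if os.path.exists(path):
        with open(path) as handle:
            print(parse_outputs_table(handle.read()).get("S3Bucket"))
```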

Download the required JARs from the Maven repository and copy to S3 bucket

We’ve specified a particular JAR file version to ensure a stable deployment experience. However, we recommend adhering to your organization’s security best practices and reviewing any known vulnerabilities in the version of the JAR files before deployment. AWS does not guarantee the security of any open-source code used here. Additionally, verify the downloaded JAR file’s checksum against the published value to confirm its integrity and authenticity.

mkdir -p ${BLOG_DIR}/jars && cd ${BLOG_DIR}/jars
# OpenSearch Service jar
curl -O https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/1.0.1/opensearch-spark-30_2.12-1.0.1.jar
aws s3 cp opensearch-spark-30_2.12-1.0.1.jar s3://${S3_BUCKET_NAME}/jars/opensearch-spark-30_2.12-1.0.1.jar
# Elasticsearch jar
curl -O https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/7.17.23/elasticsearch-spark-30_2.12-7.17.23.jar
aws s3 cp elasticsearch-spark-30_2.12-7.17.23.jar s3://${S3_BUCKET_NAME}/jars/elasticsearch-spark-30_2.12-7.17.23.jar
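The checksum verification recommended earlier can be done with a few lines of Python. The sketch below computes a file digest and compares it with a published value. It assumes a SHA-1 checksum, which Maven repositories commonly publish in a `.sha1` file alongside each artifact; the function names are hypothetical, and you supply the published value yourself.

```python
import hashlib


def file_digest(path: str, algorithm: str = "sha1", chunk_size: int = 1 << 20) -> str:
    """Compute the hex digest of a file, reading it in chunks."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path: str, published_hex: str, algorithm: str = "sha1") -> bool:
    """Return True if the file's digest equals the published checksum."""
    return file_digest(path, algorithm) == published_hex.strip().lower()
```

For example, `matches_published("opensearch-spark-30_2.12-1.0.1.jar", published_value)` returns True only when the local file matches the checksum you obtained from the repository.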

In the following sections, we implement the individual data ingestion methods as outlined in the architecture diagram.

Ingest data into OpenSearch Service using the OpenSearch Spark library

In this section, we load an OpenSearch Service index using Spark and the OpenSearch Spark library. We demonstrate this implementation by using AWS Glue notebooks, employing basic authentication using user name and password.

To demonstrate the ingestion mechanisms, we have provided the Spark-and-OpenSearch-Code-Steps.ipynb notebook with detailed instructions. Follow the steps in this section in conjunction with the instructions in the notebook.

Set up the AWS Glue Studio notebook

Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-OpenSearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-OpenSearch-Code-Steps) and choose Save.

Image showing AWS Glue OpenSearch Notebook

Replace the placeholder values in the notebook

Complete the following steps to update the placeholders in the notebook:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the connection name by executing the following command:
cd ${BLOG_DIR}
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by executing the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <OPEN-SEARCH-DOMAIN-WITHOUT-HTTPS> with the OpenSearch Service domain endpoint (without the https:// prefix). You can get the endpoint by executing the following command:
awk -F '|' '$2 ~ /OpenSearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Run the notebook

Run each cell of the notebook to load data into the OpenSearch Service domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.

Spark write modes (append vs. overwrite)

It is recommended to write data incrementally into OpenSearch Service indexes using the append mode, as demonstrated in Step 8 in the notebook. However, in certain cases, you may need to refresh the entire dataset in the OpenSearch Service index. In these scenarios, you can use the overwrite mode, though it is not advised for large indexes. When using overwrite mode, the Spark library deletes rows from the OpenSearch Service index one by one and then rewrites the data, which can be inefficient for large datasets. To avoid this, you can implement a preprocessing step in Spark to identify insertions and updates, and then write the data into OpenSearch Service using append mode.
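The preprocessing idea can be illustrated with a small change-detection sketch. It is written in plain Python rather than Spark to keep it self-contained; in a Spark job the same comparison would typically be a join between the incoming data and the previously loaded keys. The `trip_id` key field and the fingerprint store are assumptions for illustration and are not part of the notebook.

```python
import hashlib
import json
from typing import Dict, Iterable, List


def row_fingerprint(row: dict) -> str:
    """Hash a row's content so that changes can be detected cheaply."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def changed_rows(rows: Iterable[dict], key_field: str, previous: Dict[str, str]) -> List[dict]:
    """Return rows that are new or modified relative to the previous load.

    `previous` maps each key (as a string) to the fingerprint stored last time.
    """
    changed = []
    for row in rows:
        if previous.get(str(row[key_field])) != row_fingerprint(row):
            changed.append(row)
    return changed


if __name__ == "__main__":
    loaded = {"1": row_fingerprint({"trip_id": 1, "fare": 10.0})}
    incoming = [{"trip_id": 1, "fare": 10.0}, {"trip_id": 2, "fare": 5.0}]
    print(changed_rows(incoming, "trip_id", loaded))
```

Only the rows returned by `changed_rows` would then be written in append mode. For updated rows to replace existing documents instead of creating duplicates, the write also needs a stable document ID derived from the key field.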

Ingest data into Elasticsearch using the Elasticsearch Hadoop library

In this section, we load an Elasticsearch index using Spark and the Elasticsearch Hadoop Library. We demonstrate this implementation by using AWS Glue as the engine for Spark.

Set up the AWS Glue Studio notebook

Complete the following steps to set up the notebook:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.

Image showing AWS console page for AWS Glue to open notebook

  3. Upload the notebook file located at ${BLOG_DIR}/glue_jobs/Spark-and-Elasticsearch-Code-Steps.ipynb.
  4. For IAM role, choose the AWS Glue job IAM role that begins with GlueOpenSearchStack-GlueRole-*.

Image showing AWS console page for AWS Glue to open notebook

  5. Enter a name for the notebook (for example, Spark-and-ElasticSearch-Code-Steps) and choose Save.

Image showing AWS Glue Elasticsearch Notebook

Replace the placeholder values in the notebook

Complete the following steps:

  1. In Step 1 in the notebook, replace the placeholder <GLUE-INTERACTIVE-SESSION-CONNECTION-NAME> with the AWS Glue interactive session connection name. You can get the connection name by executing the following command:
awk -F '|' '$2 ~ /GlueInteractiveSessionConnectionName/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  2. In Step 1 in the notebook, replace the placeholder <S3-BUCKET-NAME> and populate the variable s3_bucket with the bucket name. You can get the name of the S3 bucket by executing the following command:
awk -F '|' '$2 ~ /S3Bucket/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt
  3. In Step 4 in the notebook, replace <ELASTIC-SEARCH-DOMAIN-WITHOUT-HTTPS> with the Elasticsearch domain endpoint (without the https:// prefix). You can get the endpoint by executing the following command:
awk -F '|' '$2 ~ /ElasticsearchDomainEndpoint/ {gsub(/^[ \t]+|[ \t]+$/, "", $3); print $3}' ${BLOG_DIR}/GlueOpenSearchStack_outputs.txt

Run the notebook

Run each cell in the notebook to load data to the Elasticsearch domain and read it back to verify the successful load. Refer to the detailed instructions within the notebook for execution-specific guidance.

Ingest data into OpenSearch Service using the AWS Glue OpenSearch Service connection

In this section, we load an OpenSearch Service index using Spark and the AWS Glue OpenSearch Service connection.

Create the AWS Glue job

Complete the following steps to create an AWS Glue Visual ETL job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Create job, choose Visual ETL.

This will open the AWS Glue job visual editor.

Image showing AWS console page for AWS Glue to open Visual ETL

  3. Choose the plus sign, and under Sources, choose Amazon S3.

Image showing AWS console page for AWS Glue Visual Editor

  4. In the visual editor, choose the Data Source – S3 bucket node.
  5. In the Data source properties – S3 pane, configure the data source as follows:
    • For S3 source type, select S3 location.
    • For S3 URL, choose Browse S3, and choose the green_tripdata_2022-12.parquet file from the designated S3 bucket.
    • For Data format, choose Parquet.
  6. Choose Infer schema to let AWS Glue detect the schema of the data.

This will set up your data source from the specified S3 bucket.

Image showing AWS console page for AWS Glue Visual Editor

  7. Choose the plus sign again to add a new node.
  8. For Transforms, choose Drop Fields to include this transformation step.

This will allow you to remove any unnecessary fields from your dataset before loading it into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  9. Choose the Drop Fields transform node, then select the following fields to drop from the dataset:
    • payment_type
    • trip_type
    • congestion_surcharge

This will remove these fields from the data before it is loaded into OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  10. Choose the plus sign again to add a new node.
  11. For Targets, choose Amazon OpenSearch Service.

This will configure OpenSearch Service as the destination for the data being processed.

Image showing AWS console page for AWS Glue Visual Editor

  12. Choose the Data target – Amazon OpenSearch Service node and configure it as follows:
    • For Amazon OpenSearch Service connection, choose the connection GlueOpenSearchServiceConnec-* from the drop down.
    • For Index, enter green_taxi. The green_taxi index was created earlier in the “Ingest data into OpenSearch Service using the OpenSearch Spark library” section.

This configures the job to write the processed data to the specified index in OpenSearch Service.

Image showing AWS console page for AWS Glue Visual Editor

  13. On the Job details tab, update the job details as follows:
    • For Name, enter a name (for example, Spark-and-Glue-OpenSearch-Connector).
    • For Description, enter an optional description (for example, AWS Glue job using Glue OpenSearch Connection to load data into Amazon OpenSearch Service).
    • For IAM role, choose the role starting with GlueOpenSearchStack-GlueRole-*.
    • For the Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3
    • Leave the rest of the fields as default.
    • Choose Save to save the changes.

Image showing AWS console page for AWS Glue Visual Editor

  14. To run the AWS Glue job Spark-and-Glue-OpenSearch-Connector, choose Run.

This will initiate the job execution.

Image showing AWS console page for AWS Glue Visual Editor

  15. Choose the Runs tab and wait for the AWS Glue job to complete successfully.

You will see the status change to Succeeded when the job is complete.

Image showing AWS console page for AWS Glue job run status

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack:
aws cloudformation delete-stack \
--stack-name GlueOpenSearchStack \
--region <AWS_REGION>
  2. Delete the AWS Glue jobs:
    • On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.
    • Select the jobs you created (Spark-and-Glue-OpenSearch-Connector, Spark-and-ElasticSearch-Code-Steps, and Spark-and-OpenSearch-Code-Steps) and on the Actions menu, choose Delete.

Conclusion

In this post, we explored several ways to ingest data into OpenSearch Service using Spark on AWS Glue. We demonstrated the use of three key libraries: the AWS Glue OpenSearch Service connection, the OpenSearch Spark Library, and the Elasticsearch Hadoop Library. The methods outlined in this post can help you streamline your data ingestion into OpenSearch Service.

If you’re interested in learning more and getting hands-on experience, we’ve created a workshop that walks you through the entire process in detail. You can explore the full setup for ingesting data into OpenSearch Service, handling both batch and real-time streams, and building dashboards. Check out the workshop Unified Real-Time Data Processing and Analytics Using Amazon OpenSearch and Apache Spark to deepen your understanding and apply these techniques step by step.


About the Authors

Ravikiran Rao is a Data Architect at Amazon Web Services and is passionate about solving complex data challenges for various customers. Outside of work, he is a theater enthusiast and amateur tennis player.

Vishwa Gupta is a Senior Data Architect with the AWS Professional Services Analytics Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and trying new food.

Suvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

Build a high-performance quant research platform with Apache Iceberg

Post Syndicated from Guy Bachar original https://aws.amazon.com/blogs/big-data/build-a-high-performance-quant-research-platform-with-apache-iceberg/

In our previous post Backtesting index rebalancing arbitrage with Amazon EMR and Apache Iceberg, we showed how to use Apache Iceberg in the context of strategy backtesting. In this post, we focus on data management implementation options such as accessing data directly in Amazon Simple Storage Service (Amazon S3), using popular data formats like Parquet, or using open table formats like Iceberg. Our experiments are based on real-world historical full order book data, provided by our partner CryptoStruct, and compare the trade-offs between these choices, focusing on performance, cost, and quant developer productivity.

Data management is the foundation of quantitative research. Quant researchers spend approximately 80% of their time on necessary but not impactful data management tasks such as data ingestion, validation, correction, and reformatting. Traditional data management choices include relational, SQL, NoSQL, and specialized time series databases. In recent years, advances in parallel computing in the cloud have made object stores like Amazon S3 and columnar file formats like Parquet a preferred choice.

This post explores how Iceberg can enhance quant research platforms by improving query performance, reducing costs, and increasing productivity, ultimately enabling faster and more efficient strategy development in quantitative finance. Our analysis shows that Iceberg can accelerate query performance by up to 52%, reduce operational costs, and significantly improve data management at scale.

Having chosen Amazon S3 as our storage layer, a key decision is whether to access Parquet files directly or use an open table format like Iceberg. Iceberg offers distinct advantages through its metadata layer over Parquet, such as improved data management, performance optimization, and integration with various query engines.

In this post, we use the term vanilla Parquet to refer to Parquet files stored directly in Amazon S3 and accessed through standard query engines like Apache Spark, without the additional features provided by table formats such as Iceberg.

Quant developer and researcher productivity

In this section, we focus on the productivity features offered by Iceberg and how they compare to directly reading files in Amazon S3. As mentioned earlier, approximately 80% of quantitative research work is attributed to data management tasks. Business impact relies heavily on quality data (“garbage in, garbage out”). Quants and platform teams have to ingest data from multiple sources with different velocities and update frequencies, and then validate and correct the data. These activities translate into the ability to run append, insert, update, and delete operations.

For simple append operations, both Parquet on Amazon S3 and Iceberg offer similar convenience and productivity. However, real-world data is never perfect and needs to be corrected. Gap filling (inserts), error corrections and restatements (updates), and duplicate removal (deletes) are the most obvious examples. When writing data in the Parquet format directly to Amazon S3 without using an open table format like Iceberg, you have to write code to identify the affected partition, correct errors, and rewrite the partition. Moreover, if the write job fails or a downstream job reads during this write operation, downstream jobs can read inconsistent data. In contrast, Iceberg has built-in insert, update, and delete features with ACID (atomicity, consistency, isolation, durability) properties, and the framework itself manages the Amazon S3 mechanics on your behalf.

Guarding against lookahead bias is an essential capability of any quant research platform—what backtests as a profitable trading strategy can render itself useless and unprofitable in real time. Iceberg provides time travel and snapshotting capabilities out of the box to manage lookahead bias that could be embedded in the data (such as delayed data delivery).

Simplified data corrections and updates

Iceberg enhances data management for quants in capital markets through its robust insert, delete, and update capabilities. These features allow efficient data corrections, gap-filling in time series, and historical data updates without disrupting ongoing analyses or compromising data integrity.

Unlike direct Amazon S3 access, Iceberg supports these operations on petabyte-scale data lakes without requiring complex custom code. This simplifies data modification processes, which is crucial for ingesting and updating large volumes of market and trade data, quickly iterating on backtesting and reprocessing workflows, and maintaining detailed audit trails for risk and compliance requirements.
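As a rough illustration of what such a correction can look like, the following sketch builds an Iceberg MERGE INTO statement for Spark SQL. The table name glue_catalog.quant.ticks, the corrections view, the choice of key columns, and the helper build_merge_sql are all hypothetical; running the statement assumes a Spark session configured with the Iceberg SQL extensions.

```python
def build_merge_sql(target: str, source: str, keys: list[str]) -> str:
    """Build an Iceberg MERGE INTO statement (Spark SQL) that upserts corrections.

    Matched rows are overwritten (restatements) and unmatched rows are
    inserted (gap filling). All names passed in are hypothetical placeholders.
    """
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source} s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


merge_sql = build_merge_sql(
    target="glue_catalog.quant.ticks",  # hypothetical Iceberg table
    source="corrections",               # hypothetical temp view holding corrected rows
    keys=["exchange_code", "instrument", "adapterTimestamp_ts_utc"],
)
print(merge_sql)
# With an Iceberg-enabled SparkSession, this could be run as: spark.sql(merge_sql)
```

Because the merge is committed as a single Iceberg snapshot, readers see either the table before the correction or after it, not a partially rewritten partition.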

Iceberg’s table format separates data files from metadata files, enabling efficient data modifications without full dataset rewrites. This approach also reduces expensive ListObjects API calls typically needed when directly accessing Parquet files in Amazon S3.

Additionally, Iceberg offers merge on read (MoR) and copy on write (CoW) approaches, providing flexibility for different quant research needs. MoR enables faster writes, suitable for frequently updated datasets, and CoW provides faster reads, beneficial for read-heavy workflows like backtesting.
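The choice between the two approaches is usually made through Iceberg table properties. The following sketch builds the corresponding ALTER TABLE statement; the helper write_mode_ddl and the table name are hypothetical, and merge on read assumes a table that uses Iceberg format version 2.

```python
VALID_MODES = ("copy-on-write", "merge-on-read")


def write_mode_ddl(table: str, mode: str) -> str:
    """Return a Spark SQL statement that sets Iceberg's row-level write modes."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    props = {
        "write.delete.mode": mode,  # how DELETE is applied
        "write.update.mode": mode,  # how UPDATE is applied
        "write.merge.mode": mode,   # how MERGE INTO is applied
    }
    body = ", ".join(f"'{k}'='{v}'" for k, v in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({body})"


# Hypothetical table that receives frequent intraday corrections
ddl = write_mode_ddl("glue_catalog.quant.ticks", "merge-on-read")
print(ddl)
```

A frequently corrected feed might favor merge-on-read, whereas a static historical table scanned repeatedly by backtests might stay on the default copy-on-write.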

For example, when a new data source or attribute is added, quant researchers can seamlessly incorporate it into their Iceberg tables and then reprocess historical data, confident they’re using correct, time-appropriate information. This capability is particularly valuable in maintaining the integrity of backtests and the reliability of trading strategies.

In scenarios involving large-scale data corrections or updates, such as adjusting for stock splits or dividend payments across historical data, Iceberg’s efficient update mechanisms significantly reduce processing time and resource usage compared to traditional methods.

These features collectively improve productivity and data management efficiency in quant research environments, allowing researchers to focus more on strategy development and less on data handling complexities.

Historical data access for backtesting and validation

Iceberg’s time travel feature can enable quant developers and researchers to access and analyze historical snapshots of their data. This capability can be useful while performing tasks like backtesting, model validation, and understanding data lineage.

Iceberg simplifies time travel workflows on Amazon S3 by introducing a metadata layer that tracks the history of changes made to the table. You can refer to this metadata layer to create a mental model of how Iceberg’s time travel capability works.

Iceberg’s time travel capability is driven by a concept called snapshots, which are recorded in metadata files. These metadata files act as a central repository that stores table metadata, including the history of snapshots. Additionally, Iceberg uses manifest files to provide a representation of data files, their partitions, and any associated deleted files. These manifest files are referenced in the metadata snapshots, allowing Iceberg to identify the relevant data for a specific point in time.

When a user requests a time travel query, the typical workflow involves querying a specific snapshot. Iceberg uses the snapshot identifier to locate the corresponding metadata snapshot in the metadata files. The time travel capability is invaluable to quants, enabling them to backtest and validate strategies against historical data, reproduce and debug issues, perform what-if analysis, comply with regulations by maintaining audit trails and reproducing past states, and roll back and recover from data corruption or errors. Quants can also gain deeper insights into current market trends and correlate them with historical patterns. Also, the time travel feature can further mitigate any risks of lookahead bias. Researchers can access the exact data snapshots that were present in the past, and then run their models and strategies against this historical data, without the risk of inadvertently incorporating future information.
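To make the mental model concrete, here is a toy sketch in plain Python of how a time travel request can be resolved to a snapshot. It is not Iceberg's implementation; the Snapshot class, the snapshot_as_of helper, and the file names are invented for illustration.

```python
from bisect import bisect_right
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at_ms: int  # commit time in epoch milliseconds
    data_files: tuple     # data files visible in this snapshot


def snapshot_as_of(history: list, ts_ms: int) -> Snapshot:
    """Return the latest snapshot committed at or before ts_ms.

    `history` must be sorted by commit time, oldest first.
    """
    times = [s.committed_at_ms for s in history]
    idx = bisect_right(times, ts_ms)
    if idx == 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return history[idx - 1]


history = [
    Snapshot(1, 1_000, ("day1.parquet",)),
    Snapshot(2, 2_000, ("day1.parquet", "day2.parquet")),
    Snapshot(3, 3_000, ("day1_restated.parquet", "day2.parquet")),
]

# A backtest pinned to time 2_500 sees snapshot 2, not the later restatement.
seen = snapshot_as_of(history, 2_500)
print(seen.snapshot_id, seen.data_files)
```

In Spark, Iceberg exposes the same idea through SQL clauses such as TIMESTAMP AS OF and VERSION AS OF, or through the as-of-timestamp and snapshot-id read options.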

Seamless integration with familiar tools

Iceberg provides a variety of interfaces that enable seamless integration with the open source tools and AWS services that quant developers and researchers are familiar with.

Iceberg provides a comprehensive SQL interface that allows quant teams to interact with their data using familiar SQL syntax. This SQL interface is compatible with popular query engines and data processing frameworks, such as Spark, Trino, Amazon Athena, and Hive. Quant developers and researchers can use their existing SQL knowledge and tools to query, filter, aggregate, and analyze their data stored in Iceberg tables.

In addition to the primary interface of SQL, Iceberg also provides the DataFrame API, which allows quant teams to programmatically interact with their data with popular distributed data processing frameworks like Spark and Flink as well as thin clients like PyIceberg. Quants can further use this API to build more programmatic approaches to access and manipulate data, allowing for the implementation of custom logic and integration of Iceberg with other AWS ecosystems like Amazon EMR.

Although accessing data directly from Amazon S3 is a viable option, Iceberg provides several advantages, such as metadata management, performance optimization through partition pruning, data manipulation, and rich integration with the AWS ecosystem, including services like Athena and Amazon EMR, for a more seamless and feature-rich data processing experience.

Undifferentiated heavy lifting

Data partitioning is one of the major contributing factors to optimizing aggregate throughput to and from Amazon S3, and therefore to the overall price-performance of a High Performance Computing (HPC) environment.

Quant researchers often face performance bottlenecks and complex data management challenges when dealing with large-scale datasets in Amazon S3. As discussed in Best practices design patterns: optimizing Amazon S3 performance, single prefix performance is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per partitioned Amazon S3 prefix. Iceberg’s metadata layer and intelligent partitioning strategies automatically optimize data access patterns, reducing the likelihood of I/O throttling and minimizing the need for manual performance tuning. This automation allows quant teams to focus on developing and refining trading strategies rather than troubleshooting data access issues or optimizing storage layouts.
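As a back-of-the-envelope check, the per-prefix limits quoted above translate directly into a minimum number of prefixes for a given request rate. In the following sketch, the target rates of 100,000 reads and 20,000 writes per second are hypothetical, and the calculation assumes requests are spread evenly across prefixes.

```python
import math

GET_LIMIT_PER_PREFIX = 5_500  # GET/HEAD requests per second per prefix
PUT_LIMIT_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE requests per second per prefix


def min_prefixes(target_rps: int, per_prefix_limit: int) -> int:
    """Smallest number of evenly loaded prefixes that can sustain target_rps."""
    return math.ceil(target_rps / per_prefix_limit)


print(min_prefixes(100_000, GET_LIMIT_PER_PREFIX))  # 19
print(min_prefixes(20_000, PUT_LIMIT_PER_PREFIX))   # 6
```

A layout with fewer effective prefixes than this, or one where a few hot prefixes receive most of the traffic, is where throttling tends to appear.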

In this section, we discuss situations we discovered while running our experiments at scale and solutions provided by Iceberg vs. vanilla Parquet when accessing data in Amazon S3.

As we mentioned in the introduction, the nature of quant research is “fail fast”—new ideas have to be quickly evaluated and then either prioritized for a deep dive or dismissed. This makes it impossible to come up with universal partitioning that works all the time and for all research styles.

When accessing data directly as Parquet files in Amazon S3, without using an open table format like Iceberg, partitioning and throttling issues can arise. Partitioning in this case is determined by the physical layout of files in Amazon S3, and a mismatch between the intended partitioning and the actual file layout can lead to I/O throttling exceptions. Additionally, listing directories in Amazon S3 can also result in throttling exceptions due to the high number of API calls required.

In contrast, Iceberg provides a metadata layer that abstracts away the physical file layout in Amazon S3. Partitioning is defined at the table level, and Iceberg handles the mapping between logical partitions and the underlying file structure. This abstraction helps mitigate partitioning issues and reduces the likelihood of I/O throttling exceptions. Furthermore, Iceberg’s metadata caching mechanism minimizes the number of List API calls required, addressing the directory listing throttling issue.
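The following toy sketch illustrates the idea of planning a scan from metadata alone, without listing objects in Amazon S3. It is a simplified model, not Iceberg's actual manifest format; the ManifestEntry class, the plan_scan helper, and the file paths and exchange codes are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestEntry:
    path: str           # physical object key; can be arbitrary
    exchange_code: str  # logical partition value
    instrument: str     # logical partition value
    ts_min: str         # lower bound of adapterTimestamp_ts_utc in the file
    ts_max: str         # upper bound of adapterTimestamp_ts_utc in the file


def plan_scan(entries, exchange_code, ts_from, ts_to):
    """Pick the files to read using partition values and min/max statistics only."""
    return [
        e.path
        for e in entries
        if e.exchange_code == exchange_code
        and e.ts_max >= ts_from  # file ends after the range starts
        and e.ts_min <= ts_to    # file starts before the range ends
    ]


entries = [
    ManifestEntry("data/a.parquet", "EX1", "BTC-USD", "2023-04-16 00:00:00", "2023-04-16 23:59:59"),
    ManifestEntry("data/b.parquet", "EX1", "BTC-USD", "2023-04-17 00:00:00", "2023-04-17 23:59:59"),
    ManifestEntry("data/c.parquet", "EX2", "ETH-USD", "2023-04-17 00:00:00", "2023-04-17 23:59:59"),
]

files = plan_scan(entries, "EX1", "2023-04-17 00:00:00", "2023-04-18 23:59:59")
print(files)  # ['data/b.parquet']
```

Because the physical path is only a field in the metadata, the file layout in Amazon S3 can change without affecting how queries address partitions.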

Although both approaches involve direct access to Amazon S3, Iceberg is an open table format that introduces a metadata layer, providing better partitioning management and reducing the risk of throttling exceptions. It doesn’t act as a database itself, but rather as a table format layered on top of the underlying storage (in this case, Amazon S3) and used by processing engines such as Spark.

One of the most effective techniques to address Amazon S3 API quota limits is salting (random hash prefixes)—a method that adds random partition IDs to Amazon S3 paths. This increases the probability of prefixes residing on different physical partitions, helping distribute API requests more evenly. Iceberg supports this functionality out of the box for both data ingestion and reading.

Implementing salting directly in Amazon S3 requires complex custom code to create and use partitioning schemes with random keys in the naming hierarchy. This approach necessitates a custom data catalog and metadata system to map physical paths to logical paths, allowing direct partition access without relying on Amazon S3 List API calls. Without such a system, applications risk exceeding Amazon S3 API quotas when accessing specific partitions.
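To give a sense of the custom code involved, here is a minimal sketch of a salting helper in plain Python. Note that it swaps the random IDs described above for a deterministic hash of the logical path, so that a reader can recompute the physical key without a lookup table; the salted_key function, the two-character hex prefix, and the example paths are assumptions for illustration.

```python
import hashlib


def salted_key(logical_path: str, num_salts: int = 16) -> str:
    """Prepend a hash-derived prefix so keys spread across num_salts S3 prefixes."""
    digest = hashlib.md5(logical_path.encode("utf-8")).hexdigest()
    salt = int(digest, 16) % num_salts
    return f"{salt:02x}/{logical_path}"


logical_paths = [
    f"exchange_code=EX1/instrument=INSTR{i}/part-0.parquet" for i in range(200)
]
physical_keys = [salted_key(p) for p in logical_paths]

# Count how many distinct prefixes the 200 logical paths were spread over.
prefixes = {k.split("/", 1)[0] for k in physical_keys}
print(len(prefixes), "distinct prefixes used")
```

Even this small helper leaves open how writers and readers agree on num_salts and how existing data is migrated if it changes, which is the kind of bookkeeping a table format takes over. In Iceberg, comparable behavior is typically enabled through the write.object-storage.enabled table property.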

At petabyte scale, Iceberg’s advantages become clear. It efficiently manages data through the following features:

  • Directory caching
  • Configurable partitioning strategies (range, bucket)
  • Data management functionality (compaction)
  • Catalog, metadata, and statistics use for optimal execution plans

These built-in features eliminate the need for custom solutions to manage Amazon S3 API quotas and data organization at scale, reducing development time and maintenance costs while improving query performance and reliability.

Performance

We highlighted a lot of the functionality of Iceberg that eliminates undifferentiated heavy lifting and improves developer and quant productivity. What about performance?

This section evaluates whether Iceberg’s metadata layer introduces overhead or delivers optimization for quantitative research use cases, comparing it with vanilla Parquet access on Amazon S3. We examine how these approaches impact common quant research queries and workflows.

We also discuss overlapping optimization techniques, such as data distribution and sorting, and why no single partitioning and sorting scheme fits every quant research workload. Our benchmarks show that Iceberg performs comparably to direct Amazon S3 access, with additional optimizations from its metadata and statistics usage, similar to database indexing.

Vanilla Parquet vs Iceberg: Amazon S3 read performance

We created four different datasets: two using Iceberg and two with direct Amazon S3 Parquet access, each with both sorted and unsorted write distributions. The purpose of this exercise was to compare the performance of direct Amazon S3 Parquet access vs. the Iceberg open table format, taking into account the impact of write distribution patterns when running various queries commonly used in quantitative trading research.

Query 1

We first run a simple count query to get the total number of records in the table. This query helps understand the baseline performance for a straightforward operation. For example, if the table contains tick-level market data for various financial instruments, the count can give an idea of the total number of data points available for analysis.

The following is the code for vanilla Parquet:

# Assumes an active SparkSession named spark
count = spark.read.parquet("s3://example-s3-bucket/path/to/data").count()

The following is the code for Iceberg:

from pyspark.sql import functions as f

count = spark.read.table(table_name).count()
# We used a typical count query for the performance comparison. The same count can also be derived from Iceberg metadata, as shown below, which completes in a few seconds.
spark.read.format("iceberg").load(f"{table_name}.files").select(f.sum("record_count")).show(truncate=False)

Query 2

Our second query is a grouping and counting query to find the number of records for each combination of exchange_code and instrument. This query is commonly used in quantitative trading research to analyze market liquidity and trading activity across different instruments and exchanges.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Query 3

Next, we run a distinct query to retrieve the distinct combinations of year, month, and day from the adapterTimestamp_ts_utc column. In quantitative trading research, this query can be helpful for understanding the time range covered by the dataset. Researchers can use this information to identify periods of interest for their analysis, such as specific market events, economic cycles, or seasonal patterns.

The following is the code for vanilla Parquet:

from pyspark.sql import functions as f

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                 f.month("adapterTimestamp_ts_utc").alias("month"),
                 f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
         .distinct() \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                f.month("adapterTimestamp_ts_utc").alias("month"),
                f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
        .distinct() \
        .show(truncate=False)

Query 4

Lastly, we run a grouping and counting query with a date range filter on the adapterTimestamp_ts_utc column. This query is similar to Query 2 but focuses on a specific time period. You could use this query to analyze market activity or liquidity during specific time periods, such as periods of high volatility, market crashes, or economic events. Researchers can use this information to identify potential trading opportunities or investigate the impact of these events on market dynamics.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                 (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg. Because Iceberg has a metadata layer, the query planner can use file-level statistics on adapterTimestamp_ts_utc to skip data files that fall outside the date range:

spark.read.table(table_name) \
        .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Test results

To evaluate the performance and cost benefits of using Iceberg for our quant research data lake, we created four different datasets: two with Iceberg tables and two with direct Amazon S3 Parquet access, each using both sorted and unsorted write distributions. We first ran AWS Glue write jobs to create the Iceberg tables and then mirrored the same write processes for the Amazon S3 Parquet datasets. For the unsorted datasets, we partitioned the data by exchange and instrument, and for the sorted datasets, we added a sort key on the time column.

Next, we ran a series of queries commonly used in quantitative trading research, including simple count queries, grouping and counting, distinct value queries, and queries with date range filters. Our benchmarking process involved reading data from Amazon S3, performing various transformations and joins, and writing the processed data back to Amazon S3 as Parquet files.

By comparing runtimes and costs across different data formats and write distributions, we quantified the benefits of Iceberg’s optimized data organization, metadata management, and efficient Amazon S3 data handling. The results showed that Iceberg not only enhanced query performance without introducing significant overhead, but also reduced the likelihood of task failures, reruns, and throttling issues, leading to more stable and predictable job execution, particularly with large datasets stored in Amazon S3.

AWS Glue write jobs

In the following table, we compare the performance and the cost implications of using Iceberg vs. vanilla Parquet access on Amazon S3, taking into account the following use cases:

  • Iceberg table (unsorted) – We created an Iceberg table partitioned by exchange_code and instrument. This means that the data was physically partitioned in Amazon S3 based on the unique combinations of exchange_code and instrument values. Partitioning the data in this way can improve query performance, because Iceberg can prune out partitions that aren’t relevant to a particular query, reducing the amount of data that needs to be scanned. The data was not sorted on any column in this case, which is the default behavior.
  • Vanilla Parquet (unsorted) – For this use case, we wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by the exchange_code and instrument columns using standard hash partitioning before writing it out. Repartitioning was necessary to avoid potential throttling issues when reading the data later, because accessing data directly from Amazon S3 without intelligent partitioning can lead to too many requests hitting the same S3 prefix. Like the Iceberg table, the data was not sorted on any column in this case. To make the comparison fair, we used the exact repartition count that Iceberg uses.
  • Iceberg table (sorted) – We created another Iceberg table, this time also partitioned by exchange_code and instrument. Additionally, we sorted the data in this table on the adapterTimestamp_ts_utc column. Sorting the data can improve query performance for certain types of queries, such as those that involve range filters or ordered outputs. Iceberg handles the sorting and partitioning of the data transparently to the user.
  • Vanilla Parquet (sorted) – For this use case, we again wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by range on the exchange_code, instrument, and adapterTimestamp_ts_utc columns before writing it out, using standard range partitioning with a partition count of 1,996, because this was what Iceberg was using according to the Spark UI. Repartitioning on the time column (adapterTimestamp_ts_utc) was necessary to achieve a sorted write distribution, because Parquet files are sorted within each partition. This sorted write distribution can improve query performance for certain types of queries, similar to the sorted Iceberg table.

| Write Distribution Pattern | Iceberg Table (Unsorted) | Vanilla Parquet (Unsorted) | Iceberg Table (Sorted) | Vanilla Parquet (Sorted) |
|---|---|---|---|---|
| DPU Hours | 899.46639 | 915.70222 | 1402 | 1365 |
| Number of S3 Objects | 7444 | 7288 | 9283 | 9283 |
| Size of S3 Parquet Objects | 567.7 GB | 629.8 GB | 525.6 GB | 627.1 GB |
| Runtime | 1h 51m 40s | 1h 53m 29s | 2h 52m 7s | 2h 47m 36s |

AWS Glue read jobs

For the AWS Glue read jobs, we ran a series of queries commonly used in quantitative trading research, such as simple counts, grouping and counting, distinct value queries, and queries with date range filters. We compared the performance of these queries between the Iceberg tables and the vanilla Parquet files read in Amazon S3. In the following table, you can see two AWS Glue jobs that show the performance and cost implications of access patterns described earlier.

| Read Queries / Runtime in Seconds | Iceberg Table | Vanilla Parquet |
|---|---|---|
| COUNT(1) on unsorted | 35.76s | 74.62s |
| GROUP BY and ORDER BY on unsorted | 34.29s | 67.99s |
| DISTINCT and SELECT on unsorted | 51.40s | 82.95s |
| FILTER and GROUP BY and ORDER BY on unsorted | 25.84s | 49.05s |
| COUNT(1) on sorted | 15.29s | 24.25s |
| GROUP BY and ORDER BY on sorted | 15.88s | 28.73s |
| DISTINCT and SELECT on sorted | 30.85s | 42.06s |
| FILTER and GROUP BY and ORDER BY on sorted | 15.51s | 31.51s |
| AWS Glue DPU hours | 45.98 | 67.97 |

Test results insights

These test results offered the following insights:

  • Accelerated query performance – Iceberg improved read operations by up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly. In quantitative finance, where speed is crucial, this performance gain allows teams to uncover market insights faster, potentially gaining a competitive edge.
  • Reduced operational costs – For read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage. These efficiency gains translate to cost savings in data-intensive quant operations. With Iceberg, firms can run more comprehensive analyses within the same budget or reallocate resources to other high-value activities, optimizing their research capabilities.
  • Enhanced data management and scalability – Iceberg showed comparable write performance for unsorted data (899.47 DPU hours vs. 915.70 for vanilla Parquet) and produced object counts close to those of vanilla Parquet in both the unsorted and sorted scenarios (7,444 and 9,283 objects, respectively). This consistency leads to more reliable and predictable job execution. For quant teams dealing with large-scale datasets, this reduces time spent on troubleshooting data infrastructure issues and increases focus on developing trading strategies.
  • Improved productivity – Iceberg outperformed vanilla Parquet access across various query types. Simple counts were 52.1% faster, grouping and ordering operations improved by 49.6%, and filtered queries were 47.3% faster for unsorted data. This performance enhancement boosts productivity in quant research workflows. It reduces query completion times, allowing quant developers and researchers to spend more time on model development and market analysis, leading to faster iteration on trading strategies.
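
The percentages quoted in these insights can be reproduced from the read job table with a few lines of arithmetic. The following sketch uses the runtimes and DPU hours reported earlier; the helper name pct_reduction is ours.

```python
def pct_reduction(iceberg: float, parquet: float) -> float:
    """Percentage by which the Iceberg figure is lower than the vanilla Parquet one."""
    return round((parquet - iceberg) / parquet * 100, 1)


# (Iceberg seconds, vanilla Parquet seconds) for the unsorted datasets
unsorted_runtimes = {
    "COUNT(1)": (35.76, 74.62),
    "GROUP BY and ORDER BY": (34.29, 67.99),
    "FILTER and GROUP BY and ORDER BY": (25.84, 49.05),
}

for query, (iceberg_s, parquet_s) in unsorted_runtimes.items():
    print(f"{query}: {pct_reduction(iceberg_s, parquet_s)}% faster")

print(f"DPU hours: {pct_reduction(45.98, 67.97)}% lower")
```

Running it yields 52.1%, 49.6%, and 47.3% for the three unsorted queries and 32.4% for the read job DPU hours, matching the figures above.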

Conclusion

Quant research platforms often avoid adopting new data management solutions like Iceberg, fearing performance penalties and increased costs. Our analysis disproves these concerns, demonstrating that Iceberg not only matches or enhances performance compared to direct Amazon S3 access, but also provides substantial additional benefits.

Our tests reveal that Iceberg significantly accelerates query performance, with improvements of up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly, potentially uncovering valuable market insights faster.

Iceberg streamlines data management tasks, allowing researchers to focus on strategy development. Its robust insert, update, and delete capabilities, combined with time travel features, enable effortless management of complex datasets, improving backtest accuracy and facilitating rapid strategy iteration.

The platform’s intelligent handling of partitioning and Amazon S3 API quota issues eliminates undifferentiated heavy lifting, freeing quant teams from low-level data engineering tasks. This automation redirects efforts to high-value activities such as model development and market analysis. Moreover, our tests show that for read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage, leading to significant cost savings.

Flexibility is a key advantage of Iceberg. Its various interfaces, including SQL, DataFrames, and programmatic APIs, integrate seamlessly with existing quant research workflows, accommodating diverse analysis needs and coding preferences.

By adopting Iceberg, quant research teams gain both performance enhancements and powerful data management tools. This combination creates an environment where researchers can push analytical boundaries, maintain high data integrity standards, and focus on generating valuable insights. The improved productivity and reduced operational costs enable quant teams to allocate resources more effectively, ultimately leading to a more competitive edge in quantitative finance.


About the Authors

Guy Bachar is a Senior Solutions Architect at AWS based in New York. He specializes in assisting capital markets customers with their cloud transformation journeys. His expertise encompasses identity management, security, and unified communication.

Sercan Karaoglu is a Senior Solutions Architect, specialized in capital markets. He is a former data engineer and passionate about quantitative investment research.

Boris Litvin is a Principal Solutions Architect at AWS. His job is in financial services industry innovation. Boris joined AWS from the industry, most recently Goldman Sachs, where he held a variety of quantitative roles across equity, FX, and interest rates, and was CEO and Founder of a quantitative trading FinTech startup.

Salim Tutuncu is a Senior Partner Solutions Architect Specialist on Data & AI, based in Dubai with a focus on the EMEA. With a background in the technology sector that spans roles as a data engineer, data scientist, and machine learning engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses using the AWS platform, particularly in data and AI use cases.

Alex Tarasov is a Senior Solutions Architect working with Fintech startup customers, helping them to design and run their data workloads on AWS. He is a former data engineer and is passionate about all things data and machine learning.

Jiwan Panjiker is a Solutions Architect at Amazon Web Services, based in the Greater New York City area. He works with AWS enterprise customers, helping them in their cloud journey to solve complex business problems by making effective use of AWS services. Outside of work, he likes spending time with his friends and family, going for long drives, and exploring local cuisine.

Amazon Q data integration adds DataFrame support and in-prompt context-aware job creation

Post Syndicated from Bo Li original https://aws.amazon.com/blogs/big-data/amazon-q-data-integration-adds-dataframe-support-and-in-prompt-context-aware-job-creation/

Amazon Q data integration, introduced in January 2024, allows you to use natural language to author extract, transform, load (ETL) jobs and operations in the AWS Glue-specific data abstraction DynamicFrame. This post introduces new capabilities for Amazon Q data integration that work together to make ETL development more efficient and intuitive. We’ve added support for DataFrame-based code generation that works across any Spark environment. We’ve also introduced in-prompt context-aware development that applies details from your conversations, working seamlessly with a new iterative development experience. This means you can refine your ETL jobs through natural follow-up questions, starting with a basic data pipeline and progressively adding transformations, filters, and business logic through conversation. These improvements are available through the Amazon Q chat experience on the AWS Management Console, and the Amazon SageMaker Unified Studio (preview) visual ETL and notebook interfaces.

The DataFrame code generation now extends beyond AWS Glue DynamicFrame to support a broader range of data processing scenarios. You can now generate data integration jobs for various data sources and destinations, including Amazon Simple Storage Service (Amazon S3) data lakes with popular file formats like CSV, JSON, and Parquet, as well as modern table formats such as Apache Hudi, Delta, and Apache Iceberg. Amazon Q can generate ETL jobs for connecting to over 20 different data sources, including relational databases like PostgreSQL, MySQL and Oracle; data warehouses like Amazon Redshift, Snowflake, and Google BigQuery; NoSQL databases like Amazon DynamoDB, MongoDB, and OpenSearch; tables defined in the AWS Glue Data Catalog; and custom user-supplied JDBC and Spark connectors. Your generated jobs can use a variety of data transformations, including filters, projections, unions, joins, and aggregations, giving you the flexibility to handle complex data processing requirements.

In this post, we discuss how Amazon Q data integration transforms ETL workflow development.

Improved capabilities of Amazon Q data integration

Previously, Amazon Q data integration only generated code with template values, so you had to manually fill in configurations such as the connection properties for the data source and data sink and the settings for transforms. With in-prompt context awareness, you can now include this information in your natural language query, and Amazon Q data integration will automatically extract and incorporate it into the workflow. In addition, generative visual ETL in the SageMaker Unified Studio (preview) visual editor allows you to iterate on and refine your ETL workflow with new requirements, enabling incremental development.

Solution overview

This post describes the end-to-end user experiences to demonstrate how Amazon Q data integration and SageMaker Unified Studio (preview) simplify your data integration and data engineering tasks with the new enhancements, by building a low-code no-code (LCNC) ETL workflow that enables seamless data ingestion and transformation across multiple data sources.

We demonstrate how to do the following:

  • Connect to diverse data sources
  • Perform table joins
  • Apply custom filters
  • Export processed data to Amazon S3

The following diagram illustrates the architecture.

Using Amazon Q data integration with Amazon SageMaker Unified Studio (preview)

In the first example, we use Amazon SageMaker Unified Studio (preview) to develop a visual ETL workflow incrementally. This pipeline reads data from different Amazon S3 based Data Catalog tables, performs transformations on the data, and writes the transformed data back to Amazon S3. We use the allevents_pipe and venue_pipe files from the TICKIT dataset to demonstrate this capability. The TICKIT dataset records sales activities on the fictional TICKIT website, where users can purchase and sell tickets online for different types of events such as sports games, shows, and concerts.

The process involves merging the allevents_pipe and venue_pipe files from the TICKIT dataset. Next, the merged data is filtered to include only a specific geographic region. Then the transformed output data is saved to Amazon S3 for future processing.

Data preparation

The two datasets are hosted as two Data Catalog tables, venue and event, in a project in Amazon SageMaker Unified Studio (preview), as shown in the following screenshots.

Data processing

To process the data, complete the following steps:

  1. On the Amazon SageMaker Unified Studio console, on the Build menu, choose Visual ETL flow.

An Amazon Q chat window will help you provide a description for the ETL flow to be built.

  1. For this post, enter the following text:
    Create a Glue ETL flow connect to 2 Glue catalog tables venue and event in my database glue_db_4fthqih3vvk1if, join the results on the venue’s venueid and event’s e_venueid, and write output to a S3 location.
    (The database name is generated with the project ID suffixed to the given database name automatically).
  2. Choose Submit.

An initial data integration flow will be generated as shown in the following screenshot to read from the two Data Catalog tables, join the results, and write to Amazon S3. We can see the join conditions are correctly inferred from our request from the join node configuration displayed.

Let’s add another filter transform based on the venue state as DC.

  1. Choose the plus sign and choose the Amazon Q icon to ask a follow-up question.
  2. Enter the following instruction to modify the workflow:
    filter on venue state with condition as venuestate=='DC' after joining the results

The workflow is updated with a new filter transform.

Upon checking the S3 data target, we can see the S3 path is now a placeholder <s3-path> and the output format is Parquet.

  1. To update the Amazon S3 data target, ask Amazon Q the following question in the same way:
    update the s3 sink node to write to s3://xxx-testing-in-356769412531/output/ in CSV format
  2. Choose Show script to see the generated code is DataFrame based, with all context in place from all of our conversation.
  3. Finally, we can preview the data to be written to the target S3 path. Note that the data is a joined result with only the venue state DC included.

With Amazon Q data integration in Amazon SageMaker Unified Studio (preview), an LCNC user can create a visual ETL workflow by providing prompts to Amazon Q, and the context for data sources and transformations is preserved across the conversation. Amazon Q also generates DataFrame-based code, which data engineers or more experienced users can use for scripting purposes.
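To make the logic of this flow concrete, the following is a minimal plain Python sketch of the same three steps: join on venueid and e_venueid, filter on venuestate, and produce CSV output. It is not the DataFrame code that Amazon Q generates, and it does not use Spark. The sample rows and the venuename, eventid, and eventname fields are hypothetical and only illustrate the shape of the data.

```python
import csv
import io

# Hypothetical sample rows; the real tables come from the TICKIT dataset.
venues = [
    {"venueid": 1, "venuename": "Venue A", "venuestate": "DC"},
    {"venueid": 2, "venuename": "Venue B", "venuestate": "NY"},
]
events = [
    {"eventid": 10, "e_venueid": 1, "eventname": "Show X"},
    {"eventid": 11, "e_venueid": 2, "eventname": "Show Y"},
    {"eventid": 12, "e_venueid": 1, "eventname": "Show Z"},
]


def join_filter(venues, events, state):
    """Inner-join events to venues on venueid == e_venueid, then keep one state."""
    by_id = {v["venueid"]: v for v in venues}
    joined = []
    for e in events:
        v = by_id.get(e["e_venueid"])
        if v is not None and v["venuestate"] == state:
            joined.append({**v, **e})
    return joined


def to_csv(rows):
    """Serialize rows as CSV text with a header line."""
    if not rows:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


result = join_filter(venues, events, "DC")
print(to_csv(result))
```

In a Spark job, the same steps map to a join, a filter, and a CSV write on DataFrames, with the output going to the S3 path instead of an in-memory buffer.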

Amazon Q data integration with Amazon SageMaker Unified Studio (preview) notebook

Amazon Q data integration is also available in the Amazon SageMaker Unified Studio (preview) notebook experience. You can add a new cell and enter your comment to describe what you want to achieve. After you press Tab and Enter, the recommended code is shown.

For example, we provide the same initial question:

Create a Glue ETL flow to connect to 2 Glue catalog tables venue and event in my database glue_db_4fthqih3vvk1if, join the results on the venue’s venueid and event’s e_venueid, and write output to a S3 location.

Similar to the Amazon Q chat experience, the code is recommended. If you press Tab, then the recommended code is chosen.

The following video provides a full demonstration of these two experiences in Amazon SageMaker Unified Studio (preview).

Using Amazon Q data integration with AWS Glue Studio

In this section, we walk through the steps to use Amazon Q data integration with AWS Glue Studio.

Data preparation

The two datasets are hosted in two Amazon S3 based Data Catalog tables, event and venue, in the database glue_db, which we can query from Amazon Athena. The following screenshot shows an example of the venue table.

Data processing

To start using the AWS Glue code generation capability, use the Amazon Q icon on the AWS Glue Studio console. You can start authoring a new job, and ask Amazon Q the question to create the same workflow:

Create a Glue ETL flow connect to 2 Glue catalog tables venue and event in my database glue_db, join the results on the venue’s venueid and event’s e_venueid, and then filter on venue state with condition as venuestate=='DC' and write to s3://<s3-bucket>/<folder>/output/ in CSV format.

You can see the same code is generated with all configurations in place. With this response, you can learn and understand how you can author AWS Glue code for your needs. You can copy and paste the generated code to the script editor. After you configure an AWS Identity and Access Management (IAM) role on the job, save and run the job. When the job is complete, you can begin querying the data exported to Amazon S3.

After the job is complete, you can verify the joined data by checking the specified S3 path. The data is filtered by venue state as DC and is now ready for downstream workloads to process.

The following video provides a full demonstration of the experience with AWS Glue Studio.

Conclusion

In this post, we explored how Amazon Q data integration transforms ETL workflow development, making it more intuitive and time-efficient. The latest enhancements are in-prompt context awareness, which generates a data integration flow more accurately and with reduced hallucinations, and multi-turn chat capabilities, which let you incrementally update the data integration flow, add new transforms, and update DAG nodes. Whether you’re working with the console or other Spark environments in SageMaker Unified Studio (preview), these new capabilities can significantly reduce your development time and complexity.

To learn more, refer to Amazon Q data integration in AWS Glue.


About the Authors

Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.

Stuti Deshpande is a Big Data Specialist Solutions Architect at AWS. She works with customers around the globe, providing them strategic and architectural guidance on implementing analytics solutions using AWS. She has extensive experience in big data, ETL, and analytics. In her free time, Stuti likes to travel, learn new dance forms, and enjoy quality time with family and friends.

Kartik Panjabi is a Software Development Manager on the AWS Glue team. His team builds generative AI features for the Data Integration and distributed system for data integration.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Accelerate queries on Apache Iceberg tables through AWS Glue auto compaction

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/accelerate-queries-on-apache-iceberg-tables-through-aws-glue-auto-compaction/

Data lakes were originally designed to store large volumes of raw, unstructured, or semi-structured data at a low cost, primarily serving big data and analytics use cases. Over time, as organizations began to explore broader applications, data lakes have become essential for various data-driven processes beyond just reporting and analytics. Today, they play a critical role in syncing with customer applications, enabling the ability to manage concurrent data operations while maintaining the integrity and consistency of information. This shift includes not only storing batch data but also ingesting and processing near real-time data streams, allowing businesses to merge historical insights with live data to power more responsive and adaptive decision-making. However, this new data lake architecture brings challenges around managing transactional support and handling the influx of small files generated by real-time data streams. Traditionally, customers addressed these challenges by performing complex extract, transform, and load (ETL) processes, which often led to data duplication and increased complexity in data pipelines. Additionally, to cope with the proliferation of small files, organizations had to develop custom mechanisms to compact and merge these files, leading to the creation and maintenance of bespoke solutions that were difficult to scale and manage. As data lakes increasingly handle sensitive business data and transactional workloads, maintaining strong data quality, governance, and compliance becomes vital to maintaining trust and regulatory alignment.

To simplify these challenges, organizations have adopted open table formats (OTFs) like Apache Iceberg, which provide built-in transactional capabilities and mechanisms for compaction. OTFs, such as Iceberg, address key limitations in traditional data lakes by offering features like ACID transactions, which maintain data consistency across concurrent operations, and compaction, which helps manage the issue of small files by merging them efficiently. By using features like Iceberg’s compaction, OTFs streamline maintenance, making it straightforward to manage object and metadata versioning at scale. However, although OTFs reduce the complexity of maintaining efficient tables, they still require some regular maintenance to make sure tables remain in an optimal state.

In this post, we explore new features of the AWS Glue Data Catalog, which now supports improved automatic compaction of Iceberg tables for streaming data, making it straightforward for you to keep your transactional data lakes consistently performant. Enabling automatic compaction on Iceberg tables reduces metadata overhead on your Iceberg tables and improves query performance. Many customers have streaming data continuously ingested in Iceberg tables, resulting in a large number of delete files that track changes in data files. With this new feature, when you enable the Data Catalog optimizer, it constantly monitors table partitions, runs the compaction process for both data and delta or delete files, and regularly commits partial progress. The Data Catalog also now supports heavily nested complex data and supports schema evolution as you reorder or rename columns.

Automatic compaction with AWS Glue

Automatic compaction in the Data Catalog makes sure your Iceberg tables are always in optimal condition. The data compaction optimizer continuously monitors table partitions and invokes the compaction process when specific thresholds for the number of files and file sizes are met. For example, based on the Iceberg table configuration of the target file size, the compaction process will start and continue if the table or any of the partitions within the table have more than the default configuration (for example 100 files), each smaller than 75% of the target file size.
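The threshold rule in the example above can be sketched as a small Python function. This is only an illustration of the rule as described here, not the optimizer's actual implementation; the function name, parameter names, and the 512 MB target file size are assumptions.

```python
def needs_compaction(file_sizes_bytes, target_file_size_bytes,
                     min_small_files=100, small_file_ratio=0.75):
    """Return True when more than min_small_files files are below the size cutoff."""
    cutoff = small_file_ratio * target_file_size_bytes
    small_files = sum(1 for size in file_sizes_bytes if size < cutoff)
    return small_files > min_small_files


MB = 1024 * 1024
target = 512 * MB  # assumed target file size, for illustration only

print(needs_compaction([8 * MB] * 150, target))    # True: 150 small files
print(needs_compaction([500 * MB] * 150, target))  # False: files are near the target size
```

The check applies to the table or to any single partition, so a streaming workload that keeps adding small files to the newest partition would keep meeting the condition there.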

Iceberg supports two table modes: Merge-on-Read (MoR) and Copy-on-Write (CoW). These table modes provide different approaches for handling data updates and play a critical role in how data lakes manage changes and maintain performance:

  • Data compaction on Iceberg CoW – With CoW, any updates or deletes are directly applied to the table files. This means the entire dataset is rewritten when changes are made. Although this provides immediate consistency and simplifies reads (because readers only access the latest snapshot of the data), it can become costly and slow for write-heavy workloads due to the need for frequent rewrites. Announced during AWS re:Invent 2023, this feature focuses on optimizing data storage for Iceberg tables using the CoW mechanism. Compaction in CoW makes sure updates to the data result in new files being created, which are then compacted to improve query performance.
  • Data compaction on Iceberg MoR – Unlike CoW, MoR allows updates to be written separately from the existing dataset, and those changes are only merged when the data is read. This approach is beneficial for write-heavy scenarios because it avoids frequent full table rewrites. However, it can introduce complexity during reads because the system has to merge base and delta files as needed to provide a complete view of the data. MoR compaction, now generally available, allows for efficient handling of streaming data. It makes sure that while data is being continuously ingested, it’s also compacted in a way that optimizes read performance without compromising the ingestion speed.
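The difference between the two modes can be illustrated with a toy in-memory model in Python. This is a conceptual sketch only: the class and method names are hypothetical, rows are plain integers, and real Iceberg tables track data files, delete files, and snapshots in metadata rather than in Python lists.

```python
class CopyOnWriteTable:
    """Toy model: each delete rewrites the data file without the deleted rows."""

    def __init__(self, rows):
        self.data_file = list(rows)
        self.files_rewritten = 0

    def delete(self, predicate):
        self.data_file = [r for r in self.data_file if not predicate(r)]
        self.files_rewritten += 1

    def read(self):
        return list(self.data_file)


class MergeOnReadTable:
    """Toy model: deletes go to separate delete files and are applied at read time."""

    def __init__(self, rows):
        self.data_file = list(rows)
        self.delete_files = []

    def delete(self, predicate):
        # Record which rows are deleted; the data file is left untouched.
        self.delete_files.append({r for r in self.data_file if predicate(r)})

    def read(self):
        # Readers merge the data file with every delete file.
        deleted = set().union(*self.delete_files)
        return [r for r in self.data_file if r not in deleted]

    def compact(self):
        # Fold the delete files into the data file so later reads skip the merge.
        self.data_file = self.read()
        self.delete_files = []


cow = CopyOnWriteTable(range(10))
mor = MergeOnReadTable(range(10))
for table in (cow, mor):
    table.delete(lambda r: r % 2 == 0)
    table.delete(lambda r: r > 6)

print(cow.read(), cow.files_rewritten)
print(mor.read(), len(mor.delete_files))
mor.compact()
print(mor.read(), len(mor.delete_files))
```

Both tables return the same rows, but the MoR table accumulates one delete file per write until it is compacted, which is the read-side cost that compaction removes.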

Whether you are using CoW, MoR, or a hybrid of both, one challenge remains consistent: maintenance around the growing number of small files generated by each transaction. AWS Glue automatic compaction addresses this by making sure your Iceberg tables remain efficient and performant across both table modes.

This post provides a detailed comparison of query performance between auto compacted and non-compacted Iceberg tables. By analyzing key metrics such as query latency and storage efficiency, we demonstrate how the automatic compaction feature optimizes data lakes for better performance and cost savings. This comparison will help guide you in making informed decisions on enhancing your data lake environments.

Solution overview

This blog post explores the performance benefits of the newly launched feature in AWS Glue that supports automatic compaction of Iceberg tables with MoR capabilities. We run two versions of the same architecture: one where the tables are auto compacted, and another without compaction. By comparing both scenarios, this post demonstrates the efficiency, query performance, and cost benefits of auto compacted tables vs. non-compacted tables in a simulated Internet of Things (IoT) data pipeline.

The following diagram illustrates the solution architecture.

The solution consists of the following components:

  • Amazon Elastic Compute Cloud (Amazon EC2) simulates continuous IoT data streams, sending them to Amazon MSK for processing
  • Amazon Managed Streaming for Apache Kafka (Amazon MSK) ingests and streams data from the IoT simulator for real-time processing
  • Amazon EMR Serverless processes streaming data from Amazon MSK without managing clusters, writing results to the Amazon S3 data lake
  • Amazon Simple Storage Service (Amazon S3) stores data using Iceberg’s MoR format for efficient querying and analysis
  • The Data Catalog manages metadata for the datasets in Amazon S3, enabling organized data discovery and querying through Amazon Athena
  • Amazon Athena queries data from the S3 data lake with two table options:
    • Non-compacted table – Queries raw data from the Iceberg table
    • Compacted table – Queries data optimized by automatic compaction for faster performance.

The data flow consists of the following steps:

  1. The IoT simulator on Amazon EC2 generates continuous data streams.
  2. The data is sent to Amazon MSK, which acts as a streaming table.
  3. EMR Serverless processes streaming data and writes the output to Amazon S3 in Iceberg format.
  4. The Data Catalog manages the metadata for the datasets.
  5. Athena is used to query the data, either directly from the non-compacted table or from the compacted table after auto compaction.

In this post, we guide you through setting up an evaluation environment for AWS Glue Iceberg auto compaction performance using the following GitHub repository. The process involves simulating IoT data ingestion, deduplication, and querying performance using Athena.

Compaction IoT performance test

We simulated IoT data ingestion with over 20 billion events and used MERGE INTO for data deduplication across two time-based partitions, involving heavy partition reads and shuffling. After ingestion, we ran queries in Athena to compare performance between compacted and non-compacted tables using the MoR format. This test aims to have low latency on ingestion but will lead to hundreds of millions of small files.

We use the following table configuration settings:

'write.delete.mode'='merge-on-read'
'write.update.mode'='merge-on-read'
'write.merge.mode'='merge-on-read'
'write.distribution.mode'='none'

We use 'write.distribution.mode'='none' to lower the latency. However, it will increase the number of Parquet files. For other scenarios, you may want to use hash or range distribution write modes to reduce the file count.
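As a hedged illustration, the following Python sketch renders these settings as a Spark SQL ALTER TABLE ... SET TBLPROPERTIES statement. The helper name and the table identifier iceberg_catalog.bigdata.employee are assumptions; the test application may instead set these properties when it creates the table.

```python
# Table properties from this post, written as key/value pairs.
MOR_PROPERTIES = {
    "write.delete.mode": "merge-on-read",
    "write.update.mode": "merge-on-read",
    "write.merge.mode": "merge-on-read",
    "write.distribution.mode": "none",
}


def alter_table_sql(table, properties):
    """Build a Spark SQL ALTER TABLE ... SET TBLPROPERTIES statement."""
    pairs = ",\n  ".join(f"'{key}'='{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {pairs}\n)"


# The table identifier is an assumed example.
sql = alter_table_sql("iceberg_catalog.bigdata.employee", MOR_PROPERTIES)
print(sql)
```

Each property is quoted as its own key and value, which is the form Spark SQL expects inside TBLPROPERTIES.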

This test performs only append operations: we append new data to the table and don’t run any delete operations.

The following table shows some metrics of the Athena query performance.

 

| Query | Execution time (sec), employee (without compaction) | Execution time (sec), employeeauto (with compaction) | Performance improvement (%) | Data scanned (GB), employee (without compaction) | Data scanned (GB), employeeauto (with compaction) |
| --- | --- | --- | --- | --- | --- |
| SELECT count(*) FROM "bigdata"."<tablename>" | 67.5896 | 3.8472 | 94.31% | 0 | 0 |
| SELECT team, name, min(age) AS youngest_age FROM "bigdata"."<tablename>" GROUP BY team, name ORDER BY youngest_age ASC | 72.0152 | 50.4308 | 29.97% | 33.72 | 32.96 |
| SELECT role, team, avg(age) AS average_age FROM bigdata."<tablename>" GROUP BY role, team ORDER BY average_age DESC | 74.1430 | 37.7676 | 49.06% | 17.24 | 16.59 |
| SELECT name, age, start_date, role, team FROM bigdata."<tablename>" WHERE CAST(start_date as DATE) > CAST('2023-01-02' as DATE) and age > 40 ORDER BY start_date DESC limit 100 | 70.3376 | 37.1232 | 47.22% | 105.74 | 110.32 |
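The performance improvement column can be reproduced from the two execution time columns. The following Python sketch assumes the improvement is the relative reduction in runtime compared with the non-compacted table; the function name is hypothetical, and the runtimes are the values from the table above.

```python
def improvement_pct(without_compaction_sec, with_compaction_sec):
    """Relative runtime reduction, as a percentage of the non-compacted runtime."""
    saved = without_compaction_sec - with_compaction_sec
    return round(100 * saved / without_compaction_sec, 2)


# (without compaction, with compaction) runtimes in seconds, from the table above
runtimes = [
    (67.5896, 3.8472),
    (72.0152, 50.4308),
    (74.1430, 37.7676),
    (70.3376, 37.1232),
]
for before, after in runtimes:
    print(before, after, improvement_pct(before, after))
```

The count(*) query benefits most, which is consistent with a query that scans no data and is dominated by the number of files it has to plan over.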

Because the previous test didn’t perform any delete operations on the table, we conduct a new test involving hundreds of thousands of such operations. We use the previously auto compacted table (employeeauto) as a base, noting that this table uses MoR for all operations.

We run a query that deletes data from each even second on the table:

DELETE FROM iceberg_catalog.bigdata.employeeauto
WHERE start_date BETWEEN 'start' AND 'end'
AND SECOND(start_date) % 2 = 0;
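To show which rows this predicate selects, here is a small Python sketch that mirrors it on in-memory timestamps. The 'start' and 'end' values are placeholders in the query above, so the range used here is hypothetical.

```python
from datetime import datetime, timedelta


def is_deleted(start_date, range_start, range_end):
    """Mirror the DELETE predicate: inside the inclusive range and on an even second."""
    return range_start <= start_date <= range_end and start_date.second % 2 == 0


range_start = datetime(2024, 1, 1, 0, 0, 0)  # hypothetical 'start'
range_end = datetime(2024, 1, 1, 0, 0, 9)    # hypothetical 'end'

# One row per second, including two rows after the end of the range.
rows = [range_start + timedelta(seconds=i) for i in range(12)]
kept = [r for r in rows if not is_deleted(r, range_start, range_end)]
print([r.second for r in kept])
```

Because roughly every other row in the range matches, the deletes are spread across many data files, which in MoR mode produces a large number of delete files for compaction to handle.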

This query runs with table optimizations enabled, using an Amazon EMR Studio notebook. After running the queries, we roll back the table to its previous state for a performance comparison. Iceberg’s time-traveling capabilities allow us to restore the table. We then disable the table optimizations, rerun the delete query, and follow up with Athena queries to analyze performance differences. The following table summarizes our results.

 

| Query | Execution time (sec), employee (without compaction) | Execution time (sec), employeeauto (with compaction) | Performance improvement (%) | Data scanned (GB), employee (without compaction) | Data scanned (GB), employeeauto (with compaction) |
| --- | --- | --- | --- | --- | --- |
| SELECT count(*) FROM "bigdata"."<tablename>" | 29.820 | 8.71 | 70.77% | 0 | 0 |
| SELECT team, name, min(age) as youngest_age FROM "bigdata"."<tablename>" GROUP BY team, name ORDER BY youngest_age ASC | 58.0600 | 34.1320 | 41.21% | 33.27 | 19.13 |
| SELECT role, team, avg(age) AS average_age FROM bigdata."<tablename>" GROUP BY role, team ORDER BY average_age DESC | 59.2100 | 31.8492 | 46.21% | 16.75 | 9.73 |
| SELECT name, age, start_date, role, team FROM bigdata."<tablename>" WHERE CAST(start_date as DATE) > CAST('2023-01-02' as DATE) and age > 40 ORDER BY start_date DESC limit 100 | 68.4650 | 33.1720 | 51.55% | 112.64 | 61.18 |

We analyze the following key metrics:

  • Query runtime – We compared the runtimes between compacted and non-compacted tables using Athena as the query engine and found significant performance improvements with both MoR for ingestion and appends and MoR for delete operations.
  • Data scanned evaluation – We compared compacted and non-compacted tables using Athena as the query engine and observed a reduction in data scanned for most queries. This reduction translates directly into cost savings.

Prerequisites

To set up your own evaluation environment and test the feature, you need the following prerequisites:

  • A virtual private cloud (VPC) with at least two private subnets. For instructions, see Create a VPC.
  • A c5.xlarge EC2 instance running Amazon Linux 2023 in one of those private subnets, where you will launch the data simulator. For the security group, you can use the default for the VPC. For more information, see Get started with Amazon EC2.
  • An AWS Identity and Access Management (IAM) user with the correct permissions to create and configure all the required resources.

Set up Amazon S3 storage

Create an S3 bucket with the following structure:

s3bucket/
/jars
/employee.desc
/warehouse
/checkpoint
/checkpointAuto

Download the descriptor file employee.desc from the GitHub repo and place it in the S3 bucket.

Download the application on the releases page

Get the packaged application from the GitHub repo, then upload the JAR file to the jars directory on the S3 bucket. The warehouse folder stores the Iceberg data and metadata, and the checkpoint folder is used for the Structured Streaming checkpointing mechanism. Because we use two streaming job runs, one for compacted and one for non-compacted data, we also create a checkpointAuto folder.

Create a Data Catalog database

Create a database in the Data Catalog (for this post, we name our database bigdata). For instructions, see Getting started with the AWS Glue Data Catalog.

Create an EMR Serverless application

Create an EMR Serverless application with the following settings (for instructions, see Getting started with Amazon EMR Serverless):

  • Type: Spark
  • Version: 7.1.0
  • Architecture: x86_64
  • Java Runtime: Java 17
  • Metastore Integration: AWS Glue Data Catalog
  • Logs: Enable Amazon CloudWatch Logs if desired

Configure the network (VPC, subnets, and default security group) to allow the EMR Serverless application to reach the MSK cluster.

Take note of the application-id to use later for launching the jobs.

Create an MSK cluster

Create an MSK cluster on the Amazon MSK console. For more details, see Get started using Amazon MSK.

Choose the custom create option with at least two brokers, Apache Kafka version 3.5.1 in Apache ZooKeeper mode, and instance type kafka.m7g.xlarge. Do not use public access; choose two private subnets to deploy it (one broker per subnet or Availability Zone, for a total of two brokers). Make sure the security group allows the EMR Serverless application and the Amazon EC2 based producer to reach the cluster. For security, use PLAINTEXT (in production, you should secure access to the cluster). Choose 200 GB as the storage size for each broker and do not enable tiered storage. For network security groups, you can choose the default of the VPC.

For the MSK cluster configuration, use the following settings:

auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=32
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
compression.type=zstd
log.retention.hours=2
log.retention.bytes=10073741824

Configure the data simulator

Log in to your EC2 instance. Because it’s running on a private subnet, you can use an instance endpoint to connect. To create one, see Connect to your instances using EC2 Instance Connect Endpoint. After you log in, issue the following commands:

sudo yum install java-17-amazon-corretto-devel
wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.12-3.5.1.tgz
tar xzvf kafka_2.12-3.5.1.tgz

Create Kafka topics

Create two Kafka topics. Replace the bootstrap server value in the following commands with the bootstrap server string for your cluster, which you can get from the client information on the details page for your MSK cluster on the Amazon MSK console.

cd kafka_2.12-3.5.1/bin/

./kafka-topics.sh --topic protobuf-demo-topic-pure-auto --bootstrap-server kafkaBootstrapString --create
./kafka-topics.sh --topic protobuf-demo-topic-pure --bootstrap-server kafkaBootstrapString --create

Launch job runs

Issue job runs for the non-compacted and auto compacted tables using the following AWS Command Line Interface (AWS CLI) commands. You can use AWS CloudShell to run the commands.

For the non-compacted table, you need to change the s3bucket value as needed and the application-id. You also need an IAM role (execution-role-arn) with the corresponding permissions to access the S3 bucket and to access and write tables on the Data Catalog.

aws emr-serverless start-job-run --application-id application-identifier --name job-run-name --execution-role-arn arn-of-emrserverless-role --mode 'STREAMING' --job-driver '{
"sparkSubmit": {
"entryPoint": "s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
"entryPointArguments": ["true","s3://s3bucket/warehouse","s3://s3bucket/Employee.desc","s3://s3bucket/checkpoint","kafkaBootstrapString","true"],
"sparkSubmitParameters": "--class com.aws.emr.spark.iot.SparkCustomIcebergIngestMoR --conf spark.executor.cores=16 --conf spark.executor.memory=64g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.dynamicAllocation.minExecutors=3 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.dynamicAllocation.maxExecutors=5 --conf spark.sql.catalog.glue_catalog.http-client.apache.max-connections=3000 --conf spark.emr-serverless.executor.disk.type=shuffle_optimized --conf spark.emr-serverless.executor.disk=1000G --files s3://s3bucket/Employee.desc --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
}
}'

For the auto compacted table, you need to change the s3bucket value as needed, the application-id, and the kafkaBootstrapString. You also need an IAM role (execution-role-arn) with the corresponding permissions to access the S3 bucket and to access and write tables on the Data Catalog.

aws emr-serverless start-job-run --application-id application-identifier --name job-run-name --execution-role-arn arn-of-emrserverless-role --mode 'STREAMING' --job-driver '{
"sparkSubmit": {
"entryPoint": "s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
"entryPointArguments": ["true","s3://s3bucket/warehouse","/home/hadoop/Employee.desc","s3://s3bucket/checkpointAuto","kafkaBootstrapString","true"],
"sparkSubmitParameters": "--class com.aws.emr.spark.iot.SparkCustomIcebergIngestMoRAuto --conf spark.executor.cores=16 --conf spark.executor.memory=64g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.dynamicAllocation.minExecutors=3 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.dynamicAllocation.maxExecutors=5 --conf spark.sql.catalog.glue_catalog.http-client.apache.max-connections=3000 --conf spark.emr-serverless.executor.disk.type=shuffle_optimized --conf spark.emr-serverless.executor.disk=1000G --files s3://s3bucket/Employee.desc --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
}
}'
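Because the --job-driver value is a long JSON document embedded in a shell command, it is easy to break with a stray quote. As a hedged alternative, the following Python sketch builds the same structure with json.dumps and quotes it for the shell with shlex.quote. The function name is hypothetical, the values are the placeholders from the commands above, and the two "true" arguments are passed through unchanged.

```python
import json
import shlex


def build_job_driver(bucket, bootstrap, main_class, checkpoint, descriptor):
    """Assemble the sparkSubmit job driver as a plain dict."""
    confs = {
        "spark.executor.cores": "16",
        "spark.executor.memory": "64g",
        "spark.driver.cores": "4",
        "spark.driver.memory": "16g",
        "spark.dynamicAllocation.minExecutors": "3",
        "spark.dynamicAllocation.maxExecutors": "5",
        "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
        "spark.sql.catalog.glue_catalog.http-client.apache.max-connections": "3000",
        "spark.emr-serverless.executor.disk.type": "shuffle_optimized",
        "spark.emr-serverless.executor.disk": "1000G",
    }
    params = [f"--class {main_class}"]
    params += [f"--conf {key}={value}" for key, value in confs.items()]
    params.append(f"--files s3://{bucket}/Employee.desc")
    params.append("--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{bucket}/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
            # The two "true" flags are copied as-is from the commands above.
            "entryPointArguments": [
                "true", f"s3://{bucket}/warehouse", descriptor,
                f"s3://{bucket}/{checkpoint}", bootstrap, "true",
            ],
            "sparkSubmitParameters": " ".join(params),
        }
    }


driver = build_job_driver(
    bucket="s3bucket",
    bootstrap="kafkaBootstrapString",
    main_class="com.aws.emr.spark.iot.SparkCustomIcebergIngestMoRAuto",
    checkpoint="checkpointAuto",
    descriptor="/home/hadoop/Employee.desc",
)
# shlex.quote wraps the JSON so a POSIX shell passes it as a single argument.
print("--job-driver", shlex.quote(json.dumps(driver)))
```

The printed fragment can be appended to the aws emr-serverless start-job-run command in place of the hand-written JSON. Changing main_class, checkpoint, and descriptor gives the non-compacted variant.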

Enable auto compaction

Enable auto compaction for the employeeauto table in AWS Glue. For instructions, see Enabling compaction optimizer.

Launch the data simulator

Download the JAR file to the EC2 instance and run the producer:

aws s3 cp s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar .

Now you can start the protocol buffer producers.

For non-compacted tables, use the following command:

java -cp streaming-iceberg-ingest-1.0-SNAPSHOT.jar com.aws.emr.proto.kafka.producer.ProtoProducer kafkaBootstrapString

For auto compacted tables, use the following command:

java -cp streaming-iceberg-ingest-1.0-SNAPSHOT.jar com.aws.emr.proto.kafka.producer.ProtoProducerAuto kafkaBootstrapString

Test the solution in EMR Studio

For the delete test, we use EMR Studio. For setup instructions, see Set up an EMR Studio. Next, you need to create an EMR Serverless interactive application to run the notebook; refer to Run interactive workloads with EMR Serverless through EMR Studio to create a Workspace.

Open the Workspace, select the interactive EMR Serverless application as the compute option, and attach it.

Download the Jupyter notebook, upload it to your environment, and run the cells using a PySpark kernel to run the test.

Clean up

This evaluation is for high-throughput scenarios and can lead to significant costs. Complete the following steps to clean up your resources:

  1. Stop the Kafka producer EC2 instance.
  2. Cancel the EMR job runs and delete the EMR Serverless application.
  3. Delete the MSK cluster.
  4. Delete the tables and database from the Data Catalog.
  5. Delete the S3 bucket.

Conclusion

The Data Catalog has improved automatic compaction of Iceberg tables for streaming data, making it straightforward for you to keep your transactional data lakes always performant. Enabling automatic compaction on Iceberg tables reduces metadata overhead on your Iceberg tables and improves query performance.

Many customers have streaming data that is continuously ingested in Iceberg tables, resulting in a large set of delete files that track changes in data files. With this new feature, when you enable the Data Catalog optimizer, it constantly monitors table partitions and runs the compaction process for both data and delta or delete files and regularly commits the partial progress. The Data Catalog also has expanded support for heavily nested complex data and supports schema evolution as you reorder or rename columns.

In this post, we assessed the ingestion and query performance of simulated IoT data using AWS Glue Iceberg with auto compaction enabled. Our setup processed over 20 billion events, managing duplicates and late-arriving events, and employed a MoR approach for both ingestion/appends and deletions to evaluate the performance improvement and efficiency.

Overall, AWS Glue Iceberg with auto compaction proves to be a robust solution for managing high-throughput IoT data streams. These enhancements lead to faster data processing, shorter query times, and more efficient resource utilization, all of which are essential for any large-scale data ingestion and analytics pipeline.

For detailed setup instructions, see the GitHub repo.


About the Authors

Navnit Shukla serves as an AWS Specialist Solutions Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled Data Wrangling on AWS. He can be reached through LinkedIn.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to data analytics and artificial intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on data and AI.

Amit Singh currently serves as a Senior Solutions Architect at AWS, specializing in analytics and IoT technologies. With extensive expertise in designing and implementing large-scale distributed systems, Amit is passionate about empowering clients to drive innovation and achieve business transformation through AWS solutions.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Introducing a new unified data connection experience with Amazon SageMaker Lakehouse unified data connectivity

Post Syndicated from Chiho Sugimoto original https://aws.amazon.com/blogs/big-data/introducing-a-new-unified-data-connection-experience-with-amazon-sagemaker-lakehouse-data-connectivity/

The need to integrate diverse data sources has grown exponentially, but there are several common challenges when integrating and analyzing data from multiple sources, services, and applications. First, you need to create and maintain independent connections to the same data source for different services. Second, the data connectivity experience is inconsistent across different services. For each service, you need to learn the supported authorization and authentication methods, data access APIs, and framework to onboard and test data sources. Third, some services require you to set up and manage compute resources used for federated connectivity, and capabilities like connection testing and data preview aren’t available in all services. This fragmented, repetitive, and error-prone experience for data connectivity is a significant obstacle to data integration, analysis, and machine learning (ML) initiatives.

To address these challenges, we launched Amazon SageMaker Lakehouse unified data connectivity. This feature offers the following capabilities and benefits:

  • With SageMaker Lakehouse unified data connectivity, you can set up a connection to a data source using a connection configuration template that is standardized for multiple services. Amazon SageMaker Unified Studio, AWS Glue, and Amazon Athena can share and reuse the same connection with proper permission configuration.
  • SageMaker Lakehouse unified data connectivity supports standard methods for data source connection authorization and authentication, such as basic authorization and OAuth2. This approach simplifies your data journey and helps you meet your security requirements.
  • The SageMaker Lakehouse data connection testing capability boosts your confidence in established connections. With the ability to browse metadata, you can understand the structure and schema of the data source, identify relevant tables and fields, and discover useful data assets you may not be aware of.
  • SageMaker Lakehouse unified data connectivity’s data preview capability helps you map source fields to target schemas, identify needed data transformation, and plan data standardization and normalization steps.
  • SageMaker Lakehouse unified data connectivity provides a set of APIs for you to use without the need to learn different APIs for various data sources, promoting coding efficiency and productivity.

With SageMaker Lakehouse unified data connectivity, you can confidently connect, explore, and unlock the full value of your data across AWS services and achieve your business objectives with agility.

This post demonstrates how SageMaker Lakehouse unified data connectivity helps your data integration workload by streamlining the establishment and management of connections for various data sources.

Solution overview

In this scenario, an e-commerce company sells products on their online platform. The product data is stored on Amazon Aurora PostgreSQL-Compatible Edition. Their existing business intelligence (BI) tool runs queries on Athena. Furthermore, they have a data pipeline to perform extract, transform, and load (ETL) jobs when moving data from the Aurora PostgreSQL database cluster to other data stores.

Now they have a new requirement to allow ad-hoc queries through SageMaker Unified Studio to enable data engineers, data analysts, sales representatives, and others to take advantage of its unified experience.

In the following sections, we demonstrate how to set up this connection and run queries using different AWS services.

Prerequisites

Before you begin, make sure you have the following:

  • An AWS account.
  • A SageMaker Unified Studio domain.
  • An Aurora PostgreSQL database cluster.
  • A virtual private cloud (VPC) and private subnets required for SageMaker Unified Studio.
  • An Amazon Simple Storage Service (Amazon S3) bucket to store output from the AWS Glue ETL jobs. In the following steps, replace amzn-s3-demo-destination-bucket with the name of the S3 bucket.
  • An AWS Glue Data Catalog database. In the following steps, replace <your_database> with the name of your database.

Create an IAM role for the AWS Glue job

You can either create a new AWS Identity and Access Management (IAM) role or use an existing role that has permission to access the AWS Glue output bucket and AWS Secrets Manager.

If you want to create a new one, complete the following steps:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose AWS service.
  4. For Service or use case, choose Glue.
  5. Choose Next.
  6. For Add permissions, choose AWSGlueServiceRole, then choose Next.
  7. For Role name, enter a role name (for this post, GlueJobRole-demo).
  8. Choose Create role.
  9. Choose the created IAM role.
  10. Under Permissions policies, choose Add permissions, then choose Create inline policy.
  11. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::amzn-s3-demo-destination-bucket/*",
                    "arn:aws:s3:::amzn-s3-demo-destination-bucket"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": [
                    "arn:aws:secretsmanager:<region>:<account-id>:secret:SageMakerUnifiedStudio-Glue-postgresql_source-*"
                ]
            }
        ]
    }

  12. Choose Next.
  13. For Policy name, enter a name for your policy.
  14. Choose Create policy.
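If you would rather generate the inline policy than edit it by hand, the document from step 11 can be built in a few lines of Python. This is a minimal sketch, not part of the original walkthrough: the helper name build_glue_job_policy and its parameters are hypothetical, and the secret name prefix assumes the connection is called postgresql_source, as it is later in this post.

```python
import json


def build_glue_job_policy(bucket, region, account_id, connection_name="postgresql_source"):
    """Return the inline policy from step 11 as a Python dict.

    Hypothetical helper; the argument names are illustrative only.
    """
    secret_arn = (
        f"arn:aws:secretsmanager:{region}:{account_id}:secret:"
        f"SageMakerUnifiedStudio-Glue-{connection_name}-*"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:List*", "s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*", f"arn:aws:s3:::{bucket}"],
            },
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": [secret_arn],
            },
        ],
    }


if __name__ == "__main__":
    # The Region and account ID below are placeholders.
    policy = build_glue_job_policy("amzn-s3-demo-destination-bucket", "us-east-1", "111122223333")
    print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the policy editor, or the serialized document can be passed as the PolicyDocument argument of the IAM PutRolePolicy API if you script the role setup.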

Create a SageMaker Lakehouse data connection

Let’s get started with the unified data connection experience. The first step is to create a SageMaker Lakehouse data connection. Complete the following steps:

  1. Sign in to your SageMaker Unified Studio.
  2. Open your project.
  3. On your project, in the navigation pane, choose Data.
  4. Choose the plus sign.
  5. For Add data source, choose Add connection. Choose Next.
  6. Select PostgreSQL, and choose Next.
  7. For Name, enter postgresql_source.
  8. For Host, enter the host name of your Aurora PostgreSQL database cluster.
  9. For Port, enter the port number of your Aurora PostgreSQL database cluster (by default, it’s 5432).
  10. For Database, enter your database name.
  11. For Authentication, select Username and password.
  12. Enter your username and password.
  13. Choose Add data.

When the connection is created, SageMaker Unified Studio also creates an AWS Secrets Manager secret with a name like SageMakerUnifiedStudio-Glue-postgresql_source to securely store the specified username and password, and an AWS Glue connection with the same name as the connection (postgresql_source).

Now you have a unified connection for Aurora PostgreSQL-Compatible.

Load data into the PostgreSQL database through the notebook

You will use a JupyterLab notebook on SageMaker Unified Studio to load sample data from an S3 bucket into a PostgreSQL database using Apache Spark.

  1. On the top left menu, choose Build, and under IDE & APPLICATIONS, choose JupyterLab.
  2. Choose Python 3 under Notebook.
  3. For the first cell, choose Local Python, python, enter the following code, and run the cell:
    %%configure -f -n project.spark
    {
        "glue_version": "4.0"
    }

  4. For the second cell, choose PySpark, spark, enter the following code, and run the cell:
    # Read sample data from S3 bucket
    df = spark.read.parquet("s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/")
    
    # Preview the data
    df.show()

The code snippet reads the sample data Parquet files from the specified S3 bucket location and stores the data in a Spark DataFrame named df. The df.show() command displays the first 20 rows of the DataFrame, allowing you to preview the sample data in a tabular format. Next, you will load this sample data into a PostgreSQL database.

  1. For the third cell, choose PySpark, spark, enter the following code, and run the cell (replace <account-id> with your AWS account ID):
    import boto3
    import json
    
    # Replace <account-id> below with your AWS account ID before running this cell
    
    # Get secret (SecretString holds a JSON document, so parse it with json.loads)
    secretsmanager_client = boto3.client('secretsmanager')
    get_secret_value_response = secretsmanager_client.get_secret_value(
        SecretId='SageMakerUnifiedStudio-Glue-postgresql_source' # replace the secret name if needed
    )
    secret = json.loads(get_secret_value_response["SecretString"])
    
    # Get connection
    glue_client = boto3.client('glue')
    glue_client_response = glue_client.get_connection(
        CatalogId='<account-id>',
        Name='postgresql_source' # replace the connection name if needed
    )
    connection_properties = glue_client_response["Connection"]["ConnectionProperties"]

  2. For the fourth cell, choose PySpark, spark, enter the following code, and run the cell:
    # Load data into the DB
    jdbcurl = "jdbc:postgresql://{}:{}/{}".format(connection_properties["HOST"],connection_properties["PORT"],connection_properties["DATABASE"])
    df.write \
        .format("jdbc") \
        .option("url", jdbcurl) \
        .option("dbtable", "public.unified_connection_test") \
        .option("user", secret["username"]) \
        .option("password", secret["password"]) \
        .save()
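The last two cells combine the secret and the connection properties into JDBC options. The following sketch isolates that step so it can be checked without Spark or AWS access. The helper name build_jdbc_options is hypothetical, the sample values are made up, and it assumes (as the cells above do) that the secret is a JSON object with username and password keys.

```python
import json


def build_jdbc_options(secret_string, connection_properties, table):
    """Combine a Secrets Manager SecretString and AWS Glue ConnectionProperties
    into the options used by Spark's JDBC writer.

    Hypothetical helper, not part of any AWS SDK.
    """
    secret = json.loads(secret_string)
    missing = [k for k in ("HOST", "PORT", "DATABASE") if k not in connection_properties]
    if missing:
        raise KeyError(f"connection is missing properties: {missing}")
    url = "jdbc:postgresql://{HOST}:{PORT}/{DATABASE}".format(**connection_properties)
    return {
        "url": url,
        "dbtable": table,
        "user": secret["username"],
        "password": secret["password"],
    }


if __name__ == "__main__":
    # Made-up values for illustration only.
    opts = build_jdbc_options(
        '{"username": "demo_user", "password": "demo_pass"}',
        {"HOST": "db.example.com", "PORT": "5432", "DATABASE": "postgres"},
        "public.unified_connection_test",
    )
    print(opts["url"])
```

In the notebook, the resulting dictionary could be passed as df.write.format("jdbc").options(**opts).save(). Note that the writer's default save mode raises an error if the table already exists, so rerunning the cell needs an explicit mode such as .mode("overwrite") or .mode("append").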

Now verify that the new table unified_connection_test was created. You can navigate to the project’s Data page to visually confirm that the table exists.

  1. On the top left menu, choose your project name, and under CURRENT PROJECT, choose Data.

Within the Lakehouse section, expand the postgresql_source, then the public schema, and you should find the newly created unified_connection_test table listed there. Next, you will query the data in this table using SageMaker Unified Studio’s SQL query book feature.

Run queries on the connection through the query book using Athena

Now you can run queries using the connection you created. In this section, we demonstrate how to use the query book using Athena. Complete the following steps:

  1. In your project on SageMaker Unified Studio, choose the Lakehouse section, expand the postgresql_source, then the public schema.
  2. On the options menu (three vertical dots) of the table unified_connection_test, choose Query with Athena.

This step will open a new SQL query book. The query statement select * from "postgresql_source"."public"."unified_connection_test" limit 10; is automatically filled.

  1. On the Actions menu, choose Save to Project.
  2. For Querybook title, enter the name of your SQL query book.
  3. Choose Save changes.

This will save the current SQL query book, and the status of the notebook will change from Draft to Saved. If you want to revert a draft notebook to its last published state, choose Revert to published version to roll back to the most recently published version. Now, let’s start running queries on your notebook.

  1. Choose Run all.

When a query finishes, results can be viewed in a few formats. The table view displays query results in a tabular format. You can download the results as JSON or CSV files using the download icon at the bottom of the output cell. Additionally, the notebook provides a chart view to visualize query results as graphs.

The sample data includes a column star_rating representing a 5-star rating for products. Let’s try a quick visualization to analyze the rating distribution.

  1. Choose Add SQL to add a new cell.
  2. Enter the following statement:
    SELECT count(*) as counts, star_rating FROM "postgresql_source"."public"."unified_connection_test"
    GROUP BY star_rating

  3. Choose the run icon of the cell, or you can press Ctrl+Enter or Cmd+Enter to run the query.

This will display the results in the output panel. Now you have learned how the connection works on SageMaker Unified Studio. Next, we show how you can use the connection on the AWS Glue console.
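As an aside on the rating query above: to see what the GROUP BY aggregation returns without access to Athena, the same statement shape can be run against a small in-memory table. This sketch uses SQLite from the Python standard library purely as a stand-in; the rows and the product_title column are made up, and SQLite is not the engine used in this post.

```python
import sqlite3

# In-memory stand-in for the unified_connection_test table; rows are made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE unified_connection_test (product_title TEXT, star_rating INTEGER)")
conn.executemany(
    "INSERT INTO unified_connection_test VALUES (?, ?)",
    [("shirt", 5), ("socks", 4), ("hat", 5), ("scarf", 1), ("jeans", 5)],
)

# Same aggregation as the query book cell, with an ORDER BY for stable output.
rows = conn.execute(
    "SELECT count(*) AS counts, star_rating "
    "FROM unified_connection_test "
    "GROUP BY star_rating "
    "ORDER BY star_rating"
).fetchall()

for counts, star_rating in rows:
    print(f"{star_rating} stars: {counts} reviews")
conn.close()
```

Each output row pairs a rating with the number of reviews that received it, which is the shape the chart view needs for a distribution plot.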

Run Glue ETL jobs on the connection on the AWS Glue console

Next, we create an AWS Glue ETL job that reads table data from the PostgreSQL connection, converts data types, transforms the data into Parquet files, and outputs them to Amazon S3. It also creates a table in the Glue Data Catalog and adds partitions so downstream data engineers can immediately use the table data. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Under Create job, choose Visual ETL.
  3. At the top of the job, replace “Untitled job” with a name of your choice.
  4. On the Job Details tab, under Basic properties, specify the IAM role that the job will use (GlueJobRole-demo).
  5. For Glue version, choose Glue version 4.0.
  6. Choose Save.
  7. On the Visual tab, choose the plus sign to open the Add nodes menu.
  8. Search for postgresql and add PostgreSQL as Source.
  9. For JDBC source, choose JDBC connection details.
  10. For PostgreSQL connection, choose postgresql_source.
  11. For Table name, enter unified_connection_test.
  12. As a child of this source, search in the Add nodes menu for timestamp and choose To Timestamp.
  13. For Column to convert, choose review_date.
  14. For Column type, choose iso.
  15. On the Visual tab, search in the Add nodes menu for s3 and add Amazon S3 as Target.
  16. For Format, choose Parquet.
  17. For Compression Type, choose Snappy.
  18. For S3 Target Location, enter your S3 output location (s3://amzn-s3-demo-destination-bucket).
  19. For Data Catalog update options, choose Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  20. For Database, enter your Data Catalog database (<your_database>).
  21. For Table name, enter connection_demo_tbl.
  22. Under Partition keys, choose Add a partition key, and choose review_year.
  23. Choose Save, then choose Run to run the job.

When the job is complete, it will output Parquet files to Amazon S3 and create a table named connection_demo_tbl in the Data Catalog. You have now learned that you can use the SageMaker Lakehouse data connection not only in SageMaker Unified Studio, but also directly in the AWS Glue console without needing to create separate individual connections.
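To make the effect of the To Timestamp node and the partition key more concrete, the following plain-Python sketch parses an ISO date string and builds the Hive-style prefix that a partitioned write uses. Both function names are hypothetical, and deriving the partition value from the year of the parsed date is an assumption made only for illustration; the post does not describe how review_year is populated.

```python
from datetime import datetime


def to_timestamp_iso(value):
    """Parse an ISO 8601 date or datetime string into a datetime."""
    return datetime.fromisoformat(value)


def partition_prefix(base_location, review_year):
    """Build the Hive-style prefix (key=value) for a given review_year."""
    return f"{base_location.rstrip('/')}/review_year={review_year}/"


if __name__ == "__main__":
    ts = to_timestamp_iso("2015-08-31")  # made-up sample value
    print(ts)
    # Assumption for illustration: the partition value is the year of the date.
    print(partition_prefix("s3://amzn-s3-demo-destination-bucket", ts.year))
```

After the job runs, listing the output location should show one prefix of this form per distinct partition value.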

Clean up

Now to the final step, cleaning up the resources. Complete the following steps:

  1. Delete the connection.
  2. Delete the Glue job.
  3. Delete the AWS Glue output S3 buckets.
  4. Delete the IAM role GlueJobRole-demo.
  5. Delete the Aurora PostgreSQL cluster.

Conclusion

This post demonstrated how the SageMaker Lakehouse unified data connectivity works end to end, and how you can use the unified connection across different services such as AWS Glue and Athena. This new capability can simplify your data journey.

To learn more, refer to Amazon SageMaker Unified Studio.


About the Authors

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Shubham Agrawal is a Software Development Engineer on the AWS Glue team. He has expertise in designing scalable, high-performance systems for handling large-scale, real-time data processing. Driven by a passion for solving complex engineering problems, he focuses on building seamless integration solutions that enable organizations to maximize the value of their data.

Joju Eruppanal is a Software Development Manager on the AWS Glue team. He strives to delight customers by helping his team build software. He loves exploring different cultures and cuisines.

Julie Zhao is a Senior Product Manager at AWS Glue. She joined AWS in 2021 and brings three years of startup experience leading products in IoT data platforms. Prior to startups, she spent over 10 years in networking with Cisco and Juniper across engineering and product. She is passionate about building products to solve customer problems.

Building end-to-end data lineage for one-time and complex queries using Amazon Athena, Amazon Redshift, Amazon Neptune and dbt

Post Syndicated from Nancy Wu original https://aws.amazon.com/blogs/big-data/building-end-to-end-data-lineage-for-one-time-and-complex-queries-using-amazon-athena-amazon-redshift-amazon-neptune-and-dbt/

One-time and complex queries are two common scenarios in enterprise data analytics. One-time queries are flexible and suitable for instant analysis and exploratory research. Complex queries, on the other hand, refer to large-scale data processing and in-depth analysis based on petabyte-level data warehouses in massive data scenarios. These complex queries typically involve data sources from multiple business systems, requiring multilevel nested SQL or associations with numerous tables for highly sophisticated analytical tasks.

However, combining the data lineage of these two query types presents several challenges:

  1. Diversity of data sources
  2. Varying query complexity
  3. Inconsistent granularity in lineage tracking
  4. Different real-time requirements
  5. Difficulties in cross-system integration

Moreover, maintaining the accuracy and completeness of lineage information while preserving system performance and scalability is a crucial consideration. Addressing these challenges requires a carefully designed architecture and advanced technical solutions.

Amazon Athena offers serverless, flexible SQL analytics for one-time queries, enabling direct querying of Amazon Simple Storage Service (Amazon S3) data for rapid, cost-effective instant analysis. Amazon Redshift, optimized for complex queries, provides high-performance columnar storage and massively parallel processing (MPP) architecture, supporting large-scale data processing and advanced SQL capabilities. Amazon Neptune, as a graph database, is ideal for data lineage analysis, offering efficient relationship traversal and complex graph algorithms to handle large-scale, intricate data lineage relationships. The combination of these three services provides a powerful, comprehensive solution for end-to-end data lineage analysis.

In the context of comprehensive data governance, Amazon DataZone offers organization-wide data lineage visualization using Amazon Web Services (AWS) services, while dbt provides project-level lineage through model analysis and supports cross-project integration between data lakes and warehouses.

In this post, we use dbt for data modeling on both Amazon Athena and Amazon Redshift. dbt on Athena supports real-time queries, while dbt on Amazon Redshift handles complex queries, unifying the development language and significantly reducing the technical learning curve. Using a single dbt modeling language not only simplifies the development process but also automatically generates consistent data lineage information. This approach offers robust adaptability, easily accommodating changes in data structures.

By integrating Amazon Neptune graph database to store and analyze complex lineage relationships, combined with AWS Step Functions and AWS Lambda functions, we achieve a fully automated data lineage generation process. This combination promotes consistency and completeness of lineage data while enhancing the efficiency and scalability of the entire process. The result is a powerful and flexible solution for end-to-end data lineage analysis.

Architecture overview

The experiment’s context involves a customer already using Amazon Athena for one-time queries. To better accommodate massive data processing and complex query scenarios, they aim to adopt a unified data modeling language across different data platforms. This led to the implementation of both Athena on dbt and Amazon Redshift on dbt architectures.

AWS Glue crawler crawls data lake information from Amazon S3, generating a Data Catalog to support dbt on Amazon Athena data modeling. For complex query scenarios, AWS Glue performs extract, transform, and load (ETL) processing, loading data into the petabyte-scale data warehouse, Amazon Redshift. Here, data modeling uses dbt on Amazon Redshift.

Lineage data original files from both parts are loaded into an S3 bucket, providing data support for end-to-end data lineage analysis.

The following image is the architecture diagram for the solution.

Figure 1-Architecture diagram of DBT modeling based on Athena and Redshift

Some important considerations:

This experiment uses the following data dictionary:

Source table | Tool | Target table
imdb.name_basics | DBT/Athena | stg_imdb__name_basics
imdb.title_akas | DBT/Athena | stg_imdb__title_akas
imdb.title_basics | DBT/Athena | stg_imdb__title_basics
imdb.title_crew | DBT/Athena | stg_imdb__title_crews
imdb.title_episode | DBT/Athena | stg_imdb__title_episodes
imdb.title_principals | DBT/Athena | stg_imdb__title_principals
imdb.title_ratings | DBT/Athena | stg_imdb__title_ratings
stg_imdb__name_basics | DBT/Redshift | new_stg_imdb__name_basics
stg_imdb__title_akas | DBT/Redshift | new_stg_imdb__title_akas
stg_imdb__title_basics | DBT/Redshift | new_stg_imdb__title_basics
stg_imdb__title_crews | DBT/Redshift | new_stg_imdb__title_crews
stg_imdb__title_episodes | DBT/Redshift | new_stg_imdb__title_episodes
stg_imdb__title_principals | DBT/Redshift | new_stg_imdb__title_principals
stg_imdb__title_ratings | DBT/Redshift | new_stg_imdb__title_ratings
new_stg_imdb__name_basics | DBT/Redshift | int_primary_profession_flattened_from_name_basics
new_stg_imdb__name_basics | DBT/Redshift | int_known_for_titles_flattened_from_name_basics
new_stg_imdb__name_basics | DBT/Redshift | names
new_stg_imdb__title_akas | DBT/Redshift | titles
new_stg_imdb__title_basics | DBT/Redshift | int_genres_flattened_from_title_basics
new_stg_imdb__title_basics | DBT/Redshift | titles
new_stg_imdb__title_crews | DBT/Redshift | int_directors_flattened_from_title_crews
new_stg_imdb__title_crews | DBT/Redshift | int_writers_flattened_from_title_crews
new_stg_imdb__title_episodes | DBT/Redshift | titles
new_stg_imdb__title_principals | DBT/Redshift | titles
new_stg_imdb__title_ratings | DBT/Redshift | titles
int_known_for_titles_flattened_from_name_basics | DBT/Redshift | titles
int_primary_profession_flattened_from_name_basics | DBT/Redshift |
int_directors_flattened_from_title_crews | DBT/Redshift | names
int_genres_flattened_from_title_basics | DBT/Redshift | genre_titles
int_writers_flattened_from_title_crews | DBT/Redshift | names
genre_titles | DBT/Redshift |
names | DBT/Redshift |
titles | DBT/Redshift |

The lineage data generated by dbt on Athena includes partial lineage diagrams, as exemplified in the following images. The first image shows the lineage of name_basics in dbt on Athena. The second image shows the lineage of title_crew in dbt on Athena.

Figure 3-Lineage of name_basics in DBT on Athena

Figure 4-Lineage of title_crew in DBT on Athena

The lineage data generated by dbt on Amazon Redshift includes partial lineage diagrams, as illustrated in the following image.

Figure 5-Lineage of name_basics and title_crew in DBT on Redshift

Referring to the data dictionary and screenshots, it’s evident that the complete data lineage information is highly dispersed, spread across 29 lineage diagrams. Understanding the end-to-end comprehensive view requires significant time. In real-world environments, the situation is often more complex, with complete data lineage potentially distributed across hundreds of files. Consequently, integrating a complete end-to-end data lineage diagram becomes crucial and challenging.

This experiment will provide a detailed introduction to processing and merging data lineage files stored in Amazon S3, as illustrated in the following diagram.

Figure 6-Merging data lineage from Athena and Redshift into Neptune
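Before involving Neptune, it can help to see the merge as a plain data-structure problem: each lineage file is a map from a node to its children, and the end-to-end view is the union of those maps. The following in-memory sketch is not the solution's implementation. The function names are hypothetical, and the sample edges are a few rows copied from the data dictionary above.

```python
from collections import deque


def merge_lineage(*maps):
    """Union several {node: [children]} maps, keeping every edge once."""
    merged = {}
    for lineage_map in maps:
        for node, children in lineage_map.items():
            bucket = merged.setdefault(node, [])
            for child in children:
                if child not in bucket:
                    bucket.append(child)
    return merged


def downstream(lineage_map, start):
    """Return every node reachable from start, in breadth-first order."""
    seen, order, queue = {start}, [], deque([start])
    while queue:
        for child in lineage_map.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# A few rows taken from the data dictionary above.
athena = {"imdb.title_crew": ["stg_imdb__title_crews"], "stg_imdb__title_crews": []}
redshift = {
    "stg_imdb__title_crews": ["new_stg_imdb__title_crews"],
    "new_stg_imdb__title_crews": [
        "int_directors_flattened_from_title_crews",
        "int_writers_flattened_from_title_crews",
    ],
    "int_directors_flattened_from_title_crews": ["names"],
    "int_writers_flattened_from_title_crews": ["names"],
}

merged = merge_lineage(athena, redshift)
print(downstream(merged, "imdb.title_crew"))
```

The traversal crosses the boundary between the two tools at stg_imdb__title_crews, which is the end-to-end view that neither lineage file provides on its own. A graph database becomes worthwhile once the maps are too large or too interconnected to walk this way.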

Prerequisites

To implement this solution, make sure the following prerequisites are in place:

  • The Lambda function for preprocessing lineage files must have permissions to access Amazon S3 and Amazon Redshift.
  • The Lambda function for constructing the directed acyclic graph (DAG) must have permissions to access Amazon S3 and Amazon Neptune.

Solution walkthrough

To implement the solution, follow the steps in the next sections.

Preprocess raw lineage data for DAG generation using Lambda functions

Use Lambda to preprocess the raw lineage data generated by dbt, converting it into key-value pair JSON files that are easily understood by Neptune: athena_dbt_lineage_map.json and redshift_dbt_lineage_map.json.

  1. To create a new Lambda function, on the Lambda console, enter a Function name, select the Runtime (Python in this example), configure the Architecture and Execution role, then choose Create function.

Figure 7-Basic configuration of athena-data-lineage-process Lambda

  1. Open the created Lambda function and on the Configuration tab, in the navigation pane, select Environment variables and choose your configurations. Using Athena on dbt processing as an example, configure the environment variables as follows (the process for Amazon Redshift on dbt is similar):
    • INPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path storing the original Athena on dbt lineage files)
    • INPUT_KEY: athena_manifest.json (the original Athena on dbt lineage file)
    • OUTPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path for storing the preprocessed output of Athena on dbt lineage files)
    • OUTPUT_KEY: athena_dbt_lineage_map.json (the output file after preprocessing the original Athena on dbt lineage file)

Figure 8-Environment variable configuration for athena-data-lineage-process-Lambda
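A missing environment variable otherwise surfaces as a KeyError partway through the handler, so it can be worth reading all four settings in one place. The following is a small sketch of that idea; the LineageConfig class and load_config function are hypothetical names, not part of the solution's code.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class LineageConfig:
    input_bucket: str
    input_key: str
    output_bucket: str
    output_key: str


def load_config(env=None):
    """Read the four settings, failing early with one message if any are missing."""
    env = os.environ if env is None else env
    names = ("INPUT_BUCKET", "INPUT_KEY", "OUTPUT_BUCKET", "OUTPUT_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return LineageConfig(*(env[name] for name in names))


if __name__ == "__main__":
    # Values copied from the environment variable list above.
    sample = {
        "INPUT_BUCKET": "data-lineage-analysis-24-09-22",
        "INPUT_KEY": "athena_manifest.json",
        "OUTPUT_BUCKET": "data-lineage-analysis-24-09-22",
        "OUTPUT_KEY": "athena_dbt_lineage_map.json",
    }
    print(load_config(sample))
```

Passing a dictionary instead of relying on os.environ also makes the configuration easy to exercise in a local test before deploying the function.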

  1. On the Code tab, in the lambda_function.py file, enter the preprocessing code for the raw lineage data. The following code uses Athena on dbt as an example (the process for Amazon Redshift on dbt is similar). The athena_manifest.json, redshift_manifest.json, and other files used in this experiment can be obtained from the Data Lineage Graph Construction GitHub repository.

import json
import boto3
import os

def lambda_handler(event, context):
    # Set up S3 client
    s3 = boto3.client('s3')

    # Get input and output paths from environment variables
    input_bucket = os.environ['INPUT_BUCKET']
    input_key = os.environ['INPUT_KEY']
    output_bucket = os.environ['OUTPUT_BUCKET']
    output_key = os.environ['OUTPUT_KEY']

    # Define helper function
    def dbt_nodename_format(node_name):
        return node_name.split(".")[-1]

    # Read input JSON file from S3
    response = s3.get_object(Bucket=input_bucket, Key=input_key)
    file_content = response['Body'].read().decode('utf-8')
    data = json.loads(file_content)
    lineage_map = data["child_map"]
    node_dict = {}
    dbt_lineage_map = {}

    # Process data
    for item in lineage_map:
        lineage_map[item] = [dbt_nodename_format(child) for child in lineage_map[item]]
        node_dict[item] = dbt_nodename_format(item)

    # Update key names
    lineage_map = {node_dict[old]: value for old, value in lineage_map.items()}
    dbt_lineage_map["lineage_map"] = lineage_map

    # Convert result to JSON string
    result_json = json.dumps(dbt_lineage_map)

    # Write JSON string to S3
    s3.put_object(Body=result_json, Bucket=output_bucket, Key=output_key)
    print(f"Data written to s3://{output_bucket}/{output_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('Athena data lineage processing completed successfully')
    }
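To see what the handler does to a manifest without touching S3, the transformation can be pulled out into a pure function and run on a tiny input. This is a sketch under stated assumptions: the function names are hypothetical, the manifest fragment is made up, and demo stands in for a dbt project name. It also differs from the handler in one respect, noted in the comments.

```python
import json


def short_name(unique_id):
    """Keep only the last dot-separated part of a dbt unique_id."""
    return unique_id.split(".")[-1]


def build_lineage_map(manifest):
    """Apply the handler's renaming to manifest["child_map"] without S3."""
    lineage_map = {}
    for parent, children in manifest["child_map"].items():
        # setdefault + extend keeps the children of both entries if two
        # unique_ids shorten to the same name; the handler above would keep
        # only the last such entry.
        lineage_map.setdefault(short_name(parent), []).extend(
            short_name(child) for child in children
        )
    return {"lineage_map": lineage_map}


# Made-up manifest fragment; "demo" is a hypothetical dbt project name.
manifest = {
    "child_map": {
        "source.demo.imdb.name_basics": ["model.demo.stg_imdb__name_basics"],
        "model.demo.stg_imdb__name_basics": [],
    }
}
result = build_lineage_map(manifest)
print(json.dumps(result))
```

The output has the same shape as athena_dbt_lineage_map.json: a single lineage_map object whose keys are short node names and whose values are lists of child names.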

Merge preprocessed lineage data and write to Neptune using Lambda functions

  1. Before processing data with the Lambda function, create a Lambda layer by uploading the required Gremlin plugin. For detailed steps on creating and configuring Lambda Layers, see the AWS Lambda Layers documentation.

Because connecting Lambda to Neptune for constructing a DAG requires the Gremlin plugin, it needs to be uploaded before using Lambda. The Gremlin package can be obtained from the Data Lineage Graph Construction GitHub repository.

Figure 9-Lambda layers

  1. Create a new Lambda function and choose it to configure. To attach the recently created layer, choose Add a layer at the bottom of the page.

Figure 10-Add a layer

Create another Lambda layer for the requests library, similar to how you created the layer for the Gremlin plugin. This library will be used for HTTP client functionality in the Lambda function.
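For Python runtimes, Lambda expects a layer's libraries to sit under a top-level python/ folder inside the .zip archive. If you build the requests layer yourself, a sketch like the following can package a local folder in that layout. The function name build_layer_zip and the folder names in the usage note are hypothetical.

```python
import os
import zipfile


def build_layer_zip(package_dir, zip_path):
    """Zip package_dir so every file sits under the top-level python/ folder."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full_path = os.path.join(root, name)
                relative = os.path.relpath(full_path, package_dir)
                # Zip entries always use forward slashes, whatever the OS.
                arcname = "python/" + relative.replace(os.sep, "/")
                archive.write(full_path, arcname)
    return zip_path
```

For example, after installing the library into a local folder with pip install requests -t build, calling build_layer_zip("build", "requests-layer.zip") would produce an archive you can upload when creating the layer. Keep the output archive outside the folder being zipped so it is not included in itself.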

  1. Choose the recently created Lambda function to configure. Connect to Neptune through Lambda to merge the two datasets and construct a DAG. On the Code tab, the reference code to execute is as follows:
import json
import boto3
import os
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_s3_file(s3_client, bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read().decode('utf-8'))
        return data.get("lineage_map", {})
    except Exception as e:
        print(f"Error reading S3 file {bucket}/{key}: {str(e)}")
        raise

def merge_data(athena_data, redshift_data):
    return {**athena_data, **redshift_data}

def sign_request(request):
    credentials = get_credentials(Session())
    auth = SigV4Auth(credentials, 'neptune-db', os.environ['AWS_REGION'])
    auth.add_auth(request)
    return dict(request.headers)

def send_request(url, headers, data):
    try:
        response = requests.post(url, headers=headers, data=data, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request Error: {str(e)}")
        if hasattr(e.response, 'text'):
            print(f"Response content: {e.response.text}")
        raise

def write_to_neptune(data):
    endpoint = 'https://your neptune endpoint name:8182/gremlin'
    # replace with your neptune endpoint name

    # Clear Neptune database
    clear_query = "g.V().drop()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': clear_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': clear_query}))
    print(f"Clear database response: {response}")

    # Verify if the database is empty
    verify_query = "g.V().count()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': verify_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': verify_query}))
    print(f"Vertex count after clearing: {response}")
    
    def process_node(node, children):
        # Add node
        query = f"g.V().has('lineage_node', 'node_name', '{node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{node}'))"
        request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
        signed_headers = sign_request(request)
        response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
        print(f"Add node response for {node}: {response}")

        for child_node in children:
            # Add child node
            query = f"g.V().has('lineage_node', 'node_name', '{child_node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{child_node}'))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add child node response for {child_node}: {response}")

            # Add edge
            query = f"g.V().has('lineage_node', 'node_name', '{node}').as('a').V().has('lineage_node', 'node_name', '{child_node}').coalesce(inE('lineage_edge').where(outV().as('a')), addE('lineage_edge').from('a').property('edge_name', ' '))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add edge response for {node} -> {child_node}: {response}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_node, node, children) for node, children in data.items()]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in processing node: {str(e)}")

def lambda_handler(event, context):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # S3 bucket and file paths
    bucket_name = 'data-lineage-analysis' # Replace with your S3 bucket name
    athena_key = 'athena_dbt_lineage_map.json' # Replace with your athena lineage key value output json name
    redshift_key = 'redshift_dbt_lineage_map.json' # Replace with your redshift lineage key value output json name

    try:
        # Read Athena lineage data
        athena_data = read_s3_file(s3_client, bucket_name, athena_key)
        print(f"Athena data size: {len(athena_data)}")

        # Read Redshift lineage data
        redshift_data = read_s3_file(s3_client, bucket_name, redshift_key)
        print(f"Redshift data size: {len(redshift_data)}")

        # Merge data
        combined_data = merge_data(athena_data, redshift_data)
        print(f"Combined data size: {len(combined_data)}")

        # Write to Neptune (including clearing the database)
        write_to_neptune(combined_data)

        return {
            'statusCode': 200,
            'body': json.dumps('Data successfully written to Neptune')
        }
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }
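
Note that merge_data relies on dictionary unpacking, so when the same node name is a key in both maps, the Redshift entry replaces the Athena entry rather than being combined with it. The following is a minimal sketch of an alternative that unions the child lists instead. It assumes each lineage map has the shape {node_name: [child_name, ...]}, which is how write_to_neptune iterates over it. The name merge_lineage_maps is hypothetical and not part of the original solution.

```python
def merge_lineage_maps(athena_data, redshift_data):
    """Merge two {node: [children]} maps, keeping children from both sides."""
    merged = {}
    for source in (athena_data, redshift_data):
        for node, children in source.items():
            existing = merged.setdefault(node, [])
            for child in children:
                # Preserve first-seen order and skip duplicates
                if child not in existing:
                    existing.append(child)
    return merged


# Illustrative data only; the table names are borrowed from the queries in this post
athena = {"titles": ["genre_titles"], "names": ["title_crew"]}
redshift = {"titles": ["title_crew"]}
print(merge_lineage_maps(athena, redshift))
# {'titles': ['genre_titles', 'title_crew'], 'names': ['title_crew']}
```

If node names never overlap between the two sources, both approaches give the same result.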

Create Step Functions workflow

  1. On the Step Functions console, choose State machines, and then choose Create state machine. On the Choose a template page, select Blank template.

Figure 11-Step Functions blank template

  1. In the Blank template, choose Code to define your state machine. Use the following example code, replacing each FunctionName value with the name of your own Lambda function:
{
  "Comment": "Daily Data Lineage Processing Workflow",
  "StartAt": "Parallel Processing",
  "States": {
    "Parallel Processing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Process Athena Data",
          "States": {
            "Process Athena Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "athena-data-lineange-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Process Redshift Data",
          "States": {
            "Process Redshift Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "redshift-data-lineange-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Load Data to Neptune"
    },
    "Load Data to Neptune": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "data-lineage-analysis-lambda"
      },
      "End": true
    }
  }
}
  1. After completing the configuration, choose the Design tab to view the workflow shown in the following diagram.

Figure 12-Step Functions design view
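
A state machine definition must be plain JSON, which has no comment syntax. One way to keep the Lambda function names configurable is to generate the definition from Python. The following is a sketch under that assumption. The helper names lambda_task and build_definition are hypothetical, and the function names passed in are the example values used in this post.

```python
import json


def lambda_task(function_name, pass_input=True):
    """Return a terminal Task state that invokes a Lambda function."""
    params = {"FunctionName": function_name}
    if pass_input:
        # Forward the state input to the function as {"input": <state input>}
        params["Payload"] = {"input.$": "$"}
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": params,
        "End": True,
    }


def build_definition(athena_fn, redshift_fn, neptune_fn):
    """Build the parallel-then-load workflow as a Python dict."""
    branches = [
        {"StartAt": name, "States": {name: lambda_task(fn)}}
        for name, fn in (
            ("Process Athena Data", athena_fn),
            ("Process Redshift Data", redshift_fn),
        )
    ]
    return {
        "Comment": "Daily Data Lineage Processing Workflow",
        "StartAt": "Parallel Processing",
        "States": {
            "Parallel Processing": {
                "Type": "Parallel",
                "Branches": branches,
                "Next": "Load Data to Neptune",
            },
            "Load Data to Neptune": lambda_task(neptune_fn, pass_input=False),
        },
    }


definition = build_definition(
    "athena-data-lineange-process-Lambda",
    "redshift-data-lineange-process-Lambda",
    "data-lineage-analysis-lambda",
)
definition_json = json.dumps(definition, indent=2)
print(definition_json)
```

The printed JSON can be pasted into the Code view, and json.dumps guarantees that the output is syntactically valid JSON.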

Create scheduling rules with Amazon EventBridge

Configure Amazon EventBridge to generate lineage data daily during off-peak business hours. To do this:

  1. Create a new rule in the EventBridge console with a descriptive name.
  2. Set the rule type to “Schedule” and configure it to run once daily (using either a fixed rate or the Cron expression “0 0 * * ? *”).
  3. Select the AWS Step Functions state machine as the target and specify the state machine you created earlier.
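
The same rule can also be created programmatically. The following is a rough sketch using the boto3 EventBridge client calls put_rule and put_targets. The function name schedule_state_machine, the target ID, and the rule name in the usage note are hypothetical, and role_arn is assumed to be an IAM role that EventBridge can assume to start executions of the state machine.

```python
def schedule_state_machine(events_client, rule_name, state_machine_arn, role_arn):
    """Create a daily schedule rule and attach a state machine as its target.

    events_client is expected to be boto3.client("events").
    """
    events_client.put_rule(
        Name=rule_name,
        ScheduleExpression="cron(0 0 * * ? *)",  # every day at 00:00 UTC
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                "Id": "lineage-state-machine",  # hypothetical target ID
                "Arn": state_machine_arn,
                "RoleArn": role_arn,  # role EventBridge assumes to start the execution
            }
        ],
    )
```

To use it, you would call something like schedule_state_machine(boto3.client("events"), "daily-lineage", "<state-machine-arn>", "<role-arn>"). EventBridge rule schedules are evaluated in UTC, so adjust the hour field to match your own off-peak window.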

Query results in Neptune

  1. On the Neptune console, select Notebooks. Open an existing notebook or create a new one.

Figure 13-Neptune notebook

  1. In the notebook, create a new code cell to perform a query. The following code example shows the query statement and its results:
%%gremlin -d node_name -de edge_name
g.V().hasLabel('lineage_node').outE('lineage_edge').inV().hasLabel('lineage_node').path().by(elementMap())

You can now see the end-to-end data lineage graph information for both dbt on Athena and dbt on Amazon Redshift. The following image shows the merged DAG data lineage graph in Neptune.

Figure 14-Merged DAG data lineage graph in Neptune

You can query the generated data lineage graph for data related to a specific table, such as title_crew.

The sample query statement and its results are shown in the following code example:

%%gremlin -d node_name -de edge_name
g.V().has('lineage_node', 'node_name', 'title_crew')
  .repeat(
    union(
      __.inE('lineage_edge').outV(),
      __.outE('lineage_edge').inV()
    )
  )
  .until(
    __.has('node_name', within('names', 'genre_titles', 'titles'))
    .or()
    .loops().is(gt(10))
  )
  .path()
  .by(elementMap())

The following image shows the results filtered on the title_crew table in Neptune.

Figure 15-Filtered results based on title_crew table in Neptune

Clean up

To clean up your resources, complete the following steps:

  1. Delete EventBridge rules
# Stop new events from triggering while removing dependencies
aws events disable-rule --name <rule-name>
# Break connections between the rule and its targets (here, the Step Functions state machine)
aws events remove-targets --rule <rule-name> --ids <target-id>
# Remove the rule completely from EventBridge
aws events delete-rule --name <rule-name>
  1. Delete Step Functions state machine
# Stop a running execution (repeat for each execution that is still running)
aws stepfunctions stop-execution --execution-arn <execution-arn>
# Delete the state machine
aws stepfunctions delete-state-machine --state-machine-arn <state-machine-arn>
  1. Delete Lambda functions
# Delete Lambda function
aws lambda delete-function --function-name <function-name>
# Delete Lambda layers (if used)
aws lambda delete-layer-version --layer-name <layer-name> --version-number <version>
  1. Clean up the Neptune database
# Delete a cluster snapshot (repeat for each snapshot ID)
aws neptune delete-db-cluster-snapshot --db-cluster-snapshot-identifier <snapshot-id>
# Delete database instance
aws neptune delete-db-instance --db-instance-identifier <instance-id> --skip-final-snapshot
# Delete database cluster
aws neptune delete-db-cluster --db-cluster-identifier <cluster-id> --skip-final-snapshot
  1. Follow the instructions at Deleting a single object to clean up the S3 buckets
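
The stop-execution command in step 2 stops one execution per call. If several executions are still running, a small script can iterate over them. The following sketch assumes a boto3 Step Functions client and uses its list_executions paginator and stop_execution call. The function name stop_running_executions is hypothetical.

```python
def stop_running_executions(sfn_client, state_machine_arn):
    """Stop every RUNNING execution of a state machine and return their ARNs.

    sfn_client is expected to be boto3.client("stepfunctions").
    """
    stopped = []
    paginator = sfn_client.get_paginator("list_executions")
    pages = paginator.paginate(
        stateMachineArn=state_machine_arn,
        statusFilter="RUNNING",
    )
    for page in pages:
        for execution in page["executions"]:
            arn = execution["executionArn"]
            sfn_client.stop_execution(executionArn=arn)
            stopped.append(arn)
    return stopped
```

After this returns, the state machine can be deleted with the delete-state-machine command shown earlier.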

Conclusion

In this post, we demonstrated how dbt enables unified data modeling across Amazon Athena and Amazon Redshift, integrating data lineage from both one-time and complex queries. By using Amazon Neptune, this solution provides comprehensive end-to-end lineage analysis. The architecture uses AWS serverless computing and managed services, including Step Functions, Lambda, and EventBridge, providing a highly flexible and scalable design.

This approach significantly lowers the learning curve through a unified data modeling method while enhancing development efficiency. The end-to-end data lineage graph visualization and analysis not only strengthen data governance capabilities but also offer deep insights for decision-making.

The solution’s flexible and scalable architecture effectively optimizes operational costs and improves business responsiveness. This comprehensive approach balances technical innovation, data governance, operational efficiency, and cost-effectiveness, thus supporting long-term business growth with the adaptability to meet evolving enterprise needs.

With OpenLineage-compatible data lineage now generally available in Amazon DataZone, we plan to explore integration possibilities to further enhance the system’s capability to handle complex data lineage analysis scenarios.

If you have any questions, please feel free to leave a comment in the comments section.


About the authors

Nancy Wu is a Solutions Architect at AWS, responsible for cloud computing architecture consulting and design for multinational enterprise customers. Nancy has many years of experience in big data, enterprise digital transformation research and development, consulting, and project management across the telecommunications, entertainment, and financial industries.

Xu Feng is a Senior Industry Solution Architect at AWS, responsible for designing, building, and promoting industry solutions for the Media & Entertainment and Advertising sectors, such as intelligent customer service and business intelligence. With 20 years of software industry experience, Xu Feng currently focuses on researching and implementing generative AI and AI-powered data solutions.

Xu Da is an Amazon Web Services (AWS) Partner Solutions Architect based out of Shanghai, China. He has more than 25 years of experience in the IT industry, software development, and solution architecture. He is passionate about collaborative learning, knowledge sharing, and guiding the community in their cloud technologies journey.

Build Write-Audit-Publish pattern with Apache Iceberg branching and AWS Glue Data Quality

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/build-write-audit-publish-pattern-with-apache-iceberg-branching-and-aws-glue-data-quality/

Given the importance of data in the world today, organizations face the dual challenges of managing large-scale, continuously incoming data while vetting its quality and reliability. The importance of publishing only high-quality data can’t be overstated—it’s the foundation for accurate analytics, reliable machine learning (ML) models, and sound decision-making. Equally crucial is the ability to segregate and audit problematic data, not just for maintaining data integrity, but also for regulatory compliance, error analysis, and potential data recovery.

AWS Glue is a serverless data integration service that you can use to effectively monitor and manage data quality through AWS Glue Data Quality. Today, many customers build data quality validation pipelines using its Data Quality Definition Language (DQDL) because with static rules, dynamic rules, and anomaly detection capability, it’s fairly straightforward.

Apache Iceberg is an open table format that brings atomicity, consistency, isolation, and durability (ACID) transactions to data lakes, streamlining data management. One of its key features is the ability to manage data using branches. Each branch has its own lifecycle, allowing for flexible and efficient data management strategies.

This post explores robust strategies for maintaining data quality when ingesting data into Apache Iceberg tables using AWS Glue Data Quality and Iceberg branches. We discuss two common strategies to verify the quality of published data. We dive deep into the Write-Audit-Publish (WAP) pattern, demonstrating how it works with Apache Iceberg.

Strategy for managing data quality

When it comes to vetting data quality in streaming environments, two prominent strategies emerge: the dead-letter queue (DLQ) approach and the WAP pattern. Each strategy offers unique advantages and considerations.

  • The DLQ approach – Segregate problematic entries from high-quality data so that only clean data makes it into your primary dataset.
  • The WAP pattern – Using branches, segregate problematic entries from high-quality data so that only clean data is published in the main branch.

The DLQ approach

The DLQ strategy focuses on efficiently segregating high-quality data from problematic entries so that only clean data makes it into your primary dataset. Here’s how it works:

  1. As data streams in, it passes through a validation process
  2. Valid data is written directly to the table referred to by downstream users
  3. Invalid or problematic data is redirected to a separate DLQ for later analysis and potential recovery

The following screenshot shows this flow.

bdb4341_0_1_dlq
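
The routing step in this flow can be sketched in a few lines of plain Python. This is an illustration of the idea only, not the implementation of any particular streaming engine. The function names and the validation rule are hypothetical.

```python
def route_records(records, is_valid):
    """Split records into (valid, dead_letter) lists using a validation predicate."""
    valid, dead_letter = [], []
    for record in records:
        (valid if is_valid(record) else dead_letter).append(record)
    return valid, dead_letter


def temperature_in_range(record):
    # Hypothetical rule: readings outside -10..50 are treated as invalid
    return -10 <= record["temperature"] <= 50


readings = [
    {"room": "living", "temperature": 25.1},
    {"room": "living", "temperature": 105.0},
    {"room": "kitchen", "temperature": 24.8},
]
clean, dlq = route_records(readings, temperature_in_range)
print(len(clean), len(dlq))  # 2 1
```

In a real pipeline, the valid list would be appended to the primary table and the dead-letter list written to a separate location for later analysis.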

Here are its advantages:

  • Simplicity – The DLQ approach is straightforward to implement, especially when there is only one writer
  • Low latency – Valid data is instantly available in the main branch for downstream consumers
  • Separate processing for invalid data – You can have dedicated jobs to process the DLQ for auditing and recovery purposes.

The DLQ strategy can present significant challenges in complex data environments. With multiple concurrent writers to the same Iceberg table, maintaining consistent DLQ implementation becomes difficult. This issue is compounded when different engines (for example, Spark, Trino, or Python) are used for writes because the DLQ logic may vary between them, making system maintenance more complex. Additionally, storing invalid data separately can lead to management overhead.

For workloads with low-latency requirements, the validation step can also introduce delays, which makes it harder to balance data quality with speed of delivery.

To solve those challenges in a reasonable way, we introduce the WAP pattern in the next section.

The WAP pattern

The WAP pattern implements a three-stage process:

  1. Write – Data is initially written to a staging branch
  2. Audit – Quality checks are performed on the staging branch
  3. Publish – Validated data is merged into the main branch for consumption

The following screenshot shows this flow.

bdb4341_0_2_wap

Here are its advantages:

  • Flexible data latency management – In the WAP pattern, the raw data is ingested to the staging branch without data validation, and then the high-quality data is ingested to the main branch with data validation. With this characteristic, there’s flexibility to achieve urgent, low-latency data handling on the staging branch and achieve high-quality data handling on the main branch.
  • Unified data quality management – The WAP pattern separates the audit and publish logic from the writer applications. It provides a unified approach to quality management, even with multiple writers or varying data sources. The audit phase can be customized and evolved without affecting the write or publish stages.

The primary challenge of the WAP pattern is the increased latency it introduces. The multistep process inevitably delays data availability for downstream consumers, which may be problematic for near real-time use cases. Furthermore, implementing this pattern requires more sophisticated orchestration compared to the DLQ approach, potentially increasing development time and complexity.

How the WAP pattern works with Iceberg

The following sections explore how the WAP pattern works with Iceberg.

Iceberg’s branching feature

Iceberg offers a branching feature for data lifecycle management, which is particularly useful for efficiently implementing the WAP pattern. The metadata of an Iceberg table stores a history of snapshots. These snapshots, created for each change to the table, are fundamental to concurrent access control and table versioning. Branches are independent histories of snapshots branched from another branch, and each branch can be referred to and updated separately.

When a table is created, it starts with only a main branch, and all transactions are initially written to it. You can create additional branches, such as an audit branch, and configure engines to write to them. Changes on one branch can be fast-forwarded to another branch using Spark’s fast_forward procedure, as shown in the following screenshot.

bdb4341_0_3_iceberg-branch
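
To make the idea of independent snapshot histories concrete, the following toy model treats each branch as a list of snapshot IDs. It is a deliberate simplification and not Iceberg's API or metadata layout. The class ToyTable and its methods are hypothetical names used only for illustration.

```python
class ToyTable:
    """Toy model: each branch is a list of snapshot IDs, oldest first."""

    def __init__(self):
        self.branches = {"main": []}
        self._next_id = 1

    def create_branch(self, name, source="main"):
        # A new branch starts from a copy of the source branch history
        self.branches[name] = list(self.branches[source])

    def commit(self, branch):
        # Every change appends a new snapshot to that branch only
        snapshot_id = self._next_id
        self._next_id += 1
        self.branches[branch].append(snapshot_id)
        return snapshot_id

    def fast_forward(self, target, source):
        t, s = self.branches[target], self.branches[source]
        # Only possible when target's history is a prefix of source's history
        if s[: len(t)] != t:
            raise ValueError(f"{target} has diverged from {source}")
        self.branches[target] = list(s)


table = ToyTable()
table.commit("main")          # snapshot 1 on main
table.create_branch("audit")  # audit starts at snapshot 1
table.commit("audit")         # snapshot 2, visible only on audit
print(table.branches)         # {'main': [1], 'audit': [1, 2]}
table.fast_forward("main", "audit")
print(table.branches)         # {'main': [1, 2], 'audit': [1, 2]}
```

The key property the model captures is that a commit on the audit branch leaves main unchanged until a fast-forward moves main to the audit branch head.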

How to manage Iceberg branches

In this section, we cover the essential operations for managing Iceberg branches using SparkSQL. We’ll demonstrate how to use the branches, specifically, to create a new branch, write to and read from a specific branch, and set a default branch for a Spark session. These operations form the foundation for implementing the WAP pattern with Iceberg.

To create a branch, run the following SparkSQL query:

ALTER TABLE glue_catalog.db.tbl CREATE BRANCH audit

To specify a branch to be updated, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

INSERT INTO glue_catalog.db.tbl.branch_audit VALUES (1, 'a'), (2, 'b');

To specify a branch to be queried, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

SELECT * FROM glue_catalog.db.tbl.branch_audit;

To specify a branch for the entire Spark session scope, set the branch name to the Spark parameter spark.wap.branch. After this parameter is set, all queries will refer to the specified branch without explicit expression:

SET spark.wap.branch = audit

-- audit branch will be updated
INSERT INTO glue_catalog.db.tbl VALUES (3, 'c');

How to implement the WAP pattern with Iceberg branches

Using Iceberg’s branching feature, we can efficiently implement the WAP pattern with a single Iceberg table. Additionally, Iceberg characteristics such as ACID transactions and schema evolution are useful for handling multiple concurrent writers and varying data.

  1. Write – The data ingestion process switches from the main branch to the audit branch and commits updates there. At this point, these updates aren’t accessible to downstream users, who can only access the main branch.
  2. Audit – The audit process runs data quality checks on the data in the audit branch. It specifies which data is clean and ready to be provided.
  3. Publish – The audit process publishes validated data to the main branch with the Iceberg fast_forward procedure, making it available for downstream users.

This flow is shown in the following screenshot.

bdb4341_0_4_wap-w-iceberg-branch
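
The three stages above can be sketched as a single function that issues Spark SQL statements. This is a simplified, all-or-nothing illustration that audits with a plain SQL condition, and it is not the AWS Glue Data Quality based approach used later in this post. The function name run_wap and its parameters are hypothetical, the audit branch is assumed to exist already, and staged_view is assumed to be a view or table holding the incoming rows.

```python
def run_wap(spark, table, staged_view, audit_condition,
            branch="audit", catalog="glue_catalog"):
    """Write rows to a branch, audit them, and publish by fast-forwarding main.

    spark is expected to be a SparkSession configured for Iceberg.
    Returns True if the branch was published to main.
    """
    # Write: commit incoming rows to the branch instead of main
    spark.sql(
        f"INSERT INTO {catalog}.{table}.branch_{branch} "
        f"SELECT * FROM {staged_view}"
    )

    # Audit: count rows on the branch for which the condition is false
    # (rows where the condition evaluates to NULL are not counted)
    bad_rows = spark.sql(
        f"SELECT COUNT(*) AS bad FROM {catalog}.{table}.branch_{branch} "
        f"WHERE NOT ({audit_condition})"
    ).collect()[0]["bad"]
    if bad_rows > 0:
        return False  # main stays untouched

    # Publish: move main forward to the head of the audited branch
    spark.sql(f"CALL {catalog}.system.fast_forward('{table}', 'main', '{branch}')")
    return True
```

A call might look like run_wap(spark, "db.tbl", "incoming_rows", "current_temperature BETWEEN -10 AND 50"). Because the table name and condition are interpolated into SQL strings, this sketch should only be used with trusted values.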

By implementing the WAP pattern with Iceberg, we can obtain several advantages:

  • Simplicity – Iceberg branches can express multiple states of a table, such as audit and main, within one table. This gives us unified data management even when multiple data contexts are handled separately.
  • Handling concurrent writers – Iceberg tables are ACID compliant, so consistent reads and writes are guaranteed even when multiple reader and writer processes run concurrently.
  • Schema evolution – If there are issues with the data being ingested, its schema may differ from the table definition. Spark supports dynamic schema merging for Iceberg tables. Iceberg tables can flexibly evolve their schema to write data with inconsistent schemas. By configuring the following parameters, when schema changes occur, new columns from the source are added to the target table with NULL values for existing rows. Columns present only in the target have their values set to NULL for new insertions or left unchanged during updates.
SET `spark.sql.iceberg.check-ordering` = false

ALTER TABLE glue_catalog.db.tbl SET TBLPROPERTIES (
    'write.spark.accept-any-schema'='true'
)
df.writeTo("glue_catalog.db.tbl").option("merge-schema","true").append()

To summarize so far, the WAP pattern offers a robust approach to balancing data quality and latency. With Iceberg branches, we can implement the WAP pattern on a single Iceberg table while handling concurrent writers and schema evolution.

Example use case

Suppose that a home monitoring system tracks room temperature and humidity. The system captures and sends the data to an Iceberg-based data lake built on top of Amazon Simple Storage Service (Amazon S3). The data is visualized using matplotlib for interactive data analysis. For the system, issues such as device malfunctions or network problems can lead to partial or erroneous data being written, resulting in incorrect insights. In many cases, these issues are only detected after the data is sent to the data lake. Additionally, verifying the correctness of such data is generally complicated.

To address these issues, the WAP pattern using Iceberg branches is applied to the system in this post. Through this approach, the incoming room data is evaluated for quality before being visualized, so that only qualified room data is used for further data analysis. With the WAP pattern using the branches, you can achieve effective data management and promote data quality in downstream processes. The solution is demonstrated using an AWS Glue Studio notebook, which is a managed Jupyter Notebook for interacting with Apache Spark.

Prerequisites

The following prerequisites are necessary for this use case:

Set up resources with AWS CloudFormation

First, you use a provided AWS CloudFormation template to set up resources to build Iceberg environments. The template creates the following resources:

  • An S3 bucket for metadata and data files of an Iceberg table
  • A database for the Iceberg table in AWS Glue Data Catalog
  • An AWS Identity and Access Management (IAM) role for an AWS Glue job

Complete the following steps to deploy the resources.

  1. Choose Launch stack.

  1. For the Parameters, IcebergDatabaseName is set by default. You can also change the default value. Then, choose Next.
  2. Choose Next.
  3. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Submit.
  5. After the stack creation is complete, check the Outputs tab. The resource values are used in the following sections.

Next, configure the Iceberg JAR files to the session to use the Iceberg branch feature. Complete the following steps:

  1. Select the following JAR files from the Iceberg releases page and download these JAR files on your local machine:
    1. 1.6.1 Spark 3.3_with Scala 2.12 runtime Jar
    2. 1.6.1 aws-bundle Jar
  2. Open the Amazon S3 console and select the S3 bucket you created through the CloudFormation stack. The S3 bucket name can be found on the CloudFormation Outputs tab.
  3. Choose Create folder and create the jars path in the S3 bucket.
  4. Upload the two downloaded JAR files to s3://<IcebergS3Bucket>/jars/ from the S3 console.

Upload a Jupyter Notebook on AWS Glue Studio

After launching the CloudFormation stack, you create an AWS Glue Studio notebook to use Iceberg with AWS Glue. Complete the following steps.

  1. Download wap.ipynb.
  2. Open AWS Glue Studio console.
  3. Under Create job, select Notebook.
  4. Select Upload Notebook, choose Choose file, and upload the notebook you downloaded.
  5. Select the IAM role name, such as IcebergWAPGlueJobRole, that you created through the CloudFormation stack. Then, choose Create notebook.
  6. For Job name at the left top of the page, enter iceberg_wap.
  7. Choose Save.

Configure Iceberg branches

Start by creating an Iceberg table that contains a room temperature and humidity dataset. After creating the Iceberg table, create branches that are used for performing the WAP practice. Complete the following steps:

  1. On the Jupyter Notebook that you created in Upload a Jupyter Notebook on AWS Glue Studio, run the following cell to use Iceberg with Glue. %additional_python_modules pandas==2.2 is used to visualize the temperature and humidity data in the notebook with pandas. Before running the cell, replace <IcebergS3Bucket> with the S3 bucket name where you uploaded the Iceberg JAR files.

bdb4341_1_session-config

  1. Initialize the SparkSession by running the following cell. The first three settings, starting with spark.sql, are required to use Iceberg with Glue. The default catalog name is set to glue_catalog using spark.sql.defaultCatalog. The configuration spark.sql.execution.arrow.pyspark.enabled is set to true and is used for data visualization with pandas.

bdb4341_2_sparksession-init

  1. After the session is created (the notification Session <Session Id> has been created. will be displayed in the notebook), run the following commands to copy the temperature and humidity dataset to the S3 bucket you created through the CloudFormation stack. Before running the cell, replace <IcebergS3Bucket> with the name of the S3 bucket for Iceberg, which you can find on the CloudFormation Outputs tab.
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/part-00000-fa08487a-43c2-4398-bae9-9cb912f8843c-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/current/ 
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/new-part-00000-e8a06ab0-f33d-4b3b-bd0a-f04d366f067e-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/new/
  1. Configure the data source bucket name and path (DATA_SRC), Iceberg data warehouse path (ICEBERG_LOC), and database and table names for an Iceberg table (DB_TBL). Replace <IcebergS3Bucket> with the S3 bucket from the CloudFormation Outputs tab.
  2. Read the dataset and create the Iceberg table with the dataset using the Create Table As Select (CTAS) query.

bdb4341_3_ctas

  1. Run the following code to display the temperature and humidity data for each room in the Iceberg table. Pandas and matplotlib are used to visualize the data for each room. The data from 10:05 to 10:30 is displayed in the notebook, as shown in the following screenshot, with each room showing approximately 25°C for temperature (displayed as the blue line) and 52% for humidity (displayed as the orange line).
import matplotlib.pyplot as plt
import pandas as pd

CONF = [
    {'room_type': 'myroom', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'living', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'kitchen', 'cols':['current_temperature', 'current_humidity']}
]

fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_4_vis-1

  1. Create the Iceberg branches by running the following queries before writing data into the Iceberg table. You can create an Iceberg branch by running the ALTER TABLE db.table CREATE BRANCH <branch_name> query.
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH stg
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH audit

Now, you’re ready to build the WAP pattern with Iceberg.

Build WAP pattern with Iceberg

Use the Iceberg branches created earlier to implement the WAP pattern. You start writing the newly incoming temperature and humidity data including erroneous values to the stg branch in the Iceberg table.

Write phase: Write incoming data into the Iceberg stg branch

To write the incoming data into the stg branch in the Iceberg table, complete the following steps:

  1. Run the following cell to write the data into the Iceberg table.

bdb4341_5_write

  1. After the records are written, run the following code to visualize the current temperature and humidity data in the stg branch. In the following screenshot, notice that new data was added after 10:30. The output shows incorrect readings, such as around 100°C for temperature between 10:35 and 10:52 in the living room.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_stg = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL}.branch_stg WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_stg.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 110)
    plt.yticks([tick for tick in range(10, 110, 30)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_6_vis-2

The new temperature data including erroneous records was written to the stg branch. This data isn’t visible to the downstream side because it hasn’t been published to the main branch. Next, you evaluate the data quality in the stg branch.

Audit phase: Evaluate the data quality in the stg branch

In this phase, you evaluate the quality of the temperature and humidity data in the stg branch using AWS Glue Data Quality. Then, the data that doesn’t meet the criteria is filtered out based on the data quality rules, and the qualified data is used to update the latest snapshot in the audit branch. Start with the data quality evaluation:

  1. Run the following code to evaluate the current data quality using AWS Glue Data Quality. The evaluation rule is defined in DQ_RULESET, where the normal temperature range is set between −10 and 50°C based on the device specifications. Any values out of this range are considered erroneous in this scenario.
from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
DQ_RULESET = """Rules = [ ColumnValues "current_temperature" between -10 and 50 ]"""


dyf = DynamicFrame.fromDF(
    dataframe=spark.sql(f"SELECT * FROM {DB_TBL}.branch_stg"),
    glue_ctx=GlueContext(spark.sparkContext),
    name='dyf')

dyfc_eval_dq = EvaluateDataQuality().process_rows(
    frame=dyf,
    ruleset=DQ_RULESET,
    publishing_options={
        "dataQualityEvaluationContext": "dyfc_eval_dq",
        "enableDataQualityCloudWatchMetrics": False,
        "enableDataQualityResultsPublishing": False,
    },
    additional_options={"performanceTuning.caching": "CACHE_NOTHING"},
)

# Show DQ results
dyfc_rule_outcomes = SelectFromCollection.apply(
    dfc=dyfc_eval_dq,
    key="ruleOutcomes")
dyfc_rule_outcomes.toDF().select('Outcome', 'FailureReason').show(truncate=False)
  1. The output shows the result of the evaluation. It displays Failed because some temperature data, such as 105°C, is out of the normal temperature range of −10 to 50°C.
+-------+------------------------------------------------------+
|Outcome|FailureReason                                         |
+-------+------------------------------------------------------+
|Failed |Value: 105.0 does not meet the constraint requirement!|
+-------+------------------------------------------------------+
  1. After the evaluation, filter out the incorrect temperature data in the stg branch, then update the latest snapshot in the audit branch with the valid temperature data.

bdb4341_7_write-to-audit

Through the data quality evaluation, the audit branch in the Iceberg table now contains the valid data, which is ready for downstream use.
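As a rough illustration of what the filtering step does, the following plain-Python sketch splits readings into valid and rejected rows using the same temperature range as DQ_RULESET. It is not the AWS Glue Data Quality API. The split_by_range helper and the sample rows are hypothetical, and treating the range boundaries as exclusive is an assumption to check against the DQDL reference.

```python
# Plain-Python illustration of the range rule; this is NOT the Glue Data Quality API.
TEMP_MIN, TEMP_MAX = -10, 50  # range taken from DQ_RULESET in this post


def split_by_range(readings, low=TEMP_MIN, high=TEMP_MAX):
    """Return (valid, rejected) lists based on current_temperature.

    Boundary handling (exclusive here) is an assumption for this sketch.
    """
    valid, rejected = [], []
    for row in readings:
        if low < row["current_temperature"] < high:
            valid.append(row)
        else:
            rejected.append(row)
    return valid, rejected


# Hypothetical sample rows, shaped like the columns queried in this post.
sample = [
    {"room_type": "living", "current_temperature": 24.5, "current_humidity": 41.0},
    {"room_type": "living", "current_temperature": 105.0, "current_humidity": 40.0},
    {"room_type": "kitchen", "current_temperature": 18.0, "current_humidity": 55.0},
]
valid, rejected = split_by_range(sample)
print(len(valid), "valid,", len(rejected), "rejected")
```

In the notebook, the equivalent filter would run on the Spark DataFrame read from the stg branch, and only the valid rows would be written to the audit branch.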

Publish phase: Publish the valid data to the downstream side

To publish the valid data in the audit branch to main, complete the following steps:

  1. Run the fast_forward Iceberg procedure to publish the valid data in the audit branch to the downstream side.

bdb4341_8_publish

  1. After the procedure is complete, review the published data by querying the main branch in the Iceberg table to simulate the query from the downstream side.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_main = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_main.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

The query result shows only the valid temperature and humidity data that has passed the data quality evaluation.

bdb4341_9_vis-3

In this scenario, you successfully managed data quality by applying the WAP pattern with Iceberg branches. The room temperature and humidity data, including any erroneous records, was first written to the staging branch for quality evaluation. This approach prevented erroneous data from being visualized and leading to incorrect insights. After the data was validated by AWS Glue Data Quality, only valid data was published to the main branch and visualized in the notebook. Using the WAP pattern with Iceberg branches, you can make sure that only validated data is passed to the downstream side for further analysis.
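To make the role of the branches more concrete, here is a toy model of the write, audit, and publish flow in plain Python. It treats branches as named pointers into a chain of snapshots and only allows a fast-forward when no history would be lost. The ToyTable class and its methods are hypothetical and greatly simplified; they are not the Iceberg API.

```python
# Toy model of branches as named pointers into a snapshot chain.
# It only illustrates the WAP flow; it is not the Iceberg API.
class ToyTable:
    def __init__(self):
        self.parents = {0: None}     # snapshot_id -> parent snapshot_id
        self.rows = {0: []}          # snapshot_id -> rows visible in that snapshot
        self.branches = {"main": 0}  # branch name -> head snapshot_id
        self._next_id = 1

    def create_branch(self, name, source="main"):
        self.branches[name] = self.branches[source]

    def commit(self, branch, rows):
        """Create a new snapshot on `branch` whose content is `rows`."""
        sid = self._next_id
        self._next_id += 1
        self.parents[sid] = self.branches[branch]
        self.rows[sid] = list(rows)
        self.branches[branch] = sid

    def _is_ancestor(self, older, newer):
        while newer is not None:
            if newer == older:
                return True
            newer = self.parents[newer]
        return False

    def fast_forward(self, branch, to):
        """Move `branch` to the head of `to` if `branch` is an ancestor of `to`."""
        if not self._is_ancestor(self.branches[branch], self.branches[to]):
            raise ValueError(f"{branch} is not an ancestor of {to}")
        self.branches[branch] = self.branches[to]

    def read(self, branch="main"):
        return self.rows[self.branches[branch]]


table = ToyTable()
table.commit("main", [22.0, 23.5])                       # already published data
table.create_branch("stg")
table.create_branch("audit")
table.commit("stg", table.read("stg") + [105.0, 24.0])   # write phase
valid = [t for t in table.read("stg") if -10 < t < 50]   # audit phase
table.commit("audit", valid)
table.fast_forward("main", "audit")                      # publish phase
print(table.read("main"))
```

In this model, readers of main never see the 105.0 reading: it exists only on stg, and main moves directly from its previous snapshot to the audited one.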

Clean up resources

To clean up the resources, complete the following steps:

  1. On the Amazon S3 console, select the S3 bucket aws-glue-assets-<ACCOUNT_ID>-<REGION> where the Notebook file (iceberg_wap.ipynb) is stored. Delete the Notebook file located in the notebook path.
  2. Select the S3 bucket you created through the CloudFormation template. You can obtain the bucket name from the IcebergS3Bucket key on the CloudFormation Outputs tab. After selecting the bucket, choose Empty to delete all objects.
  3. After you confirm the bucket is empty, delete the CloudFormation stack iceberg-wap-baseline-resources.

Conclusion

In this post, we explored common strategies for maintaining data quality when ingesting data into Apache Iceberg tables. The step-by-step instructions demonstrated how to implement the WAP pattern with Iceberg branches. For use cases requiring data quality validation, the WAP pattern provides the flexibility to manage data latency even with concurrent writer applications without impacting downstream applications.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Sotaro Hikita is a Solutions Architect. He supports customers in a wide range of industries, especially the financial industry, to build better solutions. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan, and is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Implement historical record lookup and Slowly Changing Dimensions Type-2 using Apache Iceberg

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/implement-historical-record-lookup-and-slowly-changing-dimensions-type-2-using-apache-iceberg/

In today’s data-driven world, tracking and analyzing changes over time has become essential. As organizations process vast amounts of data, maintaining an accurate historical record is crucial. History management in data systems is fundamental for compliance, business intelligence, data quality, and time-based analysis. It enables organizations to maintain audit trails, perform trend analysis, identify data quality issues, and conduct point-in-time reporting. When combined with Change Data Capture (CDC), which identifies and captures database changes, history management becomes even more potent.

Common use cases for historical record management in CDC scenarios span various domains. In customer relationship management, it tracks changes in customer information over time. Financial systems use it for maintaining accurate transaction and balance histories. Inventory management benefits from historical data for analyzing sales patterns and optimizing stock levels. HR systems use it to track employee information changes. In fraud detection, historical data helps identify anomalous patterns in transactions or user behaviors.

This post will explore how to implement these functionalities using Apache Iceberg, focusing on Slowly Changing Dimensions (SCD) Type-2. This method creates new records for each data change while preserving old ones, thus maintaining a full history. By the end, you’ll understand how to use Apache Iceberg to manage historical records effectively on a typical CDC architecture.

Historical record lookup

How can we retrieve the history of given records? This is a fundamental question in data management, especially when dealing with systems that need to track changes over time. Let’s explore this concept with a practical example.

Consider a product (Heater) in an ecommerce database:

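+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|     00001|      Heater|  250|
+----------+------------+-----+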

Now, let’s say we update the price of this product from 250 to 500. After some time, we want to retrieve the price history of this heater. In a traditional database setup, this task could be challenging, especially if we haven’t explicitly designed our system to track historical changes.

This is where the concept of historical record lookup becomes crucial. We need a system that not only stores the current state of our data but also maintains a log of all changes made to each record over time. This allows us to answer questions like:

  • What was the price of the heater at a specific point in time?
  • How many times has the price changed, and when did these changes occur?
  • What was the price trend of the heater over the past year?

Implementing such a system can be complex, requiring careful consideration of data storage, retrieval mechanisms, and query optimization. This is where Apache Iceberg comes into play, offering a feature known as the change log view.

The change log view in Apache Iceberg provides a view of all changes made to a table over time, making it straightforward to query and analyze the history of any record. With change log view, we can easily track insertions, updates, and deletions, giving us a complete picture of how our data has evolved.

For our heater example, Iceberg’s change log view would allow us to effortlessly retrieve a timeline of all price changes, complete with timestamps and other relevant metadata, as shown in the following table.

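+----------+------------+-----+-------------+
|product_id|product_name|price| _change_type|
+----------+------------+-----+-------------+
|     00001|      Heater|  250|       INSERT|
|     00001|      Heater|  250|UPDATE_BEFORE|
|     00001|      Heater|  500| UPDATE_AFTER|
+----------+------------+-----+-------------+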

This capability not only simplifies historical analysis but also opens possibilities for advanced time-based analytics, auditing, and data governance.
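As a conceptual sketch of where these change rows come from, the following plain-Python example derives change-log style rows by comparing two states of a table. The diff_snapshots function is hypothetical and is not Iceberg's actual implementation; it only reproduces the _change_type labels for the heater example.

```python
# Plain-Python sketch: derive change-log style rows by diffing two table states.
# This mimics the _change_type labels; it is not how Iceberg is implemented.
def diff_snapshots(before, after, key="product_id"):
    old = {row[key]: row for row in before}
    new = {row[key]: row for row in after}
    changes = []
    for k in sorted(old.keys() | new.keys()):
        if k not in old:
            changes.append({**new[k], "_change_type": "INSERT"})
        elif k not in new:
            changes.append({**old[k], "_change_type": "DELETE"})
        elif old[k] != new[k]:
            # An update is reported as a before/after pair.
            changes.append({**old[k], "_change_type": "UPDATE_BEFORE"})
            changes.append({**new[k], "_change_type": "UPDATE_AFTER"})
    return changes


v1 = [{"product_id": "00001", "product_name": "Heater", "price": 250}]
v2 = [{"product_id": "00001", "product_name": "Heater", "price": 500}]

# First the initial load, then the price update.
history = diff_snapshots([], v1) + diff_snapshots(v1, v2)
for row in history:
    print(row["product_id"], row["product_name"], row["price"], row["_change_type"])
```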

Historical table lookup with SCD Type-2

SCD Type-2 is a key concept in data warehousing and historical data management and is particularly relevant to Change Data Capture (CDC) scenarios. SCD Type-2 creates new rows for changed data instead of overwriting existing records, allowing for comprehensive tracking of changes over time.

SCD Type-2 requires additional fields such as effective_start_date, effective_end_date, and current_flag to manage historical records. This approach has been widely used in data warehouses to track changes in dimensions such as customer information, product details, and employee data. For the example in the previous section, the SCD Type-2 table looks like the following, assuming the update operation is performed on December 11, 2024.

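+----------+------------+-----+--------------------+------------------+------------+
|product_id|product_name|price|effective_start_date|effective_end_date|current_flag|
+----------+------------+-----+--------------------+------------------+------------+
|     00001|      Heater|  250|          2024-12-10|        2024-12-11|       FALSE|
|     00001|      Heater|  500|          2024-12-11|              NULL|        TRUE|
+----------+------------+-----+--------------------+------------------+------------+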

SCD Type-2 is particularly valuable in CDC use cases, where capturing all data changes over time is crucial. It enables point-in-time analysis, provides detailed audit trails, aids in data quality management, and helps meet compliance requirements by preserving historical data.

In traditional implementations on data warehouses, SCD Type-2 requires specific handling in every INSERT, UPDATE, and DELETE operation that affects those additional columns. For example, to update the price of the product, you need to run the following queries.

UPDATE product SET effective_end_date = '2024-12-11', current_flag = false
WHERE product_id = '00001' AND current_flag = true;

INSERT INTO product (product_id, product_name, price, effective_start_date, effective_end_date, current_flag)
VALUES ('00001', 'Heater', 500, '2024-12-11', NULL, true);
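To try the two statements end to end without a data warehouse, the following sketch runs them against an in-memory SQLite database using Python's standard sqlite3 module. The table layout follows the example above; the change_price helper is hypothetical, and SQLite stores the boolean flag as 0 or 1.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE product (
        product_id TEXT, product_name TEXT, price INTEGER,
        effective_start_date TEXT, effective_end_date TEXT, current_flag INTEGER
    )
""")
conn.execute(
    "INSERT INTO product VALUES ('00001', 'Heater', 250, '2024-12-10', NULL, 1)"
)


def change_price(conn, product_id, name, new_price, change_date):
    """Close the current row, then insert the new current row, in one transaction."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE product SET effective_end_date = ?, current_flag = 0 "
            "WHERE product_id = ? AND current_flag = 1",
            (change_date, product_id),
        )
        conn.execute(
            "INSERT INTO product VALUES (?, ?, ?, ?, NULL, 1)",
            (product_id, name, new_price, change_date),
        )


change_price(conn, "00001", "Heater", 500, "2024-12-11")
rows = conn.execute(
    "SELECT price, effective_start_date, effective_end_date, current_flag "
    "FROM product ORDER BY effective_start_date"
).fetchall()
print(rows)
```

Wrapping both statements in one transaction matters in this style of implementation: if the INSERT fails after the UPDATE succeeds, the product would be left with no current row.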

For modern data lakes, we propose a new approach to implementing SCD Type-2. With Iceberg, you can create a dedicated SCD Type-2 view on top of the change log view, eliminating the need for specific handling when changing SCD Type-2 tables. You keep managing Iceberg tables as usual, without the added complexity of the SCD Type-2 specification, and whenever you need an SCD Type-2 snapshot of an Iceberg table, you create the corresponding representation. This approach combines Iceberg’s efficient data management with the historical tracking capabilities of SCD Type-2. By using the change log view, you can dynamically generate the SCD Type-2 structure without the overhead of maintaining additional tables or manually managing effective dates and flags.

This streamlined method not only makes the implementation of SCD Type-2 more straightforward, but also offers improved performance and scalability for handling large volumes of historical data in CDC scenarios. It represents a significant advancement in historical data management, merging traditional data warehousing concepts with modern big data capabilities.

As we delve deeper into Iceberg’s features, we’ll explore how this approach can be implemented, showcasing the efficiency and flexibility it brings to historical data analysis and CDC processes.

Prerequisites

The following prerequisites are required for the use cases:

Set up resources with AWS CloudFormation

Use a provided AWS CloudFormation template to set up resources to build Iceberg environments. The template creates the following resources:

Complete the following steps to deploy the resources.

  1. Choose Launch stack.
  2. For the parameters, IcebergDatabaseName is set by default. You can change the default value. Then, choose Next.
  3. Choose Next.
  4. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.
  6. After the stack creation is complete, check the Outputs tab and make a note of the resource values, which are used in the following sections.

Next, make the Iceberg JAR files available to the session so that you can use the Iceberg change log view feature. Complete the following steps.

  1. Download the following JAR files from the Iceberg releases page to your local machine:
    1. 1.6.1 Spark 3.3_with Scala 2.12 runtime Jar.
    2. 1.6.1 aws-bundle Jar.
  2. Open the Amazon S3 console and select the S3 bucket you created using the CloudFormation stack. The S3 bucket name can be found on the CloudFormation Outputs tab.
  3. Choose Create folder and create the jars path in the S3 bucket.
  4. Upload the two downloaded JAR files to s3://<IcebergS3Bucket>/jars/ from the S3 console.

Upload a Jupyter Notebook on AWS Glue Studio

After launching the CloudFormation stack, create an AWS Glue Studio notebook to use Iceberg with AWS Glue.

  1. Download history.ipynb.
  2. Open the AWS Glue Studio console.
  3. Under Create job, select Notebook.
  4. Select Upload Notebook, choose Choose file and upload the Notebook you downloaded.
  5. Select the IAM role name such as IcebergHistoryGlueJobRole that you created using the CloudFormation template. Then, choose Create notebook.

1_upload-notebook

  1. For Job name at the top left of the page, enter iceberg_history.
  2. Choose Save.

Create an Iceberg table

To create an Iceberg table using a product dataset, complete the following steps.

  1. On the Jupyter Notebook that you created in Upload a Jupyter Notebook on AWS Glue Studio, run the following cell to use Iceberg with AWS Glue. Before running the cell, replace <IcebergS3Bucket> with the S3 bucket name where you uploaded the Iceberg JAR files.

2_session-config

  1. Initialize the SparkSession with Iceberg settings.

3_ss-init

  1. Configure database and table names for an Iceberg table (DB_TBL) and data warehouse path (ICEBERG_LOC). Replace <IcebergS3Bucket> with the S3 bucket from the CloudFormation Outputs tab.
  2. Run the following code to create the Iceberg table using the Spark DataFrame based on the product dataset.
from pyspark.sql import Row
import time
ut = time.time()
product = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}
]
df_products = spark.createDataFrame(Row(**x) for x in product)
df_products.createOrReplaceTempView('tmp')

spark.sql(f"""
CREATE TABLE {DB_TBL} USING iceberg LOCATION '{ICEBERG_LOC}'
AS SELECT * FROM tmp
""")
  1. After creating the Iceberg table, run SELECT * FROM iceberg_history_db.products ORDER BY product_id to show the product data in the Iceberg table. Currently the following five products are stored in the Iceberg table.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  250|Electronics|1.7297845122056053E9|
|     00002|  Thermostat|  400|Electronics|1.7297845122056053E9|
|     00003|  Television|  600|Electronics|1.7297845122056053E9|
|     00004|     Blender|  100|Electronics|1.7297845122056053E9|
|     00005| USB charger|   50|Electronics|1.7297845122056053E9|
+----------+------------+-----+-----------+--------------------+

Next, look up the historical changes for a product using Iceberg’s change log view feature.

Implement historical record lookup with Iceberg’s change log view

Suppose that there’s a source table whose records are replicated to the Iceberg table through a Change Data Capture (CDC) process. When records in the source table are updated, the changes are mirrored in the Iceberg table. In this section, you look up the history of a given record in such a system to capture the history of product updates. For example, the following updates occur in the source table and, through the CDC process, are applied to the Iceberg table:

  • Upsert (update and insert) the two records:
    • The price of Heater (product_id: 00001) is updated from 250 to 500.
    • A new product Chair (product_id: 00006) is added.
  • Television (product_id: 00003) is deleted.

To simulate the CDC workflow, you manually apply these changes to the Iceberg table in the notebook.

  1. Use the MERGE INTO query to upsert records. If an input record in the Spark DataFrame has the same product_id as an existing record, the existing record is updated. If no matching product_id is found, the input record is inserted into the Iceberg table.

4-merge-into

  1. Delete Television from the Iceberg table by running the DELETE query.
DELETE FROM iceberg_history_db.products WHERE product_id = '00003'
  1. Then, run SELECT * FROM iceberg_history_db.products ORDER BY product_id to show the product data in the Iceberg table. You can confirm that the price of Heater is updated to 500, Chair is added and Television is deleted.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  500|Electronics|    1.729790106579E9|
|     00002|  Thermostat|  400|Electronics|1.7297845122056053E9|
|     00004|     Blender|  100|Electronics|1.7297845122056053E9|
|     00005| USB charger|   50|Electronics|1.7297845122056053E9|
|     00006|       Chair|   50|  Furniture|    1.729790106579E9|
+----------+------------+-----+-----------+--------------------+
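For reference, the upsert described in the first step could be written along the following lines. This is a sketch under stated assumptions rather than the exact cell from the notebook: the build_merge_sql helper and the products_upsert view name are hypothetical, while the MERGE INTO clauses follow the Spark SQL syntax that Iceberg supports.

```python
# Hypothetical helper: builds an upsert statement for an Iceberg table in Spark SQL.
def build_merge_sql(target, source_view, key="product_id"):
    return (
        f"MERGE INTO {target} t USING {source_view} s "
        f"ON t.{key} = s.{key} "
        "WHEN MATCHED THEN UPDATE SET * "      # same key: overwrite the existing row
        "WHEN NOT MATCHED THEN INSERT *"       # new key: insert the incoming row
    )


# 'products_upsert' is an assumed temporary view holding the incoming rows
# (the updated Heater and the new Chair).
merge_sql = build_merge_sql("iceberg_history_db.products", "products_upsert")
print(merge_sql)
```

In the notebook, you would register the incoming Spark DataFrame as the temporary view and pass the resulting statement to spark.sql.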

For the Iceberg table, where changes from the source table are replicated, you can track the record changes using Iceberg’s change log view. To start, create a change log view from the Iceberg table.

  1. Run the create_changelog_view Iceberg procedure to create a change log view.

5-clv

  1. Run the following query to retrieve the historical changes for Heater.
SELECT product_id, product_name, price, category, updated_at, _change_type
FROM products_clv WHERE product_id = '00001'
ORDER BY _change_ordinal, _change_type DESC
  1. The query result shows the historical changes to Heater. You can confirm that the price of Heater was updated from 250 to 500 from the output.
+----------+------------+-----+-----------+--------------------+-------------+
|product_id|product_name|price|   category|          updated_at| _change_type|
+----------+------------+-----+-----------+--------------------+-------------+
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|       INSERT|
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|UPDATE_BEFORE|
|     00001|      Heater|  500|Electronics|1.7297903836233025E9| UPDATE_AFTER|
+----------+------------+-----+-----------+--------------------+-------------+
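The procedure call that creates products_clv could look roughly like the statement built below. The build_changelog_view_sql helper is hypothetical, and the glue_catalog catalog name is an assumption; use the catalog name configured in your Spark session. The table, changelog_view, and identifier_columns arguments are named parameters of Iceberg's create_changelog_view procedure.

```python
# Hypothetical helper: builds the CALL statement for Iceberg's
# create_changelog_view procedure. The catalog name is an assumption.
def build_changelog_view_sql(catalog, table, view, identifier_columns):
    cols = ", ".join(f"'{c}'" for c in identifier_columns)
    return (
        f"CALL {catalog}.system.create_changelog_view("
        f"table => '{table}', "
        f"changelog_view => '{view}', "
        f"identifier_columns => array({cols}))"
    )


clv_sql = build_changelog_view_sql(
    "glue_catalog", "iceberg_history_db.products", "products_clv", ["product_id"]
)
print(clv_sql)
```

Passing product_id as the identifier column lets Iceberg pair a removed row and an added row with the same key into UPDATE_BEFORE and UPDATE_AFTER entries, as in the output above.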

Using Iceberg’s change log view, you can obtain the history of a given record directly from the Iceberg table’s history, without needing to create a separate table for managing record history. Next, you implement Slowly Changing Dimension (SCD) Type-2 using the change log view.

Implement SCD Type-2 with Iceberg’s change log view

The SCD Type-2 based table retains the full history of record changes and it can be used in multiple cases such as historical tracking, point-in-time analysis, regulatory compliance, and so on. In this section, you implement SCD Type-2 using the change log view (products_clv) that was created in the previous section. The change log view has a schema that’s similar to the schema defined in the SCD Type-2 specifications. For this change log view, you add effective_start, effective_end, and is_current columns. To add these columns and then implement SCD Type-2, complete the following steps.

  1. Run the following query to implement SCD Type-2. In the WITH AS (...) section of the query, the change log view is joined with the Iceberg table snapshots on the snapshot_id key to include the commit time for each record change. You can obtain the table snapshots by querying db.table.snapshots. The rest of the query identifies current and non-current entries by comparing the commit times for each product. It then sets the effective time for each product and marks whether a product is current based on the effective time and the change type from the change log view.
WITH clv_snapshots AS (
    SELECT
        clv.*,
        s.snapshot_id,
        s.committed_at,
        s.committed_at as effective_start
    FROM products_clv clv
    JOIN iceberg_history_db.products.snapshots s
    ON clv._commit_snapshot_id = s.snapshot_id
) 
SELECT
    product_id, 
    product_name, 
    price, 
    category, 
    updated_at,
    effective_start,
    CASE
        WHEN effective_start != l_part_committed_at 
            OR _change_type = 'UPDATE_BEFORE' THEN l_part_committed_at
        ELSE CAST(null as timestamp)
    END as effective_end,
    CASE
        WHEN effective_start != l_part_committed_at
            OR _change_type = 'UPDATE_BEFORE' 
            OR _change_type = 'DELETE' THEN CAST(false as boolean)
        ELSE CAST(true as boolean)
    END as is_current
FROM (SELECT *, MAX(committed_at) OVER (PARTITION BY product_id, updated_at) as l_part_committed_at FROM clv_snapshots)
WHERE _change_type != 'UPDATE_BEFORE'
ORDER BY product_id,  _change_ordinal
  1. The query result shows the SCD Type-2 based schema and records.

7-output

After the query result is displayed, this SCD Type-2 based table is stored as scdt2 to allow access for further analysis.
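To make the effective-time logic easier to follow, here is a simplified plain-Python restatement. It is a different formulation from the query: each change row is treated as valid from its commit time until the next change to the same product_id, instead of using the MAX(...) OVER window. The to_scd2 function and the sample rows are hypothetical.

```python
from collections import defaultdict


# Simplified restatement: a row is valid from its commit time until the next
# change to the same product_id. This is NOT the window logic of the SQL query.
def to_scd2(changes):
    by_product = defaultdict(list)
    for row in changes:
        if row["_change_type"] != "UPDATE_BEFORE":  # same filter as the query
            by_product[row["product_id"]].append(row)

    result = []
    for product_id in sorted(by_product):
        rows = sorted(by_product[product_id], key=lambda r: r["committed_at"])
        for i, row in enumerate(rows):
            nxt = rows[i + 1]["committed_at"] if i + 1 < len(rows) else None
            result.append({
                "product_id": product_id,
                "price": row["price"],
                "effective_start": row["committed_at"],
                "effective_end": nxt,
                # A DELETE row closes the history, so it is never current.
                "is_current": nxt is None and row["_change_type"] != "DELETE",
            })
    return result


# Hypothetical change-log rows; the timestamp strings sort chronologically.
T1, T2 = "2024-10-24 17:18", "2024-10-24 17:19"
changes = [
    {"product_id": "00001", "price": 250, "_change_type": "INSERT", "committed_at": T1},
    {"product_id": "00001", "price": 250, "_change_type": "UPDATE_BEFORE", "committed_at": T2},
    {"product_id": "00001", "price": 500, "_change_type": "UPDATE_AFTER", "committed_at": T2},
    {"product_id": "00003", "price": 600, "_change_type": "INSERT", "committed_at": T1},
    {"product_id": "00003", "price": 600, "_change_type": "DELETE", "committed_at": T2},
]
scd2_rows = to_scd2(changes)
for row in scd2_rows:
    print(row)
```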

SCD Type-2 is useful for many use cases. To explore how this SCD Type-2 implementation can be used to track the history of table records, run the following example queries.

  1. Run the following query to retrieve deleted or updated records in a specific period. This query captures which records were changed during that timeframe, allowing you to audit changes for use cases such as trend analysis and regulatory compliance checks. Before running the query, replace <START_DATETIME> and <END_DATETIME> with the start and end of the period, such as 2024-10-24 17:18:00 and 2024-10-24 17:20:00.
SELECT product_id, product_name, price, category, updated_at, effective_start, effective_end, is_current 
FROM scdt2 WHERE product_id IN ( SELECT product_id FROM scdt2 
WHERE (_change_type = 'DELETE' or _change_type = 'UPDATE_AFTER') 
AND effective_start BETWEEN '<START_DATETIME>' AND '<END_DATETIME>') 
ORDER BY product_id, effective_start
  1. The query result shows the deleted and updated records in the specified period. You can confirm that the price of Heater was updated and Television was deleted from the table.
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
|product_id|product_name|price|   category|          updated_at|     effective_start|       effective_end|is_current|
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|2024-10-24 17:18:...|2024-10-24 17:19:...|     false|
|     00001|      Heater|  500|Electronics|1.7297903836233025E9|2024-10-24 17:19:...|                null|      true|
|     00003|  Television|  600|Electronics|1.7297902833360643E9|2024-10-24 17:18:...|2024-10-24 17:19:...|     false|
|     00003|  Television|  600|Electronics|1.7297902833360643E9|2024-10-24 17:19:...|                null|     false|
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
  1. As another example, run the following query to retrieve the current records from the SCD Type-2 table by filtering with is_current = true, which is useful for current-state reporting.
SELECT product_id, product_name, price, category, updated_at
FROM scdt2 WHERE is_current = true ORDER BY product_id
  1. The query result shows the current table records, reflecting the updated price of Heater, the deletion of Television, and the addition of Chair after the initial records.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  500|Electronics|1.7297903836233025E9|
|     00002|  Thermostat|  400|Electronics|1.7297902833360643E9|
|     00004|     Blender|  100|Electronics|1.7297902833360643E9|
|     00005| USB charger|   50|Electronics|1.7297902833360643E9|
|     00006|       Chair|   50|  Furniture|1.7297903836233025E9|
+----------+------------+-----+-----------+--------------------+

You have now successfully implemented SCD Type-2 using the change log view. This SCD Type-2 implementation allows you to track the history of table records. For example, you can use it to search for updated or deleted products, such as Heater and Television, in a specific period. Additionally, you can retrieve the current table records by querying the SCD Type-2 table with is_current = true. Using Iceberg’s change log view enables you to implement SCD Type-2 without making any changes to the Iceberg table itself. It also eliminates the need for creating or managing an additional table for SCD Type-2.

Clean up

To clean up the resources used in this post, complete the following steps:

  1. Open the Amazon S3 console.
  2. Select the S3 bucket aws-glue-assets-<ACCOUNT_ID>-<REGION> where the Notebook file (iceberg_history.ipynb) is stored. Delete the Notebook file that’s in the notebook path.
  3. Select the S3 bucket you created using the CloudFormation template. You can obtain the bucket name from the IcebergS3Bucket key on the CloudFormation Outputs tab. After selecting the bucket, choose Empty to delete all objects.
  4. After you confirm the bucket is empty, delete the CloudFormation stack iceberg-history-baseline-resources.

Considerations

Here are important considerations:

Conclusion

In this post, we explored how to look up the history of records and tables using Apache Iceberg. The instructions demonstrated how to use the change log view to look up the history of records, and how to build the history of tables with SCD Type-2 on top of it. With this method, you can manage the history of records and tables without maintaining additional tables.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan, and is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Enforce fine-grained access control on data lake tables using AWS Glue 5.0 integrated with AWS Lake Formation

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/enforce-fine-grained-access-control-on-data-lake-tables-using-aws-glue-5-0-integrated-with-aws-lake-formation/

AWS Glue 5.0 supports fine-grained access control (FGAC) based on your policies defined in AWS Lake Formation. FGAC enables you to granularly control access to your data lake resources at the table, column, and row levels. This level of control is essential for organizations that need to comply with data governance and security regulations, or those that deal with sensitive data.

Lake Formation makes it straightforward to build, secure, and manage data lakes. It allows you to define fine-grained access controls through grant and revoke statements, similar to those used with relational database management systems (RDBMS), and automatically enforce those policies using compatible engines like Amazon Athena, Apache Spark on Amazon EMR, and Amazon Redshift Spectrum. With AWS Glue 5.0, the same Lake Formation rules that you set up for use with other services like Athena now apply to your AWS Glue Spark jobs and Interactive Sessions through built-in Spark SQL and Spark DataFrames. This simplifies security and governance of your data lakes.

This post demonstrates how to enforce FGAC on AWS Glue 5.0 through Lake Formation permissions.

How FGAC works on AWS Glue 5.0

Using AWS Glue 5.0 with Lake Formation lets you enforce a layer of permissions on each Spark job to apply Lake Formation permissions control when AWS Glue runs jobs. AWS Glue uses Spark resource profiles to create two profiles to effectively run jobs. The user profile runs user-supplied code, and the system profile enforces Lake Formation policies. For more information, see the AWS Lake Formation Developer Guide.

The following diagram demonstrates a high-level overview of how AWS Glue 5.0 gets access to data protected by Lake Formation permissions.

The workflow consists of the following steps:

  1. A user calls the StartJobRun API on a Lake Formation enabled AWS Glue job.
  2. AWS Glue sends the job to a user driver and runs the job in the user profile. The user driver runs a lean version of Spark that has no ability to launch tasks, request executors, or access Amazon Simple Storage Service (Amazon S3) or the AWS Glue Data Catalog. It builds a job plan.
  3. AWS Glue sets up a second driver called the system driver and runs it in the system profile (with a privileged identity). AWS Glue sets up an encrypted TLS channel between the two drivers for communication. The user driver uses the channel to send the job plan to the system driver. The system driver doesn’t run user-submitted code. It runs full Spark and communicates with Amazon S3 and the Data Catalog for data access. It requests executors and compiles the job plan into a sequence of execution stages.
  4. AWS Glue then runs the stages on executors with the user driver or system driver. The user code in any stage is run exclusively on user profile executors.
  5. Stages that read data from Data Catalog tables protected by Lake Formation or those that apply security filters are delegated to system executors.

Enable FGAC on AWS Glue 5.0

To enable Lake Formation FGAC for your AWS Glue 5.0 jobs on the AWS Glue console, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose your job.
  3. Choose the Job details tab.
  4. For Glue version, choose Glue 5.0 – Supports spark 3.5, Scala 2, Python 3.
  5. For Job parameters, add the following parameter:
    1. Key: --enable-lakeformation-fine-grained-access
    2. Value: true
  6. Choose Save.

To enable Lake Formation FGAC for your AWS Glue notebooks on the AWS Glue console, use %%configure magic:

%glue_version 5.0
%%configure
{
    "--enable-lakeformation-fine-grained-access": "true"
}
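
If you create jobs programmatically instead of on the console, the same job parameter can be passed as a default argument. The following is a minimal sketch, assuming the boto3 AWS Glue client and its create_job call; the helper names, role ARN, script location, worker type, and worker count are placeholders for illustration and are not part of this walkthrough.

```python
FGAC_FLAG = "--enable-lakeformation-fine-grained-access"


def build_fgac_job_request(job_name, role_arn, script_location, extra_args=None):
    """Return keyword arguments for glue.create_job with FGAC turned on."""
    default_args = {FGAC_FLAG: "true"}
    default_args.update(extra_args or {})
    return {
        "Name": job_name,
        "Role": role_arn,
        "GlueVersion": "5.0",
        "Command": {
            "Name": "glueetl",  # Spark ETL job
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "DefaultArguments": default_args,
        # Illustrative sizing only; choose values that fit your workload.
        "WorkerType": "G.1X",
        "NumberOfWorkers": 4,
    }


def create_fgac_job(request):
    """Send the request to AWS Glue (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder works without boto3

    return boto3.client("glue").create_job(**request)


request = build_fgac_job_request(
    job_name="glue5-lf-demo",
    role_arn="arn:aws:iam::123456789012:role/GlueJobRole",  # placeholder
    script_location="s3://<bucket-name>/scripts/job.py",  # placeholder
)
print(request["DefaultArguments"])
```

Keeping the request builder separate from the API call makes it possible to review the parameters before anything is created in your account.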

Example use case

The following diagram represents the high-level architecture of the use case we demonstrate in this post. The objective of the use case is to showcase how you can enforce Lake Formation FGAC on both CSV and Iceberg tables and configure an AWS Glue PySpark job to read from them.

The implementation consists of the following steps:

  1. Create an S3 bucket and upload the input CSV dataset.
  2. Create a standard Data Catalog table and an Iceberg table by reading data from the input CSV table, using an Athena CTAS query.
  3. Use Lake Formation to enable FGAC on both CSV and Iceberg tables using row- and column-based filters.
  4. Run two sample AWS Glue jobs to showcase how you can run a sample PySpark script in AWS Glue that respects the Lake Formation FGAC permissions, and then write the output to Amazon S3.

To demonstrate the implementation steps, we use sample product inventory data that has the following attributes:

  • op – The operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes.
  • product_id – The primary key column in the source database’s products table.
  • category – The product’s category, such as Electronics or Cosmetics.
  • product_name – The name of the product.
  • quantity_available – The quantity available in the inventory for a product.
  • last_update_time – The time when the product record was updated at the source database.

To implement this workflow, we create AWS resources such as an S3 bucket, define FGAC with Lake Formation, and build AWS Glue jobs to query those tables.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account with AWS Identity and Access Management (IAM) roles as needed.
  • The required permissions to perform the following actions:
    • Read or write to an S3 bucket.
    • Create and run AWS Glue crawlers and jobs.
    • Manage Data Catalog databases and tables.
    • Manage Athena workgroups and run queries.
  • Lake Formation already set up in the account and a Lake Formation administrator role or a similar role to follow along with the instructions in this post. To learn more about setting up permissions for a data lake administrator role, see Create a data lake administrator.

For this post, we use the eu-west-1 AWS Region, but you can integrate it in your preferred Region if the AWS services included in the architecture are available in that Region.

Next, let’s dive into the implementation steps.

Create an S3 bucket

To create an S3 bucket for the raw input datasets and Iceberg table, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Enter the bucket name (for example, glue5-lf-demo-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE}), and leave the remaining fields as default.
  4. Choose Create bucket.
  5. On the bucket details page, choose Create folder.
  6. Create two subfolders: raw-csv-input and iceberg-datalake.
  7. Upload the LOAD00000001.csv file into the raw-csv-input folder of the bucket.

Create tables

To create input and output tables in the Data Catalog, complete the following steps:

  1. On the Athena console, navigate to the query editor.
  2. Run the following queries in sequence (provide your S3 bucket name):
    -- Create database for the demo
    CREATE DATABASE glue5_lf_demo;
    
    -- Create external table in input CSV files. Replace the S3 path with your bucket name
    CREATE EXTERNAL TABLE glue5_lf_demo.raw_csv_input(
     op string, 
     product_id bigint, 
     category string, 
     product_name string, 
     quantity_available bigint, 
     last_update_time string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<bucket-name>/raw-csv-input/'
    TBLPROPERTIES (
      'areColumnsQuoted'='false', 
      'classification'='csv', 
      'columnsOrdered'='true', 
      'compressionType'='none', 
      'delimiter'=',', 
      'typeOfData'='file');
     
    -- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
    CREATE TABLE glue5_lf_demo.iceberg_datalake WITH (
      table_type='ICEBERG',
      format='parquet',
      write_compression = 'SNAPPY',
      is_external = false,
      partitioning=ARRAY['category', 'bucket(product_id, 16)'],
      location='s3://<bucket-name>/iceberg-datalake/'
    ) AS SELECT * FROM glue5_lf_demo.raw_csv_input;

  3. Run the following query to validate the raw CSV input data:
    SELECT * FROM glue5_lf_demo.raw_csv_input;

The following screenshot shows the query result.

  4. Run the following query to validate the Iceberg table data:
    SELECT * FROM glue5_lf_demo.iceberg_datalake;

The following screenshot shows the query result.

This step used DDL to create table definitions. Alternatively, you can use a Data Catalog API, the AWS Glue console, the Lake Formation console, or an AWS Glue crawler.

Next, let’s configure Lake Formation permissions on the raw_csv_input table and iceberg_datalake table.

Configure Lake Formation permissions

To validate the capability, let’s define FGAC permissions for the two Data Catalog tables we created.

For the raw_csv_input table, we enable permission for specific rows, for example, allowing read access only for the Furniture category. Similarly, for the iceberg_datalake table, we enable a data filter for the Electronics product category and limit read access to a few columns only.

To configure Lake Formation permissions for the two tables, complete the following steps:

  1. On the Lake Formation console, choose Data lake locations under Administration in the navigation pane.
  2. Choose Register location.
  3. For Amazon S3 path, enter the path of your S3 bucket to register the location.
  4. For IAM role, choose your Lake Formation data access IAM role, which is not a service linked role.
  5. For Permission mode, select Lake Formation.
  6. Choose Register location.
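
The registration can also be scripted. The following sketch assumes the boto3 Lake Formation client and its register_resource call; the helper names and the role ARN are hypothetical, and HybridAccessEnabled=False is our reading of the Lake Formation permission mode selected on the console.

```python
def build_register_request(bucket_name, data_access_role_arn):
    """Return keyword arguments for lakeformation.register_resource."""
    return {
        "ResourceArn": f"arn:aws:s3:::{bucket_name}",
        # Use a custom data access role rather than the service-linked role.
        "UseServiceLinkedRole": False,
        "RoleArn": data_access_role_arn,
        # Assumption: False corresponds to the "Lake Formation" permission mode.
        "HybridAccessEnabled": False,
    }


def register_location(request):
    """Register the location (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder works without boto3

    return boto3.client("lakeformation").register_resource(**request)


register_request = build_register_request(
    "glue5-lf-demo-123456789012-euw1",  # placeholder bucket name
    "arn:aws:iam::123456789012:role/LFDataAccessRole",  # placeholder
)
print(register_request["ResourceArn"])
```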

Grant table permissions on the standard table

The next step is to grant table permissions on the raw_csv_input table to the AWS Glue job role.

  1. On the Lake Formation console, choose Data lake permissions under Permissions in the navigation pane.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles.
  4. For IAM users and roles, choose your IAM role that is going to be used on an AWS Glue job.
  5. For LF-Tags or catalog resources, choose Named Data Catalog resources.
  6. For Databases, choose glue5_lf_demo.
  7. For Tables, choose raw_csv_input.
  8. For Data filters, choose Create new.
  9. In the Create data filter dialog, provide the following information:
    1. For Data filter name, enter product_furniture.
    2. For Column-level access, select Access to all columns.
    3. Select Filter rows.
    4. For Row filter expression, enter category='Furniture'.
    5. Choose Create filter.
  10. For Data filters, select the filter product_furniture you created.
  11. For Data filter permissions, choose Select and Describe.
  12. Choose Grant.

Grant permissions on the Iceberg table

The next step is to grant table permissions on the iceberg_datalake table to the AWS Glue job role.

  1. On the Lake Formation console, choose Data lake permissions under Permissions in the navigation pane.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles.
  4. For IAM users and roles, choose your IAM role that is going to be used on an AWS Glue job.
  5. For LF-Tags or catalog resources, choose Named Data Catalog resources.
  6. For Databases, choose glue5_lf_demo.
  7. For Tables, choose iceberg_datalake.
  8. For Data filters, choose Create new.
  9. In the Create data filter dialog, provide the following information:
    1. For Data filter name, enter product_electronics.
    2. For Column-level access, select Include columns.
    3. For Included columns, choose category, last_update_time, op, product_name, and quantity_available.
    4. Choose Filter rows.
    5. For Row filter expression, enter category='Electronics'.
    6. Choose Create filter.
  10. For Data filters, select the filter product_electronics you created.
  11. For Data filter permissions, choose Select and Describe.
  12. Choose Grant.
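
The two data filters and their grants can also be expressed as API requests. The following is a sketch under stated assumptions: it builds payloads for the boto3 Lake Formation calls create_data_cells_filter and grant_permissions, the helper names, account ID, and role ARN are placeholders, and the permission list mirrors the Select and Describe choices made on the console.

```python
def build_filter_request(catalog_id, database, table, name, row_expr, columns=None):
    """Return keyword arguments for lakeformation.create_data_cells_filter."""
    table_data = {
        "TableCatalogId": catalog_id,
        "DatabaseName": database,
        "TableName": table,
        "Name": name,
        "RowFilter": {"FilterExpression": row_expr},
    }
    if columns is None:
        # Access to all columns: a wildcard with nothing excluded.
        table_data["ColumnWildcard"] = {"ExcludedColumnNames": []}
    else:
        table_data["ColumnNames"] = list(columns)
    return {"TableData": table_data}


def build_grant_request(role_arn, filter_request):
    """Return keyword arguments for lakeformation.grant_permissions."""
    data = filter_request["TableData"]
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "DataCellsFilter": {
                "TableCatalogId": data["TableCatalogId"],
                "DatabaseName": data["DatabaseName"],
                "TableName": data["TableName"],
                "Name": data["Name"],
            }
        },
        "Permissions": ["SELECT", "DESCRIBE"],
    }


ACCOUNT_ID = "123456789012"  # placeholder
JOB_ROLE = "arn:aws:iam::123456789012:role/GlueJobRole"  # placeholder

furniture_filter = build_filter_request(
    ACCOUNT_ID, "glue5_lf_demo", "raw_csv_input",
    "product_furniture", "category='Furniture'",
)
electronics_filter = build_filter_request(
    ACCOUNT_ID, "glue5_lf_demo", "iceberg_datalake",
    "product_electronics", "category='Electronics'",
    columns=["category", "last_update_time", "op",
             "product_name", "quantity_available"],
)
grants = [build_grant_request(JOB_ROLE, f)
          for f in (furniture_filter, electronics_filter)]
print([g["Resource"]["DataCellsFilter"]["Name"] for g in grants])
```

To apply these, you would pass each filter request to create_data_cells_filter and each grant request to grant_permissions on a boto3 lakeformation client.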

Next, let’s create the AWS Glue PySpark job to process the input data.

Query the standard table through an AWS Glue 5.0 job

Complete the following steps to create an AWS Glue job to load data from the raw_csv_input table:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. For Create job, choose Script Editor.
  3. For Engine, choose Spark.
  4. For Options, choose Start fresh.
  5. Choose Create script.
  6. For Script, use the following code, providing your S3 output path. This example script writes the output in Parquet format; you can change this according to your use case.
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    # Read from raw CSV table
    df = spark.sql("SELECT * FROM glue5_lf_demo.raw_csv_input")
    df.show()
    
    # Write to your preferred location.
    df.write.mode("overwrite").parquet("s3://<s3_output_path>")

  7. On the Job details tab, for Name, enter glue5-lf-demo.
  8. For IAM Role, assign an IAM role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 5.0 – Supports spark 3.5, Scala 2, Python 3.
  10. For Job parameters, add the following parameter:
    1. Key: --enable-lakeformation-fine-grained-access
    2. Value: true
  11. Choose Save and then Run.
  12. When the job is complete, on the Run details tab at the bottom of job runs, choose Output logs.

You’re redirected to the Amazon CloudWatch console to validate the output.

The printed table is shown in the following screenshot. Only two records were returned because they are Furniture category products.

Query the Iceberg table through an AWS Glue 5.0 job

Next, complete the following steps to create an AWS Glue job to load data from the iceberg_datalake table:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. For Create job, choose Script Editor.
  3. For Engine, choose Spark.
  4. For Options, choose Start fresh.
  5. Choose Create script.
  6. For Script, use the code shown after this list, replacing the following parameters:
    1. Replace aws_region with your Region.
    2. Replace aws_account_id with your AWS account ID.
    3. Replace warehouse_path with your S3 warehouse path for the Iceberg table.
    4. Replace <s3_output_path> with your S3 output path.

This example script writes the output in Parquet format; you can change it according to your use case.

from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "spark_catalog"
aws_region = "eu-west-1"
aws_account_id = "123456789012"
warehouse_path = "s3://<bucket-name>/warehouse"

# Create Spark Session with Iceberg Configurations
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{catalog_name}.client.region", f"{aws_region}") \
    .config(f"spark.sql.catalog.{catalog_name}.glue.account-id", f"{aws_account_id}") \
    .getOrCreate()

# Read from Iceberg table
df = spark.sql(f"SELECT * FROM {catalog_name}.glue5_lf_demo.iceberg_datalake")
df.show()

# Write to your preferred location.
df.write.mode("overwrite").parquet("s3://<s3_output_path>")

  7. On the Job details tab, for Name, enter glue5-lf-demo-iceberg.
  8. For IAM Role, assign an IAM role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 5.0 – Supports spark 3.5, Scala 2, Python 3.
  10. For Job parameters, add the following parameters:
    1. Key: --enable-lakeformation-fine-grained-access
    2. Value: true
    3. Key: --datalake-formats
    4. Value: iceberg
  11. Choose Save and then Run.
  12. When the job is complete, on the Run details tab, choose Output logs.

You’re redirected to the CloudWatch console to validate the output.

The printed table is shown in the following screenshot. Only two records were returned because they are Electronics category products, and the product_id column is excluded.

You are now able to verify that records of the table raw_csv_input and the table iceberg_datalake are successfully retrieved with configured Lake Formation data cell filters.
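
To make the effect of the two data filters concrete, the following plain Python sketch emulates them on a handful of made-up rows. The rows are hypothetical and are not the contents of LOAD00000001.csv, and the function only imitates the outcome; in AWS Glue, the filtering is enforced by the system profile rather than by user code.

```python
# Hypothetical sample rows, invented for illustration only.
SAMPLE_ROWS = [
    {"op": "I", "product_id": 1, "category": "Furniture",
     "product_name": "Desk", "quantity_available": 5,
     "last_update_time": "2024-01-01 10:00:00"},
    {"op": "I", "product_id": 2, "category": "Furniture",
     "product_name": "Chair", "quantity_available": 12,
     "last_update_time": "2024-01-01 10:05:00"},
    {"op": "I", "product_id": 3, "category": "Electronics",
     "product_name": "Laptop", "quantity_available": 7,
     "last_update_time": "2024-01-01 10:10:00"},
    {"op": "U", "product_id": 4, "category": "Electronics",
     "product_name": "Phone", "quantity_available": 20,
     "last_update_time": "2024-01-01 10:15:00"},
    {"op": "I", "product_id": 5, "category": "Cosmetics",
     "product_name": "Lipstick", "quantity_available": 30,
     "last_update_time": "2024-01-01 10:20:00"},
]


def apply_data_filter(rows, row_predicate, included_columns=None):
    """Keep rows matching the predicate, projected to the allowed columns."""
    result = []
    for row in rows:
        if not row_predicate(row):
            continue
        columns = included_columns if included_columns is not None else list(row)
        result.append({name: row[name] for name in columns})
    return result


# product_furniture: all columns, rows where category = 'Furniture'
furniture = apply_data_filter(
    SAMPLE_ROWS, lambda r: r["category"] == "Furniture")

# product_electronics: five included columns, rows where category = 'Electronics'
electronics = apply_data_filter(
    SAMPLE_ROWS,
    lambda r: r["category"] == "Electronics",
    included_columns=["category", "last_update_time", "op",
                      "product_name", "quantity_available"],
)

print(len(furniture), len(electronics))
```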

Clean up

Complete the following steps to clean up your resources:

  1. Delete the AWS Glue jobs glue5-lf-demo and glue5-lf-demo-iceberg.
  2. Delete the Lake Formation permissions.
  3. Delete the output files written to the S3 bucket.
  4. Delete the bucket you created for the input datasets, which might have a name similar to glue5-lf-demo-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE}.

Conclusion

This post explained how you can enable Lake Formation FGAC in AWS Glue jobs and notebooks that will enforce access control defined using Lake Formation grant commands. Previously, you needed to integrate AWS Glue DynamicFrames to enforce FGAC in AWS Glue jobs, but with this release, you can enforce FGAC through Spark DataFrame or Spark SQL. This capability works not only with standard file formats like CSV, JSON, and Parquet but also with Apache Iceberg.

This feature can save you effort and encourage portability while migrating Spark scripts to different serverless environments such as AWS Glue and Amazon EMR.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define end-to-end data strategies, including data security, accessibility, governance, and more. He is also the author of Simplify Big Data Analytics with Amazon EMR and AWS Certified Data Engineer Study Guide. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family. He can be reached via LinkedIn.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is also the author of the book Serverless ETL and Analytics with AWS Glue. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Layth Yassin is a Software Development Engineer on the AWS Glue team. He’s passionate about tackling challenging problems at a large scale, and building products that push the limits of the field. Outside of work, he enjoys playing/watching basketball, and spending time with friends and family.

Use open table format libraries on AWS Glue 5.0 for Apache Spark

Post Syndicated from Sotaro Hikita original https://aws.amazon.com/blogs/big-data/use-open-table-format-libraries-on-aws-glue-5-0-for-apache-spark/

Open table formats are emerging in the rapidly evolving domain of big data management, fundamentally altering the landscape of data storage and analysis. These formats, exemplified by Apache Iceberg, Apache Hudi, and Delta Lake, address persistent challenges in traditional data lake structures by offering an advanced combination of flexibility, performance, and governance capabilities. By providing a standardized framework for data representation, open table formats break down data silos, enhance data quality, and accelerate analytics at scale.

As organizations grapple with exponential data growth and increasingly complex analytical requirements, these formats are transitioning from optional enhancements to essential components of competitive data strategies. Their ability to resolve critical issues such as data consistency, query efficiency, and governance renders them indispensable for data-driven organizations. The adoption of open table formats is a crucial consideration for organizations looking to optimize their data management practices and extract maximum value from their data.

In earlier posts, we discussed AWS Glue 5.0 for Apache Spark. In this post, we highlight notable updates on Iceberg, Hudi, and Delta Lake in AWS Glue 5.0.

Apache Iceberg highlights

AWS Glue 5.0 supports Iceberg 1.6.1. We highlight its notable updates in this section. For more details, refer to Iceberg Release 1.6.1.

Branching

Branches are independent lineages of snapshot history, each pointing to the head of its lineage. They are useful for flexible data lifecycle management. An Iceberg table’s metadata stores a history of snapshots, which are updated with each transaction. Iceberg implements features such as table versioning and concurrency control through the lineage of these snapshots. To expand an Iceberg table’s lifecycle management, you can define branches that stem from other branches. Each branch has an independent snapshot lifecycle, allowing separate referencing and updating.

When an Iceberg table is created, it has only a main branch, which is created implicitly. All transactions are initially written to this branch. You can create additional branches, such as an audit branch, and configure engines to write to them. Changes on one branch can be fast-forwarded to another branch using Spark’s fast_forward procedure.

The following diagram illustrates this setup.

To create a new branch, use the following query:

ALTER TABLE glue_catalog.<database_name>.<table_name> CREATE BRANCH <branch_name>;

After creating a branch, you can run queries on the data in the branch by specifying branch_<branch_name>. To write data to a specific branch, use the following query:

INSERT INTO glue_catalog.<database_name>.<table_name>.branch_<branch_name>
    VALUES (1, 'a'), (2, 'b');

To query a specific branch, use the following query:

SELECT * FROM glue_catalog.<database_name>.<table_name>.branch_<branch_name>;

You can run the fast_forward procedure to publish the sample table data from the audit branch into the main branch using the following query:

CALL glue_catalog.system.fast_forward(
    table => 'db.table',
    branch => 'main',
    to => 'audit')
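
The branch statements above can be chained into a simple audit-then-publish flow. The following PySpark-oriented sketch only assembles the SQL strings shown in this section; the helper names, table name, and branch name are hypothetical, and the statements run only when you pass them to a SparkSession that is configured with the Iceberg extensions and a glue_catalog catalog.

```python
def branch_workflow_sql(table, branch, catalog="glue_catalog"):
    """Return the statements for a create, write, read, publish branch flow.

    `table` is the database-qualified name, for example "db.sample".
    """
    full_name = f"{catalog}.{table}"
    return [
        # 1. Create the branch from the current table state.
        f"ALTER TABLE {full_name} CREATE BRANCH {branch}",
        # 2. Write to the branch only; main is left untouched.
        f"INSERT INTO {full_name}.branch_{branch} VALUES (1, 'a'), (2, 'b')",
        # 3. Inspect the branch before publishing.
        f"SELECT * FROM {full_name}.branch_{branch}",
        # 4. Publish by fast-forwarding main to the branch head.
        f"CALL {catalog}.system.fast_forward("
        f"table => '{table}', branch => 'main', to => '{branch}')",
    ]


def run_statements(spark, statements):
    """Run each statement on an existing SparkSession."""
    return [spark.sql(statement) for statement in statements]


statements = branch_workflow_sql("db.sample", "audit")
for statement in statements:
    print(statement)
```

Writing to a branch first lets you validate new data in isolation and publish it to main only after the checks pass.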

Tagging

Tags are logical pointers to specific snapshot IDs, useful for managing important historical snapshots for business purposes. In Iceberg tables, new snapshots are created for each transaction, and you can query historical snapshots using time travel queries by specifying either a snapshot ID or timestamp. However, because snapshots are created for every transaction, it can be challenging to distinguish the important ones. Tags help address this by allowing you to point to specific snapshots with arbitrary names.

For example, you can set an event tag on snapshot 2 with the following code:

ALTER TABLE glue_catalog.db.sample CREATE TAG `event` AS OF VERSION 2

You can query the tagged snapshot by using the following code:

SELECT * FROM glue_catalog.<database_name>.<table_name>.tag_<tagname>;

Lifecycle management with branching and tagging

Branching and tagging are useful for flexible table maintenance with the independent snapshot lifecycle management configuration. When data changes in an Iceberg table, each modification is preserved as a new snapshot. Over time, this creates multiple data files and metadata files as changes accumulate. Although these files are essential for Iceberg features like time travel queries, maintaining too many snapshots can increase storage costs. Additionally, they can impact query performance due to the overhead of handling large amounts of metadata. Therefore, organizations should plan regular deletion for snapshots no longer needed.

The AWS Glue Data Catalog addresses these challenges through its managed storage optimization feature. Its optimization job automatically deletes snapshots based on two configurable parameters: the number of snapshots to retain and the maximum days to keep snapshots. Importantly, you can set independent lifecycle policies for both branches and tagged snapshots.

For branches, you can control the maximum days to keep the snapshot and the minimum number of snapshots that must be retained, even if they’re older than the maximum age limit. This setting is independent for each branch.

For example, to keep snapshots for 7 days and keep at least 10 snapshots, run the following query:

ALTER TABLE glue_catalog.db.sample CREATE BRANCH audit WITH SNAPSHOT RETENTION 10 SNAPSHOTS 7 DAYS

Tags act as permanent references to specific snapshots of your data. Without setting an expiration time, tagged snapshots persist indefinitely and prevent optimization jobs from cleaning up the associated data files. You can set a time limit for how long to keep a reference when you create it.

For example, to keep snapshots tagged with event for 360 days, run the following query:

ALTER TABLE glue_catalog.db.sample CREATE TAG event RETAIN 360 DAYS

This combination of branching and tagging capabilities enables flexible snapshot lifecycle management that can accommodate various business requirements and use cases. For more information about the Data Catalog’s automated storage optimization feature, refer to The AWS Glue Data Catalog now supports storage optimization of Apache Iceberg tables.

Change log view

The create_changelog_view Spark procedure helps track table modifications by generating a comprehensive change history view. It captures all data alterations, from inserts to updates and deletions. This makes it simple to analyze how your data has evolved and audit changes over time.

The change log view created by the create_changelog_view procedure contains all the information about changes, including the modified record content, type of operation performed, order of changes, and the snapshot ID where the change was committed. In addition, it can show the original and modified versions of records by passing designated key columns. These selected columns typically serve as distinct identifiers or primary keys that uniquely identify each record. See the following code:

CALL glue_catalog.system.create_changelog_view(
    table => 'db.test',
    identifier_columns => array('id')
)

By running the procedure, the change log view test_changes is created. When you query the change log view using SELECT * FROM test_changes, you can obtain the following output, which includes the history of record changes in the Iceberg table.

The create_changelog_view procedure helps you monitor and understand data changes. This feature proves valuable for many use cases, including change data capture (CDC), monitoring audit records, and live analysis.
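
As a rough illustration of how the procedure might be used from PySpark, the following sketch builds the CALL statement and a follow-up query that isolates updates. The helper names are hypothetical. The metadata column names (_change_type, _change_ordinal) and the UPDATE_BEFORE and UPDATE_AFTER values follow the Iceberg documentation for change log views and are not taken from this post, so verify them against your Iceberg version.

```python
def create_changelog_view_sql(table, identifier_columns, catalog="glue_catalog"):
    """Build the CALL statement for the create_changelog_view procedure."""
    columns = ", ".join(f"'{name}'" for name in identifier_columns)
    return (
        f"CALL {catalog}.system.create_changelog_view(\n"
        f"    table => '{table}',\n"
        f"    identifier_columns => array({columns})\n"
        f")"
    )


def updates_only_sql(view_name):
    """Select the before and after images of updated records, in commit order."""
    return (
        f"SELECT * FROM {view_name} "
        "WHERE _change_type IN ('UPDATE_BEFORE', 'UPDATE_AFTER') "
        "ORDER BY _change_ordinal"
    )


call_sql = create_changelog_view_sql("db.test", ["id"])
query_sql = updates_only_sql("test_changes")
print(call_sql)
print(query_sql)
```

In a job, you would pass call_sql to spark.sql first and then run query_sql against the resulting view.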

Storage partitioned join

Storage partitioned join is a join optimization technique provided by Iceberg, which enhances both read and write performance. This feature uses existing storage layout to eliminate expensive data shuffles, and significantly improves query performance when joining large datasets that share compatible partitioning schemes. It operates by taking advantage of the physical organization of data on disk. When both datasets are partitioned using a compatible layout, Spark can perform join operations locally by directly reading matching partitions, completely avoiding the need for data shuffling.

To enable and optimize storage partitioned joins, you need to set the following Spark config properties through SparkConf or an AWS Glue job parameter. The following code lists the properties for the Spark config:

spark.sql.sources.v2.bucketing.enabled=true
spark.sql.sources.v2.bucketing.pushPartValues.enabled=true
spark.sql.requireAllClusterKeysForCoPartition=false
spark.sql.adaptive.enabled=false
spark.sql.adaptive.autoBroadcastJoinThreshold=-1
spark.sql.iceberg.planning.preserve-data-grouping=true

To use an AWS Glue job parameter, set the following:

  • Key: --conf
  • Value: spark.sql.sources.v2.bucketing.enabled=true --conf
    spark.sql.sources.v2.bucketing.pushPartValues.enabled=true --conf
    spark.sql.requireAllClusterKeysForCoPartition=false --conf
    spark.sql.adaptive.enabled=false --conf
    spark.sql.adaptive.autoBroadcastJoinThreshold=-1 --conf
    spark.sql.iceberg.planning.preserve-data-grouping=true
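
Because AWS Glue accepts a single --conf key, the chained value is easy to get wrong by hand. The following small sketch generates it from a dictionary of the properties listed above; the helper name to_glue_conf_parameter is hypothetical.

```python
# The Spark properties listed in this section, in the same order.
SPJ_CONF = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.adaptive.enabled": "false",
    "spark.sql.adaptive.autoBroadcastJoinThreshold": "-1",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
}


def to_glue_conf_parameter(conf):
    """Join Spark properties into one AWS Glue --conf job parameter.

    The first pair is the value of the --conf key itself; every further
    pair is chained with an inline " --conf " separator.
    """
    pairs = [f"{key}={value}" for key, value in conf.items()]
    return {"--conf": " --conf ".join(pairs)}


job_parameter = to_glue_conf_parameter(SPJ_CONF)
print(job_parameter["--conf"])
```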

The following examples compare sample physical plans obtained by the EXPLAIN query, with and without storage partitioned join. In these plans, both tables product_review and customer have the same bucketed partition keys, such as review_year and product_id. When storage partitioned join is enabled, Spark joins the two tables without a shuffle operation.

The following is a physical plan without storage partitioned join:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [review_year#915L, product_id#920]
   +- SortMergeJoin [review_year#915L, product_id#906], [review_year#929L, product_id#920], Inner
      :- Sort [review_year#915L ASC NULLS FIRST, product_id#906 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(review_year#915L, product_id#906, 16), ENSURE_REQUIREMENTS, [plan_id=359]
      :     +- BatchScan glue_catalog.db.product_review[...]
      +- Sort [review_year#929L ASC NULLS FIRST, product_id#920 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(review_year#929L, product_id#920, 16), ENSURE_REQUIREMENTS, [plan_id=360]
            +- BatchScan glue_catalog.db.customer[...]

The following is a physical plan with storage partitioned join:

== Physical Plan ==
(3) Project [review_year#1301L, product_id#1306]
+- (3) SortMergeJoin [review_year#1301L, product_id#1292], [review_year#1315L, product_id#1306], Inner
   :- (1) Sort [review_year#1301L ASC NULLS FIRST, product_id#1292 ASC NULLS FIRST], false, 0
   :  +- (1) ColumnarToRow
   :     +- BatchScan glue_catalog.db.product_review[...]
   +- (2) Sort [review_year#1315L ASC NULLS FIRST, product_id#1306 ASC NULLS FIRST], false, 0
      +- (2) ColumnarToRow
         +- BatchScan glue_catalog.db.customer[...]

In this physical plan, we don’t see the Exchange operation that is present in the physical plan without storage partitioned join. This indicates that no shuffle operation will be performed.
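
You can automate this check when you tune a job. The following sketch scans plan text for shuffle Exchange operators; the function name and the regular expression are our own heuristic, not part of Spark or Iceberg, and the two plan excerpts are shortened from the examples above.

```python
import re

# Matches the standalone "Exchange" operator, but not operators whose names
# merely end in it, such as BroadcastExchange or ReusedExchange.
SHUFFLE_PATTERN = re.compile(r"(?<![A-Za-z])Exchange\b")


def shuffle_operators(plan_text):
    """Return the plan lines that contain a shuffle Exchange operator."""
    return [
        line.strip()
        for line in plan_text.splitlines()
        if SHUFFLE_PATTERN.search(line)
    ]


PLAN_WITHOUT_SPJ = """\
+- SortMergeJoin [review_year#915L, product_id#906], Inner
   :- Sort [review_year#915L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(review_year#915L, product_id#906, 16)
   :     +- BatchScan glue_catalog.db.product_review[...]
   +- Sort [review_year#929L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(review_year#929L, product_id#920, 16)
         +- BatchScan glue_catalog.db.customer[...]
"""

PLAN_WITH_SPJ = """\
+- (3) SortMergeJoin [review_year#1301L, product_id#1292], Inner
   :- (1) Sort [review_year#1301L ASC NULLS FIRST], false, 0
   :  +- (1) ColumnarToRow
   :     +- BatchScan glue_catalog.db.product_review[...]
   +- (2) Sort [review_year#1315L ASC NULLS FIRST], false, 0
      +- (2) ColumnarToRow
         +- BatchScan glue_catalog.db.customer[...]
"""

print(len(shuffle_operators(PLAN_WITHOUT_SPJ)))
print(len(shuffle_operators(PLAN_WITH_SPJ)))
```

In a Spark job, the plan text could come from the single row returned by an EXPLAIN statement run through spark.sql.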

Delta Lake highlights

AWS Glue 5.0 supports Delta Lake 3.2.1. We highlight its notable updates in this section. For more details, refer to Delta Lake Release 3.2.1.

Deletion vectors

Deletion vectors are a feature in Delta Lake that implements a merge-on-read (MoR) paradigm, providing an alternative to the traditional copy-on-write (CoW) approach. This feature fundamentally changes how DELETE, UPDATE, and MERGE operations are processed in Delta Lake tables. In the CoW paradigm, modifying even a single row requires rewriting entire Parquet files. With deletion vectors, changes are recorded as soft deletes, allowing the original data files to remain untouched while maintaining logical consistency. This approach results in improved write performance.

When deletion vectors are enabled, changes are recorded as soft deletes in a compressed bitmap format during write operations. During read operations, these changes are merged with the base data. Additionally, changes recorded by deletion vectors can be physically applied by rewriting files to purge soft deleted data using the REORG command.

To enable deletion vectors, set the table parameter to delta.enableDeletionVectors = 'true'.

When deletion vectors are enabled, you can confirm that a deletion vector file is created. The file is highlighted in the following screenshot.

MoR with deletion vectors is especially useful in scenarios requiring efficient write operations to tables with frequent updates and data scattered across multiple files. However, you should consider the read overhead required to merge these files. For more information, refer to What are deletion vectors?
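
As a sketch of how these steps might look in a Spark job, the following helpers build the statement that sets the table property and the REORG statement that purges soft-deleted rows. The helper names and the table name are hypothetical; the statements are plain Delta Lake SQL that you would run with spark.sql on a session configured for Delta Lake.

```python
def enable_deletion_vectors_sql(table):
    """Turn on deletion vectors for an existing Delta Lake table."""
    return (
        f"ALTER TABLE {table} "
        "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')"
    )


def purge_soft_deletes_sql(table):
    """Rewrite files so that soft-deleted rows are physically removed."""
    return f"REORG TABLE {table} APPLY (PURGE)"


TABLE = "db.product_review_delta"  # placeholder table name
maintenance_statements = [
    enable_deletion_vectors_sql(TABLE),
    purge_soft_deletes_sql(TABLE),
]
for statement in maintenance_statements:
    print(statement)
```

Running the purge periodically trades some rewrite cost for lower read overhead, which is the balance described above.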

Optimized writes

Delta Lake’s optimized writes feature addresses the small file problem, a common performance challenge in data lakes. This issue typically occurs when numerous small files are created through distributed operations. When reading data, processing many small files creates substantial overhead due to extensive metadata management and file handling.

The optimized writes feature solves this by combining multiple small writes into larger, more efficient files before they are written to disk. The process redistributes data across executors before writing and colocates similar data within the same partition. You can control the target file size using the spark.databricks.delta.optimizeWrite.binSize parameter, which defaults to 512 MB. With optimized writes enabled, the traditional approach of using coalesce(n) or repartition(n) to control output file counts becomes unnecessary, because file size optimization is handled automatically.

To enable optimized writes, set the table parameter to delta.autoOptimize.optimizeWrite = 'true'.

The optimized writes feature isn’t enabled by default, and you should be aware of potentially higher write latency due to data shuffling before files are written to the table. In some cases, combining this with auto compaction can effectively address small file issues. For more information, refer to Optimizations.

UniForm

Delta Lake Universal Format (UniForm) introduces an approach to data lake interoperability by enabling seamless access to Delta Lake tables through Iceberg and Hudi. Although these formats differ primarily in their metadata layer, Delta Lake UniForm bridges this gap by automatically generating compatible metadata for each format alongside Delta Lake, all referencing a single copy of the data. When you write to a Delta Lake table with UniForm enabled, UniForm automatically and asynchronously generates metadata for other formats.

Delta UniForm allows organizations to use the most suitable tool for each data workload while operating on a single Delta Lake-based data source. UniForm is read-only from an Iceberg and Hudi perspective, and some features of each format are not available. For more details about limitations, refer to Limitations. To learn more about how to use UniForm on AWS, visit Expand data access through Apache Iceberg using Delta Lake UniForm on AWS.

Apache Hudi highlights

AWS Glue 5.0 supports Hudi 0.15.0. We highlight its notable updates in this section. For more details, refer to Hudi Release 0.15.0.

Record Level Index

Hudi provides indexing mechanisms to map record keys to their corresponding file locations, enabling efficient data operations. To use these indexes, you first need to enable the metadata table using MoR by setting hoodie.metadata.enable=true in your table parameters. Hudi’s multi-modal indexing feature allows it to store various types of indexes. These indexes give you the flexibility to add different index types as your needs evolve.

Record Level Index enhances both write and read operations by maintaining precise mappings between record keys and their corresponding file locations. This mapping enables quick determination of record locations, reducing the number of files that need to be scanned during data retrieval.

During the write workflow, when new records arrive, Record Level Index tags each record with location information if it exists in any file group. This tagging makes update operations efficient and directly reduces write latency. For the read workflow, Record Level Index eliminates the need to scan through all files by enabling readers to quickly locate files containing specific data. By tracking which files contain which records, Record Level Index accelerates queries, particularly when performing exact matches on record key columns.

To enable Record Level Index, set the following table parameters:

hoodie.metadata.enable = 'true'
hoodie.metadata.record.index.enable = 'true'
hoodie.index.type = 'RECORD_INDEX'

When Record Level Index is enabled, the record_index partition is created on the metadata table storing indexes, as shown in the following screenshot.

For more information, refer to Record Level Index: Hudi’s blazing fast indexing for large-scale datasets on Hudi’s blog.
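The following sketch shows one way the three parameters might be merged into the options of a Hudi write from PySpark. The table name, column names, and helper names are hypothetical, and the write function assumes a Spark session with the Hudi libraries available, as in an AWS Glue job configured for Hudi.

```python
def with_record_level_index(write_options: dict) -> dict:
    """Return a copy of the Hudi write options with Record Level Index enabled."""
    record_level_index = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.index.type": "RECORD_INDEX",
    }
    return {**write_options, **record_level_index}


# Hypothetical table and column names, for illustration only.
base_options = {
    "hoodie.table.name": "customer",
    "hoodie.datasource.write.recordkey.field": "customer_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
}
options = with_record_level_index(base_options)


def write_with_record_level_index(df, base_path: str) -> None:
    """Append a Spark DataFrame to the Hudi table at base_path using the options above."""
    df.write.format("hudi").options(**options).mode("append").save(base_path)
```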

Auto generated keys

Traditionally, Hudi required explicit configuration of primary keys for every table. Users needed to specify the record key field using the hoodie.datasource.write.recordkey.field configuration. This requirement sometimes posed challenges for datasets lacking natural unique identifiers, such as in log ingestion scenarios.

With auto generated primary keys, Hudi now offers the flexibility to create tables without explicitly configuring primary keys. When you omit the hoodie.datasource.write.recordkey.field configuration, Hudi automatically generates efficient primary keys that optimize compute, storage, and read operations while maintaining uniqueness requirements. For more details, refer to Key Generation.
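As a small illustration, the difference between a keyed and a keyless table comes down to whether the record key option is present in the write options. The helper and the table and column names below are hypothetical.

```python
from typing import Optional


def hudi_write_options(table_name: str, record_key: Optional[str] = None) -> dict:
    """Build Hudi write options; leave out the record key so Hudi generates keys itself."""
    options = {"hoodie.table.name": table_name}
    if record_key is not None:
        options["hoodie.datasource.write.recordkey.field"] = record_key
    return options


# A table with a natural key, and a log-style table without one (both names are made up).
keyed = hudi_write_options("orders", record_key="order_id")
keyless = hudi_write_options("app_logs")
print(keyed)
print(keyless)
```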

CDC queries

In some use cases like streaming ingestion, it’s important to track all changes for the records that belong to a single commit. Although Hudi has provided the incremental query that enables you to obtain a set of records that changed between a start and end commit time, it doesn’t contain before and after images of records. Instead, a CDC query in Hudi allows you to capture and process all mutating operations, including inserts, updates, and deletes, making it possible to track the complete evolution of data over time.

To enable CDC queries, set the table parameter to hoodie.table.cdc.enabled = 'true'.

To perform a CDC query, set the following query option:

cdc_read_options = {
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': 0
}

spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(basePath).show()

The following screenshot shows a sample output from a CDC query. In the op column, we can see which operation was performed on each record. The output also displays the before and after images of the modified records.

This feature is currently available for CoW tables; MoR tables are not yet supported at the time of writing. For more information, refer to Change Data Capture Query.
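To illustrate how a downstream consumer might use the op column and the before and after images, the following sketch replays change rows onto an in-memory dictionary. The sample rows and the id key are made up, and the code assumes that op takes the values i, u, and d and that the images arrive as JSON strings; verify both against the output of your own CDC query.

```python
import json


def apply_cdc_rows(state: dict, rows: list, key: str = "id") -> dict:
    """Replay CDC rows (op, before, after) onto a dict keyed by record key."""
    result = dict(state)
    for row in rows:
        if row["op"] in ("i", "u"):
            # Inserts and updates: the after image is the new version of the record.
            after = json.loads(row["after"])
            result[after[key]] = after
        elif row["op"] == "d":
            # Deletes: only the before image is populated.
            before = json.loads(row["before"])
            result.pop(before[key], None)
    return result


# Hypothetical change rows shaped like the op/before/after columns described above.
sample_rows = [
    {"op": "i", "before": None, "after": '{"id": 1, "name": "Alice"}'},
    {"op": "i", "before": None, "after": '{"id": 2, "name": "Bob"}'},
    {"op": "u", "before": '{"id": 1, "name": "Alice"}', "after": '{"id": 1, "name": "Alicia"}'},
    {"op": "d", "before": '{"id": 2, "name": "Bob"}', "after": None},
]
current = apply_cdc_rows({}, sample_rows)
print(current)  # {1: {'id': 1, 'name': 'Alicia'}}
```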

Conclusion

In this post, we discussed the key upgrades on Iceberg, Delta Lake, and Hudi in AWS Glue 5.0. You can take advantage of the new version right away by creating new jobs and transferring your current ones to use the enhanced features.


About the Authors

Sotaro Hikita is an Analytics Solutions Architect. He supports customers across a wide range of industries in building and operating analytics platforms more effectively. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Introducing AWS Glue 5.0 for Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-5-0-for-apache-spark/

AWS Glue is a serverless, scalable data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources. Today, we are launching AWS Glue 5.0, a new version of AWS Glue that accelerates data integration workloads in AWS. AWS Glue 5.0 upgrades the Spark engines to Apache Spark 3.5.2 and Python 3.11, giving you newer Spark and Python releases so you can develop, run, and scale your data integration workloads and get insights faster.

This post describes what’s new in AWS Glue 5.0, performance improvements, key highlights on Spark and related libraries, and how to get started on AWS Glue 5.0.

What’s new in AWS Glue 5.0

AWS Glue 5.0 upgrades the runtimes to Spark 3.5.2, Python 3.11, and Java 17 with new performance and security improvements from the open source. AWS Glue 5.0 also updates support for open table format libraries to Apache Hudi 0.15.0, Apache Iceberg 1.6.1, and Delta Lake 3.2.1 so you can solve advanced use cases around performance, cost, governance, and privacy in your data lakes. AWS Glue 5.0 adds support for Spark-native fine-grained access control with AWS Lake Formation so you can apply table- and column-level permissions on an Amazon Simple Storage Service (Amazon S3) data lake for write operations (such as INSERT INTO and INSERT OVERWRITE) with Spark jobs.

Key features include:

  • Amazon SageMaker Unified Studio support
  • Amazon SageMaker Lakehouse support
  • Frameworks updated to Spark 3.5.2, Python 3.11, Scala 2.12.18, and Java 17
  • Open Table Formats (OTF) updated to Hudi 0.15.0, Iceberg 1.6.1, and Delta Lake 3.2.1
  • Spark-native fine-grained access control using Lake Formation
  • Amazon S3 Access Grants support
  • requirements.txt support to install additional Python libraries
  • Data lineage support in Amazon DataZone

Amazon SageMaker Unified Studio support

Amazon SageMaker Unified Studio supports AWS Glue 5.0 as the compute runtime for unified notebooks and the visual ETL flow editor.

Amazon SageMaker Lakehouse support

Glue 5.0 supports native integration with Amazon SageMaker Lakehouse to enable unified access across Amazon Redshift data warehouses and S3 data lakes.

Frameworks updated to Spark 3.5.2, Python 3.11, Scala 2.12.18, and Java 17

AWS Glue 5.0 upgrades the runtimes to Spark 3.5.2, Python 3.11, Scala 2.12.18, and Java 17. Glue 5.0 uses AWS performance optimized Spark runtime, 3.9 times faster than open source Spark. Glue 5.0 is 32% faster than AWS Glue 4.0 and reduces costs by 22%.

For more details about updated library dependencies, see the Dependent library upgrades section.

Open Table Formats (OTF) updated to Hudi 0.15.0, Iceberg 1.6.1, and Delta Lake 3.2.1

AWS Glue 5.0 upgrades the open table format libraries to Hudi 0.15.0, Iceberg 1.6.1, and Delta Lake 3.2.1.

Spark-native fine-grained access control using Lake Formation

AWS Glue supports AWS Lake Formation Fine Grained Access Control (FGAC) through native Spark DataFrames and Spark SQL.

S3 Access Grants support

S3 Access Grants provides a simplified model for defining access permissions to data in Amazon S3 by prefix, bucket, or object. AWS Glue 5.0 supports S3 Access Grants through EMR File System (EMRFS) using additional Spark configurations:

  • Key: --conf
  • Value: spark.hadoop.fs.s3.s3AccessGrants.enabled=true --conf spark.hadoop.fs.s3.s3AccessGrants.fallbackToIAM=false

To learn more, refer to the documentation.
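Because an AWS Glue job accepts only one --conf key, several Spark settings are chained inside a single value. The following sketch builds that value programmatically; the helper name is hypothetical, and both settings are written with the spark. prefix that Spark uses to pass Hadoop properties through.

```python
def glue_conf_argument(spark_confs: dict) -> dict:
    """Pack several Spark settings into the single --conf job parameter."""
    pairs = [f"{key}={value}" for key, value in spark_confs.items()]
    # The first pair follows the --conf key itself; later pairs repeat the flag inline.
    return {"--conf": " --conf ".join(pairs)}


s3_access_grants_args = glue_conf_argument(
    {
        "spark.hadoop.fs.s3.s3AccessGrants.enabled": "true",
        "spark.hadoop.fs.s3.s3AccessGrants.fallbackToIAM": "false",
    }
)
print(s3_access_grants_args["--conf"])
```

The resulting dictionary can be merged into the job's default arguments alongside any other job parameters.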

requirements.txt support to install additional Python libraries

In AWS Glue 5.0, you can provide the standard requirements.txt file to manage Python library dependencies. To do that, provide the following job parameters:

  • Parameter 1:
    • Key: --python-modules-installer-option
    • Value: -r
  • Parameter 2:
    • Key: --additional-python-modules
    • Value: s3://path_to_requirements.txt

AWS Glue 5.0 nodes initially load Python libraries specified in requirements.txt. The following code illustrates the sample requirements.txt:

awswrangler==3.9.1 
elasticsearch==8.15.1
PyAthena==3.9.0
PyMySQL==1.1.1
PyYAML==6.0.2
pyodbc==5.2.0
pyorc==0.9.0 
redshift-connector==2.1.3
scipy==1.14.1
scikit-learn==1.5.2
SQLAlchemy==2.0.36
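If you create jobs programmatically, the two job parameters can be supplied as default arguments. The following is a sketch of a request for the AWS Glue create_job API in boto3; the job name, role ARN, S3 paths, worker settings, and helper name are placeholders for illustration rather than values from this post.

```python
def glue5_job_request(name: str, role_arn: str, script_s3_uri: str,
                      requirements_s3_uri: str) -> dict:
    """Build keyword arguments for create_job with requirements.txt support enabled."""
    return {
        "Name": name,
        "Role": role_arn,
        "GlueVersion": "5.0",
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": script_s3_uri,
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            # Tell the installer to treat the modules argument as a requirements file.
            "--python-modules-installer-option": "-r",
            "--additional-python-modules": requirements_s3_uri,
        },
        "WorkerType": "G.1X",
        "NumberOfWorkers": 2,
    }


# All values below are placeholders.
request = glue5_job_request(
    name="sample-glue5-job",
    role_arn="arn:aws:iam::111122223333:role/GlueJobRole",
    script_s3_uri="s3://amzn-s3-demo-bucket/scripts/job.py",
    requirements_s3_uri="s3://amzn-s3-demo-bucket/config/requirements.txt",
)
# To create the job: import boto3; boto3.client("glue").create_job(**request)
```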

Data lineage support in Amazon DataZone (preview)

AWS Glue 5.0 supports data lineage in Amazon DataZone in preview. You can configure AWS Glue to automatically collect lineage information during Spark job runs and send the lineage events to be visualized in Amazon DataZone.

To configure this on the AWS Glue console, enable Generate lineage events, and enter your Amazon DataZone domain ID on the Job details tab.

Alternatively, you can provide the following job parameter (provide your DataZone domain ID):

  • Key: --conf
  • Value: spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener --conf spark.openlineage.transport.type=amazon_datazone_api --conf spark.openlineage.transport.domainId=<Your-Domain-ID>

Learn more in Amazon DataZone introduces OpenLineage-compatible data lineage visualization in preview.

Improved performance

AWS Glue 5.0 improves the price-performance of your AWS Glue jobs. AWS Glue 5.0 is 32% faster than AWS Glue 4.0 and reduces costs by 22%. The following table shows the total job runtime for all queries (in seconds) on the 3 TB TPC-DS dataset for AWS Glue 4.0 and AWS Glue 5.0. The TPC-DS dataset is located in an S3 bucket in Parquet format, and we used 30 G.2X workers in AWS Glue. We observed that our AWS Glue 5.0 TPC-DS tests on Amazon S3 were 58% faster than on AWS Glue 4.0 while reducing cost by 36%.

Metric | AWS Glue 4.0 | AWS Glue 5.0
Total Query Time (seconds) | 1896.1904 | 1197.78755
Geometric Mean (seconds) | 10.09472 | 6.82208
Estimated Cost ($) | 45.85533 | 29.20133

The following graphs illustrate the comparisons of performance and cost.
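The TPC-DS percentages can be reproduced from the figures in the table. In the short calculation below, "faster" is read as the ratio of the two total runtimes, which is an interpretation rather than something the post states; measured as a drop in runtime, the same numbers give roughly 37%.

```python
# Figures copied from the comparison table above.
glue4 = {"total_s": 1896.1904, "geomean_s": 10.09472, "cost_usd": 45.85533}
glue5 = {"total_s": 1197.78755, "geomean_s": 6.82208, "cost_usd": 29.20133}

# Speedup: how much more work Glue 5.0 completes in the same amount of time.
speedup_pct = (glue4["total_s"] / glue5["total_s"] - 1) * 100
# Runtime reduction: how much shorter the Glue 5.0 run is.
runtime_reduction_pct = (1 - glue5["total_s"] / glue4["total_s"]) * 100
# Cost reduction: how much cheaper the Glue 5.0 run is.
cost_reduction_pct = (1 - glue5["cost_usd"] / glue4["cost_usd"]) * 100

print(f"speedup: {speedup_pct:.1f}%")
print(f"runtime reduction: {runtime_reduction_pct:.1f}%")
print(f"cost reduction: {cost_reduction_pct:.1f}%")
```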

Dependent library upgrades

The following table lists dependency upgrades.

Dependency | Version in AWS Glue 4.0 | Version in AWS Glue 5.0
Spark | 3.3.0 | 3.5.2
Hadoop | 3.3.3 | 3.4.0
Scala | 2.12 | 2.12.18
Hive | 2.3.9 | 2.3.9
EMRFS | 2.54.0 | 2.66.0
Arrow | 7.0.0 | 12.0.1
Iceberg | 1.0.0 | 1.6.1
Hudi | 0.12.1 | 0.15.0
Delta Lake | 2.1.0 | 3.2.1
Java | 8 | 17
Python | 3.10 | 3.11
boto3 | 1.26 | 1.34.131
AWS SDK for Java | 1.12 | 2.28.8
AWS Glue Data Catalog Client | 3.7.0 | 4.2.0
EMR DynamoDB Connector | 4.16.0 | 5.6.0

The following table lists database connector (JDBC driver) upgrades.

Driver | Connector Version in AWS Glue 4.0 | Connector Version in AWS Glue 5.0
MySQL | 8.0.23 | 8.0.33
Microsoft SQL Server | 9.4.0 | 10.2.0
Oracle Databases | 21.7 | 23.3.0.23.09
PostgreSQL | 42.3.6 | 42.7.3
Amazon Redshift | redshift-jdbc42-2.1.0.16 | redshift-jdbc42-2.1.0.29

The following are Spark connector upgrades:

Driver | Connector Version in AWS Glue 4.0 | Connector Version in AWS Glue 5.0
Amazon Redshift | 6.1.3 | 6.3.0
OpenSearch | 1.0.1 | 1.2.0
MongoDB | 10.0.4 | 10.3.0
Snowflake | 2.12.0 | 3.0.0
BigQuery | 0.32.2 | 0.32.2

Apache Spark highlights

Spark 3.5.2 in AWS Glue 5.0 brings a number of valuable features, which we highlight in this section. To learn more about the highlights and enhancements of Spark 3.4 and 3.5, refer to Spark Release 3.4.0 and Spark Release 3.5.0.

Apache Arrow-optimized Python UDF

Python user-defined functions (UDFs) enable users to build custom code for data processing needs, providing flexibility and accessibility. However, performance suffers because UDFs require serialization between Python and JVM processes. Spark 3.5’s Apache Arrow-optimized UDFs solve this by keeping data in shared memory using Arrow’s high-performance columnar format, eliminating serialization overhead and making UDFs efficient for large-scale processing.

To use Arrow-optimized Python UDFs, set spark.sql.execution.pythonUDF.arrow.enabled to True.
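The following sketch shows how the setting might be applied in a PySpark session. The mask_email function, the sample rows, and the demo function name are made up for illustration; the demo expects an existing SparkSession, such as the one available in a Glue job or notebook, with PyArrow installed.

```python
def mask_email(email: str) -> str:
    """Keep the first character of the local part and hide the rest."""
    local, _, domain = email.partition("@")
    return f"{local[:1]}***@{domain}"


def run_arrow_udf_demo(spark):
    """Register mask_email as an Arrow-optimized UDF and apply it to sample rows."""
    # pyspark is imported here so that mask_email can be tested without Spark installed.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    # Turn on Arrow optimization for Python UDFs in this session.
    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
    # Alternatively, udf() accepts useArrow=True to opt in for a single function.
    mask_email_udf = udf(mask_email, returnType=StringType())

    df = spark.createDataFrame(
        [("alice@example.com",), ("bob@example.com",)], ["email"]
    )
    return df.select(mask_email_udf(col("email")).alias("masked"))
```

The UDF body is ordinary Python; only the way rows are exchanged between the JVM and the Python worker changes.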

Python user-defined table functions

A user-defined table function (UDTF) is a function that returns an entire output table instead of a single value. PySpark users can now write custom UDTFs with Python logic and use them in PySpark and SQL queries. Called in the FROM clause, UDTFs can accept zero or more arguments, either as scalar expressions or table arguments. The UDTF’s return type, defined as either a StructType (for example, StructType().add("c1", StringType())) or DDL string (for example, c1: string), determines the output table’s schema.
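A Python UDTF is written as a class whose eval method yields output rows. The sketch below, with a hypothetical SplitWords class and sample input, uses a DDL string for the return type and registers the function so it can be called in the FROM clause; it expects an existing SparkSession on Spark 3.5.

```python
class SplitWords:
    """UDTF handler: eval() is called for each input row and yields output rows."""

    def eval(self, text: str):
        for word in text.split():
            yield (word, len(word))


def run_udtf_demo(spark):
    """Register SplitWords as a UDTF and call it from Spark SQL."""
    # pyspark is imported here so that SplitWords can be tested without Spark installed.
    from pyspark.sql.functions import udtf

    # The DDL string defines the schema of the output table.
    split_words = udtf(SplitWords, returnType="word: string, length: int")
    spark.udtf.register("split_words", split_words)
    return spark.sql("SELECT * FROM split_words('hello glue five')")
```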

RocksDB state store enhancement

In Spark 3.2, the RocksDB state store provider was added as a built-in state store implementation.

Changelog checkpointing

A new checkpoint mechanism for the RocksDB state store provider called changelog checkpointing persists the changelog (updates) of the state. This reduces the commit latency, thereby reducing end-to-end latency significantly.

You can enable this by setting spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled to True.

You can also enable this feature with existing checkpoints.

Memory management enhancements

Although the RocksDB state store provider is known to help with memory issues caused by large state, it previously offered no fine-grained memory management. Spark 3.5 introduces finer-grained memory management that lets users cap the total memory usage across RocksDB instances in the same executor process, so memory usage can be configured per executor process.
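The RocksDB settings discussed in this section can be collected in one place and applied when the session is built. In the sketch below, the changelog checkpointing key comes from this post; the provider class and the two memory settings are taken from the Spark Structured Streaming guide and should be checked against the Spark version you run. The helper names and the 500 MB cap are illustrative.

```python
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def rocksdb_state_store_confs(max_memory_mb: int = 500) -> dict:
    """Spark settings for the RocksDB state store with changelog checkpointing."""
    prefix = "spark.sql.streaming.stateStore"
    return {
        f"{prefix}.providerClass": ROCKSDB_PROVIDER,
        f"{prefix}.rocksdb.changelogCheckpointing.enabled": "true",
        # Cap the memory shared by all RocksDB instances in one executor process.
        f"{prefix}.rocksdb.boundedMemoryUsage": "true",
        f"{prefix}.rocksdb.maxMemoryUsageMB": str(max_memory_mb),
    }


def apply_confs(builder, confs: dict):
    """Apply each setting to a SparkSession builder and return the builder."""
    for key, value in confs.items():
        builder = builder.config(key, value)
    return builder


confs = rocksdb_state_store_confs(max_memory_mb=500)
# Usage with Spark: apply_confs(SparkSession.builder, confs).getOrCreate()
```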

Enhanced Structured Streaming

Spark 3.4 and 3.5 have many enhancements related to Spark Structured Streaming.

One of them is the new dropDuplicatesWithinWatermark() API, which deduplicates rows that arrive within the watermark's delay threshold. Watermark-based processing allows for more precise control over late data handling:

  • Deduplicate the same rows: dropDuplicatesWithinWatermark()
  • Deduplicate values on ‘value’ columns: dropDuplicatesWithinWatermark(['value'])
  • Deduplicate using the guid column with a watermark based on the eventTime column: withWatermark("eventTime", "10 hours") .dropDuplicatesWithinWatermark(["guid"])

Get started with AWS Glue 5.0

You can start using AWS Glue 5.0 through AWS Glue Studio, the AWS Glue console, the latest AWS SDK, and the AWS Command Line Interface (AWS CLI).

To start using AWS Glue 5.0 jobs in AWS Glue Studio, open the AWS Glue job and on the Job Details tab, choose the version Glue 5.0 – Supports Spark 3.5, Scala 2, Python 3.

To start using AWS Glue 5.0 on an AWS Glue Studio notebook or an interactive session through a Jupyter notebook, set 5.0 in the %glue_version magic:

%glue_version 5.0

The following output shows that the session is set to use AWS Glue 5.0:

Setting Glue version to: 5.0

Conclusion

In this post, we discussed the key features and benefits of AWS Glue 5.0. You can create new AWS Glue jobs on AWS Glue 5.0 to get the benefit from the improvements, or migrate your existing AWS Glue jobs.

We would like to thank the numerous engineers and leaders who helped build Glue 5.0, which gives customers a performance-optimized Spark runtime and several new capabilities.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Stuti Deshpande is a Big Data Specialist Solutions Architect at AWS. She works with customers around the globe, providing them strategic and architectural guidance on implementing analytics solutions using AWS. She has extensive experience in big data, ETL, and analytics. In her free time, Stuti likes to travel, learn new dance forms, and enjoy quality time with family and friends.

Martin Ma is a Software Development Engineer on the AWS Glue team. He is passionate about improving the customer experience by applying problem-solving skills to invent new software solutions, as well as constantly searching for ways to simplify existing ones. In his spare time, he enjoys singing and playing the guitar.

Anshul Sharma is a Software Development Engineer on the AWS Glue team.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about data.

Maheedhar Reddy Chappidi is a Sr. Software Development Engineer on the AWS Glue team. He is passionate about building fault tolerant and reliable distributed systems at scale. Outside of his work, Maheedhar is passionate about listening to podcasts and playing with his two-year-old kid.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on generative AI applications for the Data Integration domain and distributed systems for efficiently managing data lakes on AWS and optimizing Apache Spark for performance and reliability.

Kartik Panjabi is a Software Development Manager on the AWS Glue team. His team builds generative AI features and distributed systems for data integration.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue and Amazon EMR team. His team focuses on building distributed systems to enable customers with simple-to-use interfaces and AI-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Read and write S3 Iceberg table using AWS Glue Iceberg Rest Catalog from Open Source Apache Spark

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/read-and-write-s3-iceberg-table-using-aws-glue-iceberg-rest-catalog-from-open-source-apache-spark/

In today’s data-driven world, organizations are constantly seeking efficient ways to process and analyze vast amounts of information across data lakes and warehouses.

Enter Amazon SageMaker Lakehouse, which you can use to unify all your data across Amazon Simple Storage Service (Amazon S3) data lakes and Amazon Redshift data warehouses, helping you build powerful analytics and AI and machine learning (AI/ML) applications on a single copy of data. SageMaker Lakehouse gives you the flexibility to access and query your data in-place with all Apache Iceberg compatible tools and engines. This opens up exciting possibilities for Open Source Apache Spark users who want to use SageMaker Lakehouse capabilities. Further you can secure your data in SageMaker Lakehouse by defining fine-grained permissions, which are enforced across all analytics and ML tools and engines.

In this post, we will explore how to harness the power of Open source Apache Spark and configure a third-party engine to work with AWS Glue Iceberg REST Catalog. The post will include details on how to perform read/write data operations against Amazon S3 tables with AWS Lake Formation managing metadata and underlying data access using temporary credential vending.

Solution overview

In this post, the customer uses Data Catalog to centrally manage technical metadata for structured and semi-structured datasets in their organization and wants to enable their data team to use Apache Spark for data processing. The customer will create an AWS Glue database and configure Apache Spark to interact with Glue Data Catalog using the Iceberg Rest API for writing/reading Iceberg data on Amazon S3 using Lake Formation permission control.

We will start by running an extract, transform, and load (ETL) script using Apache Spark to create an Iceberg table on Amazon S3 and access the table using the Glue Iceberg REST Catalog. The ETL script will add data to the Iceberg table and then read it back using Spark SQL. This post will showcase how this data can also be queried by other data teams using Amazon Athena.

Prerequisites

Access to an AWS Identity and Access Management (IAM) role that is a Lake Formation data lake administrator in the account that has the Data Catalog. For instructions, see Create a data lake administrator.

  1. Verify that you have Python version 3.7 or later installed. Check that pip3 version 22.2.2 or higher is installed.
  2. Install or update the latest AWS Command Line Interface (AWS CLI). For instructions, see Installing or updating the latest version of the AWS CLI. Run aws configure using AWS CLI to point to your AWS account.
  3. Create an S3 bucket to store the customer Iceberg table. For this post, we will be using the us-east-2 AWS Region and will name the bucket: ossblog-customer-datalake.
  4. Create an IAM role that will be used in OSS Spark for data access using an AWS Glue Iceberg REST catalog endpoint. Make sure that the role has AWS Glue and Lake Formation policies as defined in Data engineer permissions. For this post, we will use an IAM role named spark_role.

Enable Lake Formation permissions for third-party access

In this section, you will register the S3 bucket with Lake Formation. This step allows Lake Formation to act as a centralized permissions management system for metadata and data stored in Amazon S3, enabling more efficient and secure data governance in data lake environments.

  1. Create a user defined IAM role following the instructions in Requirements for roles used to register locations. For this post, we will use the IAM role: LFRegisterRole.
  2. Register the S3 bucket ossblog-customer-datalake using the IAM role LFRegisterRole by running the following command:
    aws lakeformation register-resource \
    --resource-arn '<S3 bucket ARN for ossblog-customer-datalake>' \
    --role-arn '< IAM Role ARN for LFRegisterRole >' \
    --region <aws_region>

Alternatively you can use the AWS Management Console for Lake Formation.

  1. Navigate to the Lake Formation console, choose Administration in the navigation pane, and then Data lake locations and provide the following values:
    1. For Amazon S3 path, select s3://ossblog-customer-datalake.
    2. For IAM role, select LFRegisterRole
    3. For Permission mode, choose Lake Formation.
    4. Choose Register location.
  3. In Lake Formation, enable full table access for external engines to access data.
    1. Sign in as an admin user, choose Administration in the navigation pane.
    2. Choose Application integration settings and select Allow external engines to access data in Amazon S3 locations with full table access.
    3. Choose Save.
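The location registration shown earlier in this section can also be scripted. The following sketch builds the request for the Lake Formation register_resource API in boto3; the account ID in the role ARN and the helper name are placeholders.

```python
def register_location_request(bucket_name: str, role_arn: str) -> dict:
    """Build keyword arguments for registering an S3 bucket with Lake Formation."""
    return {
        "ResourceArn": f"arn:aws:s3:::{bucket_name}",
        "RoleArn": role_arn,
    }


# The account ID is a placeholder; the bucket and role names follow this post.
request = register_location_request(
    "ossblog-customer-datalake",
    "arn:aws:iam::111122223333:role/LFRegisterRole",
)
# To register: import boto3
# boto3.client("lakeformation", region_name="us-east-2").register_resource(**request)
```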

Set up resource access for the OSS Spark role:

  1. Create an AWS Glue database called ossblogdb in the default catalog by going to the Lake Formation console and choosing Databases in the navigation pane.
  2. Select the database, choose Edit and clear the checkbox for Use only IAM access control for new tables in this database.

Grant resource permission to OSS Spark role:

To enable OSS Spark to create and populate the dataset in the ossblogdb database, you will use the IAM role (spark_role) for the Apache Spark instance that you created in step 4 of the prerequisites section. Apache Spark will assume this role to create an Iceberg table, add records to it, and read from it. To enable this functionality, grant full table access to spark_role and provide data location permission to the S3 bucket where the table data can be stored.

Grant create table permission to the spark_role:

Sign in as Datalake Admin and run the following command using AWS CLI:

aws lakeformation grant-permissions \
--principal '{"DataLakePrincipalIdentifier":"arn:aws:iam::<aws_account_id>:role/<iam_role_name>"}' \
--permissions '["CREATE_TABLE","DESCRIBE"]' \
--resource '{"Database":{"CatalogId":"<aws_account_id>","Name":"ossblogdb"}}' \
--region <aws_region>

Alternatively on the console:

  1. In the Lake Formation console navigation pane, choose Data lake permissions, and then choose Grant.
  2. In the Principals section, for IAM users and roles, select spark_role.
  3. In the LF-Tags or catalog resources section, select Named Data Catalog resources:
    1. Select <accountid> for Catalogs.
    2. Select ossblogdb for Databases.
  4. Select DESCRIBE and CREATE TABLE for Database permissions.
  5. Choose Grant.

Grant data location permission to the spark_role:

Sign in as Datalake Admin and run the following command using the AWS CLI:

aws lakeformation grant-permissions \
--principal '{"DataLakePrincipalIdentifier":"<Principal>"}' \
--permissions DATA_LOCATION_ACCESS \
--resource '{"DataLocation":{"CatalogId":"<Catalog ID>","ResourceArn":"<S3 bucket ARN>"}}' \
--region <aws_region>

Alternatively on the console:

  1. In the Lake Formation console navigation pane, choose Data Locations, and then choose Grant.
  2. For IAM users and roles, select spark_role.
  3. For Storage locations, select the S3 bucket (ossblog-customer-datalake).
  4. Choose Grant.
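The two grants in this section, the database permissions and the data location permission, can also be expressed as requests for the Lake Formation grant_permissions API in boto3. The following is a sketch; the account ID and the helper name are placeholders.

```python
def spark_role_grants(role_arn: str, account_id: str, database: str,
                      bucket_arn: str) -> list:
    """Build the two grant_permissions requests needed by the OSS Spark role."""
    principal = {"DataLakePrincipalIdentifier": role_arn}
    return [
        {
            # Allow the role to create and describe tables in the database.
            "Principal": principal,
            "Resource": {"Database": {"CatalogId": account_id, "Name": database}},
            "Permissions": ["CREATE_TABLE", "DESCRIBE"],
        },
        {
            # Allow the role to place table data in the registered S3 location.
            "Principal": principal,
            "Resource": {
                "DataLocation": {"CatalogId": account_id, "ResourceArn": bucket_arn}
            },
            "Permissions": ["DATA_LOCATION_ACCESS"],
        },
    ]


# The account ID is a placeholder; the other names follow this post.
grants = spark_role_grants(
    role_arn="arn:aws:iam::111122223333:role/spark_role",
    account_id="111122223333",
    database="ossblogdb",
    bucket_arn="arn:aws:s3:::ossblog-customer-datalake",
)
# To apply: import boto3; client = boto3.client("lakeformation")
# for grant in grants: client.grant_permissions(**grant)
```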

Set up a Spark script to use an AWS Glue Iceberg REST catalog endpoint:

Create a file named oss_spark_customer_etl.py in your environment with the following content:

import sys
import os
import time
from pyspark.sql import SparkSession

#Replace <aws_region> with AWS region name.
#Replace <aws_account_id> with AWS account ID.

spark = SparkSession.builder.appName('osspark') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'spark_catalog') \
.config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.spark_catalog.type', 'rest') \
.config('spark.sql.catalog.spark_catalog.uri','https://glue.<aws_region>.amazonaws.com/iceberg') \
.config('spark.sql.catalog.spark_catalog.warehouse','<aws_account_id>') \
.config('spark.sql.catalog.spark_catalog.rest.sigv4-enabled','true') \
.config('spark.sql.catalog.spark_catalog.rest.signing-name','glue') \
.config('spark.sql.catalog.spark_catalog.rest.signing-region', '<aws_region>') \
.config('spark.sql.catalog.spark_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.hadoop.fs.s3a.aws.credentials.provider','org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
.config('spark.sql.catalog.spark_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
spark.sql("use ossblogdb").show()
spark.sql("""CREATE TABLE ossblogdb.customer (name string) USING iceberg location 's3://<s3_bucket_name>/customer'""")
time.sleep(120)
spark.sql("insert into ossblogdb.customer values('Alice') ").show()
spark.sql("select * from ossblogdb.customer").show()

Launch PySpark locally and validate read/write to the Iceberg table on Amazon S3

Run pip install pyspark. Save the script locally and set the environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN) with temporary credentials for the spark_role IAM role.

Run python /path/to/oss_spark_customer_etl.py
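One way to obtain the temporary credentials is to assume spark_role with AWS Security Token Service (AWS STS) and map the response to the three environment variables. The following is a sketch; the session name and helper names are hypothetical, and the calling identity is assumed to have permission to assume the role.

```python
def credentials_to_env(credentials: dict) -> dict:
    """Map the Credentials block of an STS response to AWS environment variables."""
    return {
        "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
        "AWS_SESSION_TOKEN": credentials["SessionToken"],
    }


def assume_spark_role(role_arn: str, session_name: str = "oss-spark-etl") -> dict:
    """Assume the role with AWS STS and return the environment variables to set."""
    # boto3 is imported here so that credentials_to_env can be tested without it.
    import boto3

    response = boto3.client("sts").assume_role(
        RoleArn=role_arn, RoleSessionName=session_name
    )
    return credentials_to_env(response["Credentials"])
```

Calling os.environ.update(assume_spark_role("<spark_role ARN>")) before the SparkSession is built makes the credentials visible to the script.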

You can also use Athena to view the data in the Iceberg table:

To enable the other data team to view the content, provide read access to the data team IAM role using the Lake Formation console:

  1. In the Lake Formation console navigation pane, choose Data lake permissions, and then choose Grant.
  2. In the Principals section, for IAM users and roles choose <iam_role>.
  3. In the LF-Tags or catalog resources section, select Named Data Catalog resources:
    1. Select <accountid> for Catalogs.
    2. Select ossblogdb for Databases.
    3. Select customer for Tables.
  4. Select DESCRIBE and SELECT for Table permissions.
  5. Choose Grant.

Sign in as the IAM role and run the command:

SELECT * FROM "ossblogdb"."customer" limit 10;

Clean up

To clean up your resources, complete the following steps:

  1. Delete the resources database/table created in Data Catalog.
  2. Empty and then delete the S3 bucket

Conclusion

In this post, we’ve walked through the seamless integration between Apache Spark and an AWS Glue Iceberg Rest Catalog for accessing Iceberg tables in Amazon S3, demonstrating how to effectively perform read and write operations using Iceberg REST API. The beauty of this solution lies in its flexibility—whether you’re running Spark on bare metal servers in your data center, in a Kubernetes cluster, or any other environment, this architecture can be adapted to suit your needs.


About the Authors

Raj Ramasubbu is a Sr. Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 20 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She works with product teams and customers to build robust features and solutions for their analytical data platforms. She enjoys building data mesh solutions and sharing them with the community.

Pratik Das is a Senior Product Manager with AWS Lake Formation. He is passionate about all things data and works with customers to understand their requirements and build delightful experiences. He has a background in building data-driven solutions and machine learning systems in production.

Author visual ETL flows on Amazon SageMaker Unified Studio (preview)

Post Syndicated from Praveen Kumar original https://aws.amazon.com/blogs/big-data/author-visual-etl-flows-on-amazon-sagemaker-unified-studio/

Amazon SageMaker Unified Studio (preview) provides an integrated data and AI development environment within Amazon SageMaker. From the Unified Studio, you can collaborate and build faster using familiar AWS tools for model development, generative AI, data processing, and SQL analytics. This experience includes visual ETL, a new visual interface that makes it simple for data engineers to author, run, and monitor extract, transform, load (ETL) data integration flow. You can use a simple visual interface to compose flows that move and transform data and run them on serverless compute. Additionally, you can choose to author your visual flows with English using generative AI prompts powered by Amazon Q. Visual ETL also automatically converts your visual flow directed acyclic graph (DAG) into Spark native scripts so you can continue authoring by notebook, enabling a quick-start experience for developers who prefer to author using code.

This post shows how you can build a low-code and no-code (LCNC) visual ETL flow that enables seamless data ingestion and transformation across multiple data sources.

Additionally, we explore how generative AI can enhance your LCNC visual ETL development process, creating an intuitive and powerful workflow that streamlines the entire development experience.

Use case walkthrough

In this example, we use Amazon SageMaker Unified Studio to develop a visual ETL flow. This pipeline reads data from an Amazon S3 based file location, performs transformations on the data, and subsequently writes the transformed data back into an Amazon S3 based AWS Glue Data Catalog table. We use allevents_pipe and venue_pipe files from the TICKIT dataset to demonstrate this capability.

The TICKIT dataset records sales activities on the fictional TICKIT website, where users can purchase and sell tickets online for different types of events such as sports games, shows, and concerts. Analysts can use this dataset to track how ticket sales change over time, evaluate the performance of sellers, and determine the most successful events, venues, and seasons in terms of ticket sales.

The process involves merging the allevents_pipe and venue_pipe files from the TICKIT dataset. Next, the merged data is filtered to include only a specific geographic region. The data is then aggregated to calculate the number of events by venue name. In the end, the transformed output data is saved to Amazon S3, and a new AWS Glue Data Catalog table is created.

The following diagram illustrates the architecture:

Prerequisites

To follow the instructions in this post, you need the following prerequisites:

  • An AWS account
  • A SageMaker Unified Studio domain
  • A SageMaker Unified Studio project with the Data analytics and machine learning project profile

Build a visual ETL flow

Complete the following steps to build a new visual ETL flow with the sample dataset:

  1. On the SageMaker Unified Studio console, on the top menu, choose Build.
  2. Under DATA ANALYSIS & INTEGRATION, choose Visual ETL flows, as shown in the following screenshot.

  3. Select your project and choose Continue.

  4. Choose Create visual ETL flow.

This time, manually define the ETL flow.

  5. On the top left, choose the + icon in the circle. Under Data sources, choose Amazon S3, as shown in the following screenshot. Place the node on the canvas.

  6. Choose the Amazon S3 source node and enter the following values:
    • S3 URI: s3://aws-blogs-artifacts-public/artifacts/BDB-4798/data/venue.csv
    • Format: CSV
    • Delimiter: ,
    • Multiline: Enabled
    • Header: Disabled

Leave the rest as default.

  7. Wait for the data preview to be available at the bottom of the screen.

  8. Choose the + icon in the circle to the right of the Amazon S3 node. Under Transforms, choose Rename Columns.

  9. Choose the Rename Columns node and choose Add new rename pair. For Current name and New name, enter the following pairs:
    • _c0: venueid
    • _c1: venuename
    • _c2: venuecity
    • _c3: venuestate
    • _c4: venueseats

  10. Choose the + icon to the right of the Rename Columns node. Under Transforms, choose Filter.
  11. Choose Add new filter condition.
  12. For Key, choose venuestate. For Operation, choose ==. For Value, enter DC, as shown in the following screenshot.

  13. Repeat steps 5 and 6 to add the Amazon S3 source node for table events.
    • S3 URI: s3://aws-blogs-artifacts-public/artifacts/BDB-4798/data/events.csv
    • Format: CSV
    • Sep: ,
    • Multiline: Enabled
    • Header: Disabled

Leave the rest as default.

  14. Repeat steps 7 and 8 for the Amazon S3 source node. On the Rename Columns node, choose Add new rename pair. For Current name and New name, enter the following pairs:
    • _c0: eventid
    • _c1: e_venueid
    • _c2: catid
    • _c3: dateid
    • _c4: eventname
    • _c5: starttime

  15. Choose the + icon to the right of the Rename Columns node. Under Transforms, choose Join.
  16. Drag the + icon at the right of the Filter node and drop it at the left of the Join node.
  17. For Join type, choose Inner. For Left data source, choose e_venueid. For Right data source, choose venueid.

  18. Choose the + icon to the right of the Join node. Under Transforms, choose SQL Query.
  19. Enter the following query statement:
select 
  venuename,
  count(distinct eventid) as eventid_count 
from {myDataSource} 
group by venuename

  20. Choose the + icon to the right of the SQL Query node. Under Data target, choose Amazon S3.
  21. Choose the Amazon S3 target node and enter the following values:
    • S3 URI: <choose s3 location from project overview page and add suffix “/output/venue_event/”> (for example, s3://<bucket-name>/dzd_bd693kieeb65yf/52d3z1nutb42w7/dev/output/venue_event/)
    • Format: Parquet
    • Compression: Snappy
    • Mode: Overwrite
    • Update catalog: True
    • Database: Choose your database
    • Table: venue_event_agg

At this point, you should see the end-to-end visual flow. Now you can publish it.

  22. On the top right, choose Save to project to save the draft flow. You can optionally change the name and add a description. Choose Save to project, as shown in the following screenshot.

The visual ETL flow has been successfully saved.
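To make the logic of the flow easier to reason about outside the canvas, the following is a minimal plain-Python sketch of the same sequence of nodes: name the positional columns, filter venues by state, inner-join events on the venue ID, and count distinct events per venue name. It is not the Spark script that visual ETL generates. The sample rows are hypothetical stand-ins shaped like the two files, and the helper names read_rows and count_events_by_venue are assumptions made for this illustration.

```python
import csv
import io
from collections import defaultdict

# Hypothetical, header-less sample rows shaped like the venue and events files.
VENUE_CSV = """1,Toyota Park,Bridgeview,IL,0
2,Verizon Center,Washington,DC,0
3,RFK Stadium,Washington,DC,0
"""
EVENT_CSV = """10,2,9,1827,Concert A,2008-01-01 19:30:00
11,2,9,1828,Concert B,2008-01-02 19:30:00
12,3,6,1829,Match C,2008-01-03 14:00:00
13,1,6,1830,Match D,2008-01-04 14:00:00
"""

# Column names from the two Rename Columns nodes (_c0, _c1, ... in order).
VENUE_COLUMNS = ["venueid", "venuename", "venuecity", "venuestate", "venueseats"]
EVENT_COLUMNS = ["eventid", "e_venueid", "catid", "dateid", "eventname", "starttime"]


def read_rows(text, columns):
    """Read header-less CSV text and attach names to the positional columns."""
    return [dict(zip(columns, row)) for row in csv.reader(io.StringIO(text))]


def count_events_by_venue(venues, events, state):
    """Filter venues by state, inner-join events on venue ID, count distinct events."""
    kept = {v["venueid"]: v["venuename"] for v in venues if v["venuestate"] == state}
    event_ids = defaultdict(set)
    for event in events:
        name = kept.get(event["e_venueid"])
        if name is not None:  # inner join: drop events without a matching venue
            event_ids[name].add(event["eventid"])
    return {name: len(ids) for name, ids in event_ids.items()}


venues = read_rows(VENUE_CSV, VENUE_COLUMNS)
events = read_rows(EVENT_CSV, EVENT_COLUMNS)
result = count_events_by_venue(venues, events, "DC")
print(result)
```

Like the SQL Query node, the sketch groups by venuename, so two venues that share a name would be counted together.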

Run flow

This section shows you how to run the visual ETL flow you authored.

  1. On the top right, choose Run.

At the bottom of the screen, the run status is shown. The run status transitions from Starting to Running, and then from Running to Finished.

  2. Wait for the run status to change to Finished.

Query using Amazon Athena

The output data has been written to the target S3 bucket. This section shows you how to query the output table.

  1. On the top left menu, under DATA ANALYSIS & INTEGRATION, choose Query Editor.

  2. On the data explorer, under Lakehouse, choose AwsDataCatalog. Navigate to the table venue_event_agg.
  3. From the three dots icon, choose Query with Athena.

Four records will be returned, as shown in the following screenshot. This indicates you succeeded in querying the output table written by the visual ETL flow.

Use generative AI to generate a visual ETL flow

The preceding instructions build the flow step by step on the visual console. Alternatively, SageMaker Unified Studio can automate the job authoring steps by using generative AI powered by Amazon Q.

  1. On the top left menu, choose Visual ETL flows.
  2. Choose Create visual ETL flow.
  3. Enter the following text and choose Submit.

Create a flow to connect 2 Glue catalog tables venue and event in database glue_db, join on event id , filter on venue state with condition as venuestate=='DC' and write output to a S3 location

This creates the following boilerplate flow that you can edit to quickly author the visual ETL flow.

The generated flow keeps the context of the prompt at the node level.

Clean Up

To avoid incurring future charges, clean up the resources you created during this walkthrough:

  1. From the SQL querybook, enter the following SQL to drop the table:
drop table venue_event_agg
  2. To delete the flow, under Actions, choose Delete flow.

Conclusion

This post demonstrated how you can use Amazon SageMaker Unified Studio to build a low-code no-code (LCNC) visual ETL flow that enables seamless data ingestion and transformation across multiple data sources.

To learn more, refer to our documentation and the AWS News Blog.


About the Authors

Praveen Kumar is an Analytics Solutions Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-based services. His areas of interest are serverless technology, data governance, and data-driven AI applications.

Noritaka Sekiyama is a Principal Big Data Architect with AWS Analytics services. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Alexandra Tello is a Senior Front End Engineer with the AWS Analytics services in New York City. She is a passionate advocate for usability and accessibility. In her free time, she’s an espresso enthusiast and enjoys building mechanical keyboards.

Ranu Shah is a Software Development Manager with AWS Analytics services. She loves building data analytics features for customers. Outside work, she enjoys reading books or listening to music.

Gal Heyne is a Technical Product Manager for AWS Analytics services with a strong focus on AI/ML and data engineering. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design simple-to-use data products.

Simplify data integration with AWS Glue and zero-ETL to Amazon SageMaker Lakehouse

Post Syndicated from Shovan Kanjilal original https://aws.amazon.com/blogs/big-data/simplify-data-integration-with-aws-glue-and-zero-etl-to-amazon-sagemaker-lakehouse/

With the growing emphasis on data, organizations are constantly seeking more efficient and agile ways to integrate their data, especially from a wide variety of applications. While traditional extract, transform, and load (ETL) processes have long been a staple of data integration due to their flexibility, for common use cases such as replication and ingestion, they often prove time-consuming, complex, and less adaptable to the fast-changing demands of modern data architectures.

In addition, as organizations rely on an increasingly diverse array of digital systems, data fragmentation has become a significant challenge. Valuable information is often scattered across multiple repositories, including databases, applications, and other platforms. To harness the full potential of their data, businesses must enable seamless access and consolidation from these varied sources. However, this task is complicated by the unique characteristics of modern systems, such as differing API protocols, implementations, and rate limits. To address these challenges and accelerate innovation, AWS Glue has recently expanded its third-party application support by introducing native connectors for 19 applications.

To utilize these new application connectors for well-defined use cases such as replication and ingestion, AWS Glue is also launching zero-ETL integration support from external applications. With this new functionality, customers can create up-to-date replicas of their data from applications such as Salesforce, ServiceNow, and Zendesk in an Amazon SageMaker Lakehouse and Amazon Redshift.

Amazon SageMaker Lakehouse unifies all your data across Amazon S3 data lakes and Amazon Redshift data warehouses, helping you build powerful analytics and AI/ML applications on a single copy of data. SageMaker Lakehouse gives you the flexibility to access and query your data in-place with all Apache Iceberg compatible tools and engines. By directly integrating with Lakehouse, all the data is automatically cataloged and can be secured through fine-grained permissions in Lake Formation.

What is zero-ETL?

Zero-ETL is a set of fully managed integrations by AWS that minimizes the need to build ETL data pipelines. It makes data available in Amazon SageMaker Lakehouse and Amazon Redshift from multiple operational, transactional, and enterprise sources. Extract, transform, and load (ETL) is the process of combining, cleaning, and normalizing data from different sources to prepare it for analytics, artificial intelligence (AI), and machine learning (ML) workloads. You don’t need to maintain complex ETL pipelines. We take care of the ETL for you by automating the creation and management of data replication.

What’s the difference between zero-ETL and Glue ETL?

AWS Glue now offers multiple ways for you to build data integration pipelines, depending on your integration needs.

  • Zero-ETL provides service-managed replication. It’s designed for scenarios where customers need a fully managed, efficient way to replicate data from one source to AWS with minimal configuration. Zero-ETL handles the entire replication process, including schema discovery and evolution, without requiring customers to write or manage any custom logic. This approach is ideal for creating up-to-date replicas of source data in near-real-time, with AWS managing the underlying infrastructure and replication process.
  • Glue ETL offers customer-managed data ingestion. It’s the preferred choice when customers need more control and customization over the data integration process or require complex transformations. With Glue ETL, customers can write custom transformation logic, combine data from multiple sources, apply data quality rules, add calculated fields, and perform advanced data cleansing or aggregation. This flexibility makes Glue ETL suitable for scenarios where data must be transformed or enriched before analysis.

It’s worth mentioning that source connections are reusable between Glue ETL and Glue zero-ETL, so you can easily support both patterns. After you create a connection once, you can use the same connection across various AWS Glue components, including Glue ETL, Glue Visual ETL, and zero-ETL. For example, you might start by creating a connection and a zero-ETL integration, but decide later to use the same connection to create a custom Glue ETL pipeline.

This blog post will explore how zero-ETL capabilities combined with its new application connectors are transforming the way businesses integrate and analyze their data from popular platforms such as ServiceNow, Salesforce, Zendesk, SAP and others.

Use case

Consider a large company that relies heavily on data-driven insights to optimize its customer support processes. The company stores vast amounts of transactional data in ServiceNow. To gain a comprehensive understanding of their business and make informed decisions, the company needs to integrate and analyze data from ServiceNow seamlessly, identifying and addressing problems and root causes, managing service level agreements and compliance, and proactively planning for incident prevention.

The company is looking for an efficient, scalable, and cost-effective solution for collecting and ingesting data from ServiceNow, with continuous near real-time replication, automated availability of new data attributes, robust monitoring capabilities to track data load statistics, and a reliable data lake foundation that supports data versioning. This allows data analysts, data engineers, and data scientists to quickly explore ingested data and develop data products that meet the needs of business teams.

Solution overview

The following architecture diagram illustrates an efficient and scalable solution for collecting and ingesting replicated data from ServiceNow with zero-ETL integration. In this example we use ServiceNow as a source, but this can be done with any supported source such as Salesforce, Zendesk, SAP, or others. The AWS Glue managed connectors act as a bridge between ServiceNow and the target Amazon SageMaker Lakehouse, enabling seamless, near real-time data flow without the need for custom ETL and scheduling.

The following are the key components and steps in the integration process:

  1. Zero-ETL extracts and loads the data into Amazon S3, a highly scalable object storage service. The data is also registered in the Glue Data Catalog, a metadata repository. Additionally, it keeps the information synchronized by capturing changes that occur in ServiceNow and maintains data consistency by automatically performing schema evolution.
  2. Amazon CloudWatch, a monitoring and observability service, collects logs and metrics from the data integration process.
  3. Amazon EventBridge, a serverless event bus service, triggers a downstream process that allows you to build event-driven architecture as soon as your new data arrives in your target. Through EventBridge, customers can build on top of zero-ETL for a diverse set of use cases such as:

Prerequisites

Complete the following prerequisites before setting up the solution:

  1. Create a bucket in Amazon S3 called zero-etl-demo-<your AWS Account Number>-<AWS Region> (for example, zero-etl-demo-012345678901-us-east-1). The bucket will be used to store the data ingested by zero-ETL in Apache Iceberg, an open table format (OTF) that supports ACID transactions (atomicity, consistency, isolation, and durability), seamless schema evolution, and data versioning using time travel.
  2. Create an AWS Glue database <your database name>, such as zero_etl_demo_db and associate the S3 bucket zero-etl-demo-<your AWS Account Number>-<AWS Region> as a location of the database. The database will be used to store the metadata related to the data integrations performed by zero-ETL.
  3. Update AWS Glue Data Catalog settings using the following IAM policy for fine-grained access control of the data catalog for zero-ETL.
  4. Create an AWS Identity and Access Management (IAM) role named zero_etl_demo_role. The IAM role will be used by zero-ETL to access the Glue connector, read from ServiceNow, and write the data into the target. Optionally, you can create two separate IAM roles (one associated with your source data and another associated with your target).
  5. Make sure you have a ServiceNow instance named ServiceNowInstance, a user named ServiceNowUser, and a password ServiceNowPassword with the required permissions to read from ServiceNow. The instance name, user, and password are used in the AWS Glue connection to authenticate with ServiceNow using the BASIC authentication type. Optionally, you can choose OAUTH2 if your ServiceNow instance supports it.
  6. Create the secret zero_etl_demo_secret in AWS Secrets Manager to store ServiceNow credentials.
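Because the bucket name and the database location are derived from your account number and Region, it can help to compute them once rather than type them by hand. The following is a small sketch under stated assumptions: the helper names zero_etl_bucket_name and glue_database_input are hypothetical, the bucket-name check covers only the basic length and character rules for general purpose buckets, and the returned dictionary mirrors the Name and LocationUri fields of the DatabaseInput structure in the AWS Glue CreateDatabase API without calling AWS.

```python
import re


def zero_etl_bucket_name(account_id: str, region: str) -> str:
    """Build the bucket name from the zero-etl-demo-<account>-<region> pattern."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("AWS account IDs are 12 digits")
    name = f"zero-etl-demo-{account_id}-{region}"
    # Basic naming check only: 3-63 characters, lowercase letters, digits,
    # dots, and hyphens, starting and ending with a letter or digit.
    if not re.fullmatch(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]", name):
        raise ValueError(f"not a valid S3 bucket name: {name}")
    return name


def glue_database_input(database_name: str, bucket: str) -> dict:
    """Return a dict shaped like the Glue CreateDatabase DatabaseInput argument."""
    return {"Name": database_name, "LocationUri": f"s3://{bucket}/"}


bucket = zero_etl_bucket_name("012345678901", "us-east-1")
database_input = glue_database_input("zero_etl_demo_db", bucket)
print(bucket)
print(database_input)
```

With credentials in place, a dictionary of this shape could be passed as DatabaseInput to the Glue client's create_database call; the sketch stops short of that so it runs without an AWS account.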

Build and verify the zero-ETL integration

Complete the following steps to create and validate zero-ETL integration:

Step 1: Set up a connector

Zero-ETL integration, when used with AWS Glue natively supported applications connectors, provides a straightforward way to bring third-party data into an Amazon S3 transactional data lake or Amazon Redshift. Use the following steps to create a ServiceNow data connection:

  1. Open the AWS Glue console.
  2. In the navigation pane, under Data catalog, choose Connections.
  3. Choose Create Connection.
  4. In the Create Connection pane, enter ServiceNow in Data Sources.
  5. Choose ServiceNow.
  6. Choose Next.
  7. For Instance Name, enter ServiceNowInstance (created as part of the prerequisites).
  8. For IAM service role, choose the zero_etl_demo_role (created as part of the prerequisites).
  9. For Authentication Type, choose the authentication type that you’re using for ServiceNow. In this example, we chose OAUTH2, which requires setting up Application Registries in ServiceNow.
  10. For AWS Secret, choose the secret zero_etl_demo_secret (created as part of the prerequisites).
  11. Choose Next.
  12. In the Connection Properties section, for Name, enter zero_etl_demo_conn.
  13. Choose Next.
  14. Choose Create connection.

  15. There will be a popup from ServiceNow after you choose Create connection. Choose Allow.

Step 2: Set up Zero-ETL integration

After creating the data connection to ServiceNow, use the following steps to create the zero-ETL integration:

  1. Open the AWS Glue console.
  2. In the navigation pane, under Data catalog, choose Zero-ETL integrations.
  3. Choose Create zero-ETL integration.
  4. In the Create integration pane, enter ServiceNow in Data Sources.
  5. Choose ServiceNow.
  6. Choose Next.
  7. For ServiceNow connection, choose the data connection created in Step 1 (zero_etl_demo_conn).
  8. For Source IAM role, choose the zero_etl_demo_role (from the prerequisites).
  9. For ServiceNow objects, choose the objects that you want the zero-ETL integration to ingest. For this post, choose the problem and incident objects.
  10. For Namespace or Database, choose <your database name>. In this example, we use the zero_etl_demo_db (from the prerequisites).
  11. For Target IAM role, choose the zero_etl_demo_role (from the prerequisites).
  12. Choose Next.
  13. For Security and data encryption, you can choose either AWS Managed KMS Key or choose a customer KMS key managed by AWS Key Management Service. For this post, choose Use AWS managed KMS key.
  14. In the Integration details section, for Name, enter zero-etl-demo-integration.
  15. Choose Next.
  16. Review the details and choose Create and launch integration.
  17. The newly created integration will show as Active in about a minute.

Step 3: Verify the initial SEED load

The SEED load refers to the initial loading of the tables that you want to ingest into Amazon SageMaker Lakehouse using zero-ETL integration. The status and statistics of the SEED load are published to CloudWatch, and the data ingested by zero-ETL integration can be accessed in AWS using services such as Amazon SageMaker Unified Studio, Amazon QuickSight, and others. Use the following steps to access zero-ETL integration logs and query the data:

  1. Open the AWS Glue console.
  2. In the navigation pane, choose Zero-ETL integrations.
  3. In the Zero-ETL integrations section, choose zero-etl-demo-integration.
  4. In the Activity summary (all time) section, choose CloudWatch logs.
  5. Check CloudWatch log events for the SEED Load. For each table ingested by the zero-ETL integration, two groups of logs are created: status and statistics. Highlighted in the following screenshot in IngestionTableStatistics are the statistics. The insertCount represents how many rows were extracted and loaded by zero-ETL integration. For the SEED load, you will always see only insertCount because it’s the initial load. In addition, in IngestionCompleted you will find information about the Zero-ETL integration such as status, load type, and message.

To validate the SEED load, query the data using Amazon SageMaker Unified Studio.

  1. Access Amazon SageMaker Unified Studio for your specific domain through your AWS Console.
  2. Open the Amazon SageMaker Unified Studio URL.
  3. Sign in with SSO or AWS IAM user.
  4. Select your project.
  5. Go to Data from the left menu, expand the Lakehouse AWSDataCatalog, expand your database, and select the incident table. Click the icon and select Query with Athena.
  6. For Query, enter the following statement:
    SELECT count(*) AS incidents_count
    FROM "zero_etl_demo_db"."incident"

  7. Choose Run.
  8. Let’s check an existing incident in ServiceNow. This is the incident whose description you will update in ServiceNow to validate change data capture (CDC). In the query editor pane, for Query, enter the following statement:
    SELECT number
    , short_description
    , description
    FROM "zero_etl_demo_db"."incident"
    WHERE number = 'INC0000003' -- update to your Incident number

  9. Choose Run.
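If you prefer to run the same validation queries from a script instead of the query editor, the following sketch shows one way to do it with the Athena API. It assumes a client that behaves like boto3.client("athena") and uses the StartQueryExecution, GetQueryExecution, and GetQueryResults operations. The function name run_athena_query, the output location, and the polling interval are assumptions for illustration, and your environment may require a workgroup instead of an explicit output location.

```python
import time


def run_athena_query(athena, sql, database, output_location, poll_seconds=1.0):
    """Run one SQL statement and return the first page of rows as lists of strings.

    `athena` is expected to behave like boto3.client("athena").
    """
    started = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]

    # Poll until Athena reports a terminal state.
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")

    # First page only; for a SELECT, the first row holds the column headers.
    results = athena.get_query_results(QueryExecutionId=query_id)
    rows = results["ResultSet"]["Rows"]
    return [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]


COUNT_SQL = 'SELECT count(*) AS incidents_count FROM "zero_etl_demo_db"."incident"'

# Usage with real credentials (not executed here; the bucket is a placeholder):
#   import boto3
#   rows = run_athena_query(boto3.client("athena"), COUNT_SQL,
#                           "zero_etl_demo_db", "s3://<your-results-bucket>/athena/")
```

Passing the client in as a parameter keeps the function easy to exercise with a stub before you point it at a real account.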

Step 4: Validate CDC

Change data capture (CDC) is a technique used to identify and process only the data that has changed in a source system since the last extraction. Instead of reloading an entire dataset, CDC captures and transfers only the new, updated, or deleted records into the target system, making data processing more efficient and reducing load times. The status and statistics of the CDC load are published to CloudWatch. For this post, you will use Amazon SageMaker Unified Studio to query the ingested data. Use the following steps to access zero-ETL integration logs and query the data. For the next step in this example, you will select an incident and perform an update in ServiceNow, changing the short_description and description of the incident.

  1. To demonstrate a CDC event, in this post we edit one incident and delete one incident in ServiceNow.
  2. Open the AWS Glue console.
  3. In the navigation pane, under Data catalog, choose Zero-ETL integrations.
  4. In the Zero-ETL integrations section, choose zero-etl-demo-integration.
  5. In the Activity summary (all time) section, choose CloudWatch logs.
  6. Zero-ETL integration replicates the changes to the Amazon S3 transactional data lake every 60 minutes by default. Check CloudWatch log events for the CDC load. Shown in the following figure in IngestionTableStatistics, review updateCount and deleteCount for each specific object managed by zero-ETL integration. It’s applying the updates and deletes that happened in ServiceNow to the transactional data lake.
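To illustrate what the update and delete counts represent, here is a conceptual sketch of applying a batch of changes to a replica keyed by incident number. It does not describe how zero-ETL is implemented internally. The change-record format, the function name apply_changes, and the sample incidents are hypothetical; only the counter names insertCount, updateCount, and deleteCount come from the log fields mentioned in this post.

```python
def apply_changes(target, changes):
    """Apply change records to `target`, a dict keyed by incident number.

    Each change is (operation, key, row); row is None for deletes.
    Returns counters named after the statistics read in CloudWatch.
    """
    stats = {"insertCount": 0, "updateCount": 0, "deleteCount": 0}
    for operation, key, row in changes:
        if operation == "delete":
            if target.pop(key, None) is not None:
                stats["deleteCount"] += 1
        elif key in target:
            target[key] = {**target[key], **row}  # overwrite changed attributes
            stats["updateCount"] += 1
        else:
            target[key] = dict(row)
            stats["insertCount"] += 1
    return stats


# Hypothetical replica contents after the SEED load.
incidents = {
    "INC0000003": {"short_description": "Wireless access is down", "description": "old text"},
    "INC0000007": {"short_description": "Need access to sales DB", "description": "old text"},
}
# One edit and one delete, mirroring the two changes made in ServiceNow.
batch = [
    ("upsert", "INC0000003", {"short_description": "Wi-Fi outage", "description": "new text"}),
    ("delete", "INC0000007", None),
]
stats = apply_changes(incidents, batch)
print(stats)
```

Only the two changed records are touched; every other row in the replica is left as it was, which is the efficiency gain CDC has over a full reload.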

To validate the CDC load, query the data using Amazon SageMaker Unified Studio.

  1. You can go back to Amazon SageMaker Unified Studio.
  2. For Query, enter the following statement:
    SELECT count(*) AS incidents_count
    FROM "zero_etl_demo_db"."incident"

  3. For Query, enter the following statement to record initial snapshot results before CDC:
    SELECT number
        , short_description
        , description
    FROM "zero_etl_demo_db"."incident"
    WHERE number = 'INC0000003' -- update to your Incident number

  4. Choose Run and confirm that one record was updated in short_description and description attributes.

By following these steps, you can effectively set up, build, and verify a zero-ETL job using the new AWS Glue application connector for ServiceNow. This process demonstrates the simplicity and efficiency of the zero-ETL approach in integrating applications data into your AWS environment.

Apache Iceberg Time Travel: Enhancing data versioning in zero-ETL

One of the benefits of using Apache Iceberg in zero-ETL integration is the ability to perform Time Travel. This feature allows you to access and query historical versions of your data effortlessly. With Iceberg Time Travel, you can easily roll back to previous data states, compare data across different points in time, or recover from accidental data changes. In the context of zero-ETL integrations, this capability becomes particularly valuable when dealing with rapidly changing applications data.

To demonstrate this feature, let’s consider a scenario where you’re analyzing ServiceNow incident data ingested through zero-ETL integration using Amazon SageMaker Unified Studio. Here’s an example query that showcases Iceberg time travel:

-- Query incident data as of particular timestamp before CDC
SELECT number,
    short_description,
    description
FROM "zero_etl_demo_db"."incident" 
FOR TIMESTAMP AS OF TIMESTAMP '2024-11-06 05:10:00 UTC' 
-- update this timestamp value to before your CDC update
WHERE number = 'INC0000003' -- update to your Incident number
-- Compare with current data
SELECT number,
    short_description,
    description
FROM "zero_etl_demo_db"."incident"
WHERE number = 'INC0000003' -- update to your Incident number

In this example:

  1. The first query uses the FOR TIMESTAMP AS OF clause for time travel queries on Iceberg tables. It retrieves incident data as it existed before CDC update for the specific incident number INC0000003.
  2. The second query fetches the current state of the data for the same incident number.

This capability allows you to track the evolution of incidents, identify trends in resolution times, or recover information that may have been inadvertently altered.
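When you run comparisons like this regularly, building the timestamp literal by hand is error prone. The following sketch generates the time travel statement from a Python datetime, assuming the same FOR TIMESTAMP AS OF syntax shown in the preceding query. The function name time_travel_query is hypothetical, and the table identifier is treated as trusted input that you supply yourself.

```python
from datetime import datetime, timezone


def time_travel_query(table, incident_number, as_of):
    """Build a time travel query for one incident as of a given instant."""
    if as_of.tzinfo is None:
        raise ValueError("as_of must be timezone-aware")
    if not incident_number.isalnum():
        raise ValueError("unexpected characters in incident number")
    # Normalize to UTC so the literal matches the 'YYYY-MM-DD HH:MM:SS UTC' form.
    stamp = as_of.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return (
        "SELECT number, short_description, description\n"
        f"FROM {table}\n"
        f"FOR TIMESTAMP AS OF TIMESTAMP '{stamp} UTC'\n"
        f"WHERE number = '{incident_number}'"
    )


query = time_travel_query(
    '"zero_etl_demo_db"."incident"',
    "INC0000003",  # update to your incident number
    datetime(2024, 11, 6, 5, 10, tzinfo=timezone.utc),
)
print(query)
```

Requiring a timezone-aware datetime avoids silently querying the wrong snapshot when a local time is passed in by mistake.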

Clean up

To avoid incurring future charges, clean up the resources used in this post from your AWS account by completing the following steps:

  1. Delete the zero-ETL integration zero-etl-demo-integration.
  2. Delete the content from the S3 bucket zero-etl-demo-<your AWS Account Number>-<AWS Region>.
  3. Delete the Data Catalog database zero_etl_demo_db.
  4. Delete the Data Catalog connection zero_etl_demo_conn.
  5. Delete the AWS Secrets Manager secret zero_etl_demo_secret.

Conclusion

As the pace of business continues to accelerate, the ability to quickly and efficiently integrate data from various applications and enterprise platforms has become a critical competitive advantage. By adopting a zero-ETL integration powered by AWS Glue and its new set of managed connectors, your organization can unlock the full potential of its data across multiple platforms faster and stay ahead of the curve.

To learn more about how Amazon SageMaker Lakehouse can help your organization streamline its data integration efforts, visit Amazon SageMaker Lakehouse.

Get started with zero-ETL on AWS by creating a free account today!


About the authors

Shovan Kanjilal is a Senior Analytics and Machine Learning Architect with Amazon Web Services. He is passionate about helping customers build scalable, secure and high-performance data solutions in the cloud.

Vivek Pinyani is a Data Architect at AWS Professional Services with expertise in Big Data technologies. He focuses on helping customers build robust and performant Data Analytics solutions and Data Lake migrations. In his free time, he loves to spend time with his family and enjoys playing cricket and running.

Kartikay Khator is a Solutions Architect within Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed multiple marathons, he is currently preparing for his next marathon challenge.

Caio Sgaraboto Montovani is a Sr. Specialist Solutions Architect, Data Lake and AI/ML within AWS Professional Services, developing scalable solutions according to customer needs. His vast experience has helped customers in different industries such as life sciences and healthcare, retail, banking, and aviation build solutions in data analytics, machine learning, and generative AI. He is passionate about rock and roll and cooking and loves to spend time with his family.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, Amazon MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest Amazon MWAA and AWS Glue features and news!

How ANZ Institutional Division built a federated data platform to enable their domain teams to build data products to support business outcomes

Post Syndicated from Leo Ramsamy original https://aws.amazon.com/blogs/big-data/how-anz-institutional-division-built-a-federated-data-platform-to-enable-their-domain-teams-to-build-data-products-to-support-business-outcomes/

In today’s rapidly evolving financial landscape, data is the bedrock of innovation, enhancing customer and employee experiences and securing a competitive edge. Recognizing this paradigm shift, ANZ Institutional Division has embarked on a transformative journey to redefine its approach to data management, utilization, and extracting significant business value from data insights.

Like many large financial institutions, ANZ Institutional Division operated with siloed data practices and centralized data management teams. As time went on, the limitations of this approach became apparent due to rising data complexity, larger volumes, and the growing demand for swift, business-driven insights. Consequently, the bank encountered several challenges and needed to take the following actions:

  • Create business insights from untapped data potential, estimated to be approximately $150 million in the Institutional Division alone
  • Improve operational efficiency by removing manual data handling, the use of spreadsheets, and duplicate data entries
  • Increase agility by making data expertise more readily available, thereby improving time to market and overall customer experience
  • Address data quality
  • Standardize tooling and remove the Shadow IT culture, driving scalability, reducing risk, and minimizing overall operational inefficiencies

These challenges are not unique to ANZ Institutional Division. Globally, financial institutions have been experiencing similar issues, prompting a widespread reassessment of traditional data management approaches.

One major trend, embraced by many financial institutions, has been the adoption of the data mesh architecture and the shift towards treating data as a product. This paradigm, pioneered by thought leaders like Zhamak Dehghani, introduces a decentralized approach to data management that aligns closely with modern organizational structures and agile methodologies.

Some notable global examples of leading companies embracing and implementing this trend are JPMorgan Chase, Capital One, and Saxo Bank.

Inspired by these global trends and driven by its own unique challenges, ANZ’s Institutional Division decided to pivot from viewing data as a byproduct of projects to treating it as a valuable product in its own right.

This shift promises several business benefits:

  • Empowered domain expertise – By decentralizing data ownership to domain-based teams, ANZ can use the deep business knowledge within each unit to create more relevant and valuable data products
  • Increased agility – Domain teams can now respond more quickly to business needs, creating and iterating on data products without relying on a centralized bottleneck
  • Improved data quality – With domain experts overseeing their own data, there’s a greater likelihood of catching and correcting quality issues at the source
  • Scalability – The federated approach allows for greater scalability, enabling ANZ to handle increasing data volumes and complexity more effectively
  • Innovation catalyst – By democratizing data access and empowering teams to create data products, ANZ is fostering a culture of innovation and data-driven decision-making across the organization

This transition is not just about technology; it represents a fundamental shift in how ANZ views and values its data assets. By treating data as a product, the bank is positioned to not only overcome current challenges, but to unlock new opportunities for growth, customer service, and competitive advantage.

This post explores how the shift to a data product mindset is being implemented, the challenges faced, and the early wins that are shaping the future of data management in the Institutional Division.

ANZ’s federated data strategy

In response to the challenges, ANZ Group formulated a data strategy that focuses on empowering employees to securely use data to improve the sustainability and financial well-being of their customers. At its core are the following pillars:

  • Introducing new ways of working that focus on generating customer value first
  • New technology platforms and tooling that allow the bank to collect, share, archive, and dispose of data in a secure and controlled way
  • Achieving consistency in how data is produced and consumed across the entire bank through data products and better-connected systems
  • Supporting the bank’s risk and regulatory obligations by providing a secure and resilient data platform that provides fine-grained, controlled access to quality data products

ANZ has made the strategic decision to adopt an architectural and operational model aligned with the data mesh paradigm, which revolves around four key principles: domain ownership, data as a product, a self-serve data platform, and federated computational governance.

Domain ownership recognizes that the teams generating the data have the deepest understanding of it and are therefore best suited to manage, govern, and share it effectively. This principle makes sure data accountability remains close to the source, fostering higher data quality and relevance.

Treating data as a product instils a product-centric mindset, emphasizing that data must be secure, discoverable, understandable, interoperable, reusable, and managed throughout its lifecycle. This principle makes sure data consumers, both internal and external, derive consistent value from well-designed data products.

A self-serve data platform empowers domains to create, discover, and consume data products independently. It abstracts technical complexities and provides user-friendly tools, enabling a scalable, repeatable, and automated approach to producing high-quality data products.

Under the federated mesh architecture, each divisional mesh functions as a node within the broader enterprise data mesh, maintaining a degree of autonomy in managing its data products. To coordinate these autonomous nodes and facilitate seamless integration, enterprise-wide standards, such as those related to data governance, interoperability, and security, are essential to maintain alignment and consistency across all nodes and the domains and teams within them.

With this approach, each node in ANZ maintains its divisional alignment and adherence to data risk and governance standards and policies to manage local data products and data assets. This enables global discoverability and collaboration without centralizing ownership or operations.

As a result, governance resides with the data products themselves, making sure standards and policies, such as access control, data quality, and compliance, are enforced where the data lives. In this regard, the enterprise data product catalog acts as a federated portal, facilitating cross-domain access and interoperability while maintaining alignment with governance principles. This model balances node or domain-level autonomy with enterprise-level oversight, creating a scalable and consistent framework across ANZ.

Within the ANZ enterprise data mesh strategy, aligning data mesh nodes with the ANZ Group’s divisional structure provides optimal alignment between data mesh principles and organizational structure, as shown in the following diagram.

Central to the success of this strategy is its support for each division’s autonomy and freedom to choose their own domain structure, which is closely aligned to their business needs. Divisions decide how many domains to have within their node; some may have one, others many. These nodes can implement analytical platforms like data lake houses, data warehouses, or data marts, all united by producing data products. Nodes and domains serve business needs and are not technology mandated.

Under the federated computational governance model, the ANZ Group strategy defines guardrails that treat a node as a logical data container suitable for the following:

  • Ingestion and metadata management
  • Creating source-aligned data products complying with ANZ’s Data Product Specification (DPS)
  • Integrating source-aligned data products from other nodes
  • Producing consumer-aligned data products for specific business purposes
  • Publishing conforming data products to ANZ’s Data Product Catalog (DPC)

Following on from this strategy, a division organizes its domain structure to provide autonomy to its functional teams while preserving the core values of data mesh. The following diagram depicts an example of the possible structure.

For instance, Domain A will have the flexibility to create data products that can be published to the divisional catalog, while also maintaining the autonomy to develop data products that are exclusively accessible to teams within the domain. These products will not be available to others until they are deemed ready for broader enterprise use.

This strategy supports each division’s autonomy to implement their own data catalogs and decide which data products to publish to the group-level catalog. This flexibility extends to divisional domains, which can choose which data products to publish to the divisional catalog or keep visible only to domain consumers.
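The publishing choices described above can be pictured as a small visibility model. The following Python sketch is purely illustrative: the names `Scope`, `DataProduct`, and `is_visible`, and the sample product name, are hypothetical and are not part of ANZ's platform or of any AWS service.

```python
from dataclasses import dataclass
from enum import IntEnum


class Scope(IntEnum):
    """Hypothetical publication scopes, from narrowest to widest."""
    DOMAIN = 1    # visible only to consumers inside the owning domain
    DIVISION = 2  # published to the divisional catalog
    GROUP = 3     # published to the group-level catalog


@dataclass(frozen=True)
class DataProduct:
    name: str
    division: str
    domain: str
    scope: Scope


def is_visible(product: DataProduct, consumer_division: str, consumer_domain: str) -> bool:
    """Return True if a consumer in the given division and domain can discover the product."""
    if product.scope == Scope.GROUP:
        return True
    if product.scope == Scope.DIVISION:
        return consumer_division == product.division
    # DOMAIN scope: only consumers in the owning domain of the owning division
    return (consumer_division, consumer_domain) == (product.division, product.domain)


# A product that Domain A has not yet released beyond its own teams (hypothetical name).
draft = DataProduct("trade-exposures", "Institutional", "Domain A", Scope.DOMAIN)
print(is_visible(draft, "Institutional", "Domain A"))  # True
print(is_visible(draft, "Institutional", "Domain B"))  # False
```

Widening a product's audience then amounts to raising its scope, which mirrors the idea that a domain keeps a product private until it is ready for broader use.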

Institutional Data & AI Platform architecture

The Institutional Division has implemented a self-service data platform to enable the domain teams to build and manage data products autonomously. The Institutional Data & AI platform adopts a federated approach to data while centralizing the metadata to facilitate simpler discovery and sharing of data products. The following diagram illustrates the building blocks of the Institutional Data & AI Platform.

The building blocks are as follows:

  1. Foundational Data & AI Platform capabilities – A dedicated data platform team provides domain-agnostic tools, systems, and capabilities to enable autonomous data product development across domains. This self-serve infrastructure allows domain teams to manage the full data lifecycle without relying on a centralized data team. Key capabilities include data storage, data onboarding and transformation, and data utilities that facilitate data sharing with interoperability between domains. These capabilities abstract the technical complexities associated with data management infrastructure, allowing domain experts to focus on creating valuable data products rather than infrastructure management.
  2. Domain-owned data assets – The domain-oriented data ownership approach distributes responsibility for data across the business units within the Institutional Division. Domain teams are responsible for developing, deploying, and managing their own analytical data products alongside operational data services. Data contracts authored by data product owners automate data product creation and provide a standard to access data products. Treating data as a product yields a reusable asset that outlives a project and meets the needs of the enterprise consumer. Consumer feedback and demand drive creation and maintenance of the data product.
  3. Division-level metadata management and data governance – A centrally hosted service provides domain teams with the capability to publish their data products along with relevant metadata, like business definitions and lineage. Some of the key features implemented are:
    1. Metadata management that centralizes metadata and presents it within the context of data products, such as data quality scores and data product lineage.
    2. A data portal for consumers to discover data products and access associated metadata.
    3. Subscription workflows that simplify access management to the data products.
    4. Computational governance that enforces divisional and enterprise data policies and standards, such as data classification and business data models for aligning terminology.

The following diagram is a high-level example of the technical architecture approach towards the Institutional Data & AI Platform. The solution uses a building block approach on a cloud-centered platform composed of AWS services, partner solutions, and open standards like OpenLineage and Apache Iceberg.

Let’s look at the key services that enable the federated platform to operate at scale:

  • Data storage and processing:
    • Apache Iceberg on Amazon Simple Storage Service (Amazon S3) offers an optimized way to store data assets and products and promotes interoperability across other services
    • Amazon Redshift allows domain teams to create and manage fit-for-purpose data marts
    • AWS Lambda and AWS Glue are used for data onboarding and processing, and data utilities created in Python and PySpark promote reusability and quality across the data processing pipelines
    • dbt simplifies data transformation rules and allows sub-domain data analysts to build modeling logic as SQL statements
    • Amazon Managed Workflows for Apache Airflow (Amazon MWAA) enables efficient management of workflows and data pipeline orchestration using out-of-the-box integrations with AWS services
  • Metadata management and data governance:
    • To maintain data reliability and accuracy, a data quality framework built on Soda Core automates the data quality checks defined in a data contract
    • Amazon DataZone enables data product cataloging, discovery, metadata management, and implementing computational governance
    • OpenLineage simplifies harvesting and collection of data and process-level lineage, which are then published to Amazon DataZone
    • AWS Lake Formation, combined with AWS Glue Data Catalog, provides data governance and access management to data products that reside within sub-domains
  • Analytics:
    • Tableau provides sub-domains with data visualization and business intelligence capabilities
  • Observability and security:
    • Observability is built into all platform processes, with monitoring and logging provided by Amazon CloudWatch and AWS CloudTrail
    • AWS Secrets Manager makes sure secrets are stored and made available for data pipelines to access services in a secure manner

The technical implementation actualizes the data product strategy at ANZ Institutional Division. Amazon DataZone plays an essential role in facilitating data product management for the domain teams. The service addresses several critical aspects of the Institutional Division’s data product strategy, including:

  • Data cataloging and metadata management – Amazon DataZone provides comprehensive data cataloging and metadata management capabilities
  • Data governance and compliance – Effective data governance is essential for scaling data products
  • Self-service capabilities – Amazon DataZone empowers domain teams with self-service capabilities, enabling them to create, manage, and deploy data products independently
  • Integration and interoperability – One of the challenges in scaling data products is providing seamless integration across various data sources and systems
  • Collaboration and sharing – Amazon DataZone provides a platform for sharing data and metadata across teams and domains

Institutional Division’s delivery model to achieve scale

The Institutional Division has successfully used the federated architecture, and key to this delivery model is the implementation of Foundational Data & AI Platform capabilities that serve all domains within the division. This model promotes self-service and accelerates the delivery of subsequent initiatives by using the capabilities built for previous use cases.

To evaluate the success of the delivery model, ANZ has implemented key metrics, such as cost transparency and domain adoption, to guide the data mesh governance team in refining the delivery approach. For instance, one enhancement involves integrating cross-functional squads to support data literacy.

The following considerations are key to scaling the Institutional Division operating model:

  • Data as a product approach – Use techniques like event storming and domain-driven design to capture business events and their meanings.
  • Education and enablement – Conduct learning interventions to upskill teams on understanding and using the data as a product approach.
  • Iterative data platform delivery – Work backward from business initiative to iteratively deliver self-service data platform infrastructure capabilities.
  • Managing demand efficiently – Implement a feedback mechanism to manage demand on data products. Track and manage data debt using standard data contract specifications. Most importantly, adopt governance and standards to make sure data products are built and maintained with a long-term perspective, minimizing technical debt.

“The Institutional Data & Analytics Platform (IDAP) has allowed the Institutional team to establish a base foundation to allow various teams to aggregate and consume the wealth of data across the division. This self-service platform enables business leaders to both create and consume reusable data products, unlocking value across this division. It’s also an excellent proof point for our broader data mesh architecture, allowing us to connect this divisional data to broader enterprise data stores—further positioning us to put the customer at the center of everything we do.”

– Tim Hogarth, CTO ANZ

“AWS believes that democratizing data, while not compromising on security and fine-grained access, is a key component of any future-proof, scalable data platform, so we are pleased to be enabling ANZ bank’s IDAP metadata management and data governance capabilities through Amazon DataZone. This allows the diverse business functions at ANZ the autonomy to self-serve on their data needs with built-in governance.”

– Shikha Verma, Head of Product, Amazon DataZone

Conclusion

ANZ’s journey towards a data product approach has improved how the organization manages data, reduced data silos, and positioned it to become a data-driven, customer-centric organization. By combining federated platform practices with AWS services and open standards, ANZ Institutional Division is achieving its decentralization objectives with a scalable data platform that enables its domain teams to make informed decisions, drive innovation, and maintain a competitive edge.

Special thanks: This implementation success is a result of close collaboration between ANZ Institutional Division, AWS ProServe, and the AWS account team. We want to thank ANZ Institutional Executives and the Leadership Team for the strong sponsorship and direction.


About the Authors

Leo Ramsamy is a Platform Architect specializing in data and analytics for ANZ’s Institutional division. He focuses on modern data practices, including Data Mesh architecture, data governance, quality management, and observability. His work aligns data strategies with business goals, improving accessibility and enabling better decision-making across ANZ.

Srinivasan Kuppusamy is a Senior Cloud Architect – Data at AWS ProServe, where he helps customers solve their business problems using the power of AWS Cloud technology. His areas of interest are data and analytics, data governance, and AI/ML.

Rada Stanic is a Chief Technologist at Amazon Web Services, where she helps ANZ customers across different segments solve their business problems using AWS Cloud technologies. Her special areas of interest are data analytics, machine learning/AI, and application modernization.

Introducing AWS Glue Data Catalog automation for table statistics collection for improved query performance on Amazon Redshift and Amazon Athena

Post Syndicated from Sotaro Hikita original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-data-catalog-automation-for-table-statistics-collection-for-improved-query-performance-on-amazon-redshift-and-amazon-athena/

The AWS Glue Data Catalog now automates generating statistics for new tables. These statistics are integrated with the cost-based optimizer (CBO) from Amazon Redshift Spectrum and Amazon Athena, resulting in improved query performance and potential cost savings.

Queries on large datasets often read extensive amounts of data and perform complex join operations across multiple datasets. When a query engine like Redshift Spectrum or Athena processes the query, the CBO uses table statistics to optimize it. For example, if the CBO knows the number of distinct values in a table column, it can choose the optimal join order and strategy. These statistics must be collected beforehand and should be kept up to date to reflect the latest data state.
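To make the role of distinct-value counts concrete, the following Python sketch applies a common textbook estimate for the size of an equi-join, rows_left × rows_right / max(NDV_left, NDV_right), and picks the join that produces the smaller intermediate result. It is an illustration only: the table names and statistics are hypothetical, and it does not describe how the Redshift Spectrum or Athena optimizers are actually implemented.

```python
def estimate_join_rows(rows_left: int, rows_right: int, ndv_left: int, ndv_right: int) -> float:
    """Textbook equi-join estimate: |L| * |R| / max(NDV_L, NDV_R)."""
    return rows_left * rows_right / max(ndv_left, ndv_right, 1)


# Hypothetical statistics for a fact table and two tables it can be joined with.
fact_rows = 1_000_000
candidates = {
    # name: (rows in the other table, NDV of the key in the fact table, NDV of the key in the other table)
    "returns": (10_000, 1_000_000, 9_000),
    "dates": (365, 365, 365),
}

estimates = {
    name: estimate_join_rows(fact_rows, rows, ndv_fact, ndv_other)
    for name, (rows, ndv_fact, ndv_other) in candidates.items()
}

# Joining with the table that yields the smallest intermediate result first
# keeps later joins cheaper.
first_join = min(estimates, key=estimates.get)
print(estimates)   # {'returns': 10000.0, 'dates': 1000000.0}
print(first_join)  # returns
```

Without NDVs, an optimizer has no basis for telling these two joins apart, which is why stale or missing statistics can lead to a poor join order.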

The Data Catalog already supported collecting table statistics used by the CBO for Redshift Spectrum and Athena for tables in Parquet, ORC, JSON, ION, CSV, and XML formats. We introduced this feature and its performance benefits in Enhance query performance using AWS Glue Data Catalog column-level statistics. The Data Catalog also supported Apache Iceberg tables, which we covered in detail in Accelerate query performance with Apache Iceberg statistics on the AWS Glue Data Catalog.

Previously, creating statistics for Iceberg tables in the Data Catalog required you to continuously monitor and update configurations for your tables. This meant undifferentiated heavy lifting to do the following:

  • Discover new tables with specific data table formats (such as Parquet, JSON, CSV, XML, ORC, ION) and specific transactional data table formats such as Iceberg and their individual bucket paths
  • Determine and set up compute tasks based on scan strategy (sampling percentage and schedules)
  • Configure AWS Identity and Access Management (IAM) and AWS Lake Formation roles for specific tasks to provide specific Amazon Simple Storage Service (Amazon S3) access, Amazon CloudWatch logs, AWS Key Management Service (AWS KMS) keys for CloudWatch encryption, and trust policies
  • Set up event notification systems to understand changes in data lakes
  • Set up specific optimizer configuration-based query performance and storage improvement strategies
  • Set up a scheduler or build your own event-based compute tasks with setup and teardown

Now, the Data Catalog lets you generate statistics automatically for updated and created tables with a one-time catalog configuration. You can get started by selecting the default catalog on the Lake Formation console and enabling table statistics on the table optimization configuration tab. As new tables are created, the number of distinct values (NDV) is collected for Iceberg tables, and additional statistics such as the number of nulls, maximum, minimum, and average length are collected for other file formats such as Parquet. Redshift Spectrum and Athena can use the updated statistics to optimize queries, using optimizations such as optimal join order or cost-based aggregation pushdown. The AWS Glue console provides you visibility into the updated statistics and statistics generation runs.

Now, data lake administrators can configure weekly statistics collection across all databases and tables in their catalog. When the automation is enabled, the Data Catalog generates and updates column statistics for all columns in the tables on a weekly basis. This job analyzes 20% of records in the tables to calculate statistics. These statistics can be used by Redshift Spectrum and Athena CBO to optimize queries.

Furthermore, this new feature provides the flexibility to configure automation settings and scheduled collection configurations at the table level. Individual data owners can override catalog-level automation settings based on specific requirements. Data owners can customize settings for individual tables, including whether to enable automation, collection frequency, target columns, and sampling percentage. This flexibility allows administrators to maintain an optimized platform overall, while enabling data owners to fine-tune individual table statistics.

In this post, we discuss how the Data Catalog automates table statistics collection and how you can use it to enhance your data platform’s efficiency.

Enable catalog-level statistics collection

The data lake administrator can enable catalog-level statistics collection on the Lake Formation console. Complete the following steps:

  1. On the Lake Formation console, choose Catalogs in the navigation pane.
  2. Select the catalog that you want to configure, and choose Edit on the Actions menu.

  3. Select Enable automatic statistics generation for the tables of the catalog and choose an IAM role. For the required permissions, see Prerequisites for generating column statistics.
  4. Choose Submit.

You can also enable catalog-level statistics collection through the AWS Command Line Interface (AWS CLI):

aws glue update-catalog --cli-input-json '{
    "name": "123456789012",
    "catalogInput": {
        "description": "Updating root catalog with role arn",
        "catalogProperties": {
            "customProperties": {
                "ColumnStatistics.RoleArn": "arn:aws:iam::123456789012:role/service-role/AWSGlueServiceRole",
                "ColumnStatistics.Enabled": "true"
            }
        }
    }
}'

The command calls the AWS Glue UpdateCatalog API, which takes in a CatalogProperties structure that expects the following key-value pairs for catalog-level statistics:

  • ColumnStatistics.RoleArn – The IAM role Amazon Resource Name (ARN) to be used for all jobs triggered for catalog-level statistics
  • ColumnStatistics.Enabled – A Boolean value indicating whether the catalog-level settings are enabled or disabled

Callers of UpdateCatalog must have UpdateCatalog IAM permissions and, if using Lake Formation permissions, must be granted ALTER on CATALOG permissions on the root catalog. You can call the GetCatalog API to verify the properties set on your catalog. For the required permissions used by the role passed, see Prerequisites for generating column statistics.
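If you script this configuration for several accounts, it can help to generate the JSON document rather than edit it by hand. The following Python sketch only builds the same document shown in the CLI example above and prints it; it does not call AWS. The function name `build_catalog_stats_request` is hypothetical, and rendering a disabled setting as the string "false" is an assumption based on the "true" value shown earlier.

```python
import json


def build_catalog_stats_request(catalog_id: str, role_arn: str, enabled: bool = True) -> dict:
    """Build the JSON document passed to `aws glue update-catalog --cli-input-json`."""
    return {
        "name": catalog_id,
        "catalogInput": {
            "description": "Updating root catalog with role arn",
            "catalogProperties": {
                "customProperties": {
                    # IAM role used by the statistics jobs triggered for the catalog
                    "ColumnStatistics.RoleArn": role_arn,
                    # The CLI example passes the flag as a string, not a JSON boolean;
                    # "false" for the disabled case is an assumption.
                    "ColumnStatistics.Enabled": "true" if enabled else "false",
                }
            },
        },
    }


request = build_catalog_stats_request(
    "123456789012",
    "arn:aws:iam::123456789012:role/service-role/AWSGlueServiceRole",
)
print(json.dumps(request, indent=4))
```

The printed document can be saved to a file and passed to the CLI command shown earlier.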

By following these steps, catalog-level statistics collection is enabled. AWS Glue then automatically updates statistics for all columns in each table, sampling 20% of records on a weekly basis. This allows data lake administrators to effectively manage the data platform’s performance and cost-efficiency.

View automated table-level settings

With catalog-level statistics collection enabled, whenever an Apache Hive table or Iceberg table is created or updated using the AWS Glue CreateTable or UpdateTable APIs through the AWS Glue console, AWS SDK, or AWS Glue crawlers, an equivalent table-level setting is created for that table.

Tables with automatic statistics generation enabled must use one of the following formats:

  • Hive table formats such as Parquet, Avro, ORC, JSON, ION, CSV, and XML
  • Apache Iceberg table format

After a table has been created or updated, you can confirm that a statistics collection setting has been set by checking the table description on the AWS Glue console. The setting should have the Schedule property set as Auto and Statistics configuration set as Inherited from catalog. Statistics collection for any table with these settings is triggered automatically by AWS Glue.

The following is an image of a Hive Table where catalog-level statistics collection has been applied and statistics have been collected:

The following is an image of an Iceberg table where catalog-level statistics collection has been applied and statistics have been collected:

Configure table-level statistics collection

Data owners can customize statistics collection at the table level to meet specific needs. For frequently updated tables, statistics can be refreshed more often than weekly. You can also specify target columns to focus on those most commonly queried.

Moreover, you can set what percentage of table records to use when calculating statistics. For example, you can increase this percentage for tables that need more precise statistics, or decrease it for tables where a smaller sample is sufficient, to optimize costs and statistics generation performance.

These table-level settings can override the catalog-level settings previously described.
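The precedence between the two levels can be sketched as follows. This is a simplified model for illustration, not the service's internal logic: the names `StatsSettings`, `CATALOG_DEFAULT`, and `resolve` are hypothetical, as is the `store_sales` table. The catalog defaults (weekly, 20% sample, all columns) and the daily, 100% override for `catalog_returns` are taken from this post.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class StatsSettings:
    frequency: str
    sample_percent: float
    columns: Optional[Tuple[str, ...]] = None  # None stands for all columns
    enabled: bool = True


# Catalog-level defaults described in this post: weekly runs, 20% sample, all columns.
CATALOG_DEFAULT = StatsSettings(frequency="weekly", sample_percent=20.0)


def resolve(table: str, overrides: Dict[str, StatsSettings]) -> Tuple[StatsSettings, str]:
    """Return the settings that apply to a table and where they came from."""
    if table in overrides:
        return overrides[table], "table-level"
    return CATALOG_DEFAULT, "inherited from catalog"


# A data owner customizes one table; every other table keeps the catalog default.
overrides = {"catalog_returns": StatsSettings(frequency="daily", sample_percent=100.0)}

for name in ("catalog_returns", "store_sales"):
    settings, origin = resolve(name, overrides)
    print(name, settings.frequency, settings.sample_percent, origin)
```

In this model an administrator changes `CATALOG_DEFAULT` once to move the baseline for every table, while owners adjust only the tables they care about.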

To configure table-level statistics collection on the AWS Glue console, complete the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose a database to view all available tables (for example, optimization_test).
  3. Choose the table to be configured (for example, catalog_returns).
  4. Go to Column statistics and choose Generate on schedule.
  5. In the Schedule section, choose the frequency from Hourly, Daily, Weekly, Monthly, or Custom (cron expression). In this example, for Frequency, choose Daily.
  6. For Start time, enter 06:43 in UTC.

  7. For Column options, select All columns.
  8. For IAM role, choose an existing role, or create a new role. For the required permissions, see Prerequisites for generating column statistics.

  9. Under Advanced configuration, for Security configuration, optionally choose your security configuration to enable at-rest encryption on the logs pushed to CloudWatch.
  10. For Sample rows, enter 100 as the percentage of rows to sample.
  11. Choose Generate statistics.

In the table description on the AWS Glue console, you can confirm that a statistics collection job has been scheduled for the specified date and time.

By following these steps, you have configured table-level statistics collection. This allows data owners to manage table statistics based on their specific requirements. Combined with the catalog-level settings managed by data lake administrators, this establishes a baseline for optimizing the entire data platform while flexibly addressing individual table requirements.

You can also create a column statistics generation schedule through the AWS CLI:

aws glue create-column-statistics-task-settings \
  --database-name 'database_name' \
  --table-name 'table_name' \
  --role 'arn:aws:iam::123456789012:role/stats-role' \
  --schedule 'cron(8 0-5 14 * * ?)' \
  --column-name-list 'col-1' \
  --catalog-id '123456789012' \
  --sample-size '10.0' \
  --security-configuration 'test-security'

The required parameters are database-name, table-name, and role. You can also include optional parameters such as schedule, column-name-list, catalog-id, sample-size, and security-configuration. For more information, see Generating column statistics on a schedule.
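When many tables need their own schedule, assembling the command programmatically helps keep required and optional parameters straight. The following Python sketch builds the argument list for the CLI command shown above without running it. The helper name `build_task_settings_command` is hypothetical; the flags are the ones listed in this section.

```python
import shlex
from typing import List, Optional, Sequence


def build_task_settings_command(
    database_name: str,
    table_name: str,
    role: str,
    schedule: Optional[str] = None,
    column_names: Optional[Sequence[str]] = None,
    catalog_id: Optional[str] = None,
    sample_size: Optional[float] = None,
    security_configuration: Optional[str] = None,
) -> List[str]:
    """Return the argv list for `aws glue create-column-statistics-task-settings`."""
    # database-name, table-name, and role are the required parameters.
    required = (("database-name", database_name), ("table-name", table_name), ("role", role))
    for flag, value in required:
        if not value:
            raise ValueError(f"--{flag} is required")

    command = ["aws", "glue", "create-column-statistics-task-settings"]
    for flag, value in required:
        command += [f"--{flag}", value]

    # Optional parameters are appended only when provided.
    if schedule is not None:
        command += ["--schedule", schedule]
    if column_names:
        command += ["--column-name-list", *column_names]
    if catalog_id is not None:
        command += ["--catalog-id", catalog_id]
    if sample_size is not None:
        command += ["--sample-size", str(sample_size)]
    if security_configuration is not None:
        command += ["--security-configuration", security_configuration]
    return command


cmd = build_task_settings_command(
    "database_name",
    "table_name",
    "arn:aws:iam::123456789012:role/stats-role",
    schedule="cron(8 0-5 14 * * ?)",
    column_names=["col-1"],
    catalog_id="123456789012",
    sample_size=10.0,
    security_configuration="test-security",
)
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(cmd))
```

Returning a list rather than a single string avoids quoting mistakes if you later pass the command to `subprocess.run`.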

Conclusion

This post introduced a new feature in the Data Catalog that enables automated statistics collection at the catalog level with flexible per-table controls. With it, organizations can maintain up-to-date column-level statistics without manual effort. By incorporating these statistics, the CBO in both Redshift Spectrum and Athena can optimize query processing and improve cost-efficiency.

Try out this feature for your own use case, and let us know your feedback in the comments.


About the Authors

Sotaro Hikita is an Analytics Solutions Architect. He supports customers across a wide range of industries in building and operating analytics platforms more effectively. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Senior Software Development Engineer on the AWS Glue and AWS Lake Formation team. He is passionate about building big data technologies and distributed systems.

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Amazon SageMaker Lakehouse and Amazon Redshift support zero-ETL integrations from applications

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/introducing-amazon-sagemaker-lakehouse-support-for-zero-etl-integrations-from-applications/

Today, we announced the general availability of Amazon SageMaker Lakehouse and Amazon Redshift support for zero-ETL integrations from applications. Amazon SageMaker Lakehouse unifies all your data across Amazon Simple Storage Service (Amazon S3) data lakes and Amazon Redshift data warehouses, helping you build powerful analytics and AI/ML applications on a single copy of data. SageMaker Lakehouse gives you the flexibility to access and query your data in-place with all Apache Iceberg compatible tools and engines. Zero-ETL is a set of fully managed integrations by AWS that minimizes the need to build ETL data pipelines for common ingestion and replication use cases. With zero-ETL integrations from applications such as Salesforce, SAP, and Zendesk, you can reduce time spent building data pipelines and focus on running unified analytics on all your data in Amazon SageMaker Lakehouse and Amazon Redshift.

As organizations rely on an increasingly diverse array of digital systems, data fragmentation has become a significant challenge. Valuable information is often scattered across multiple repositories, including databases, applications, and other platforms. To harness the full potential of their data, businesses must enable access and consolidation from these varied sources. In response to this challenge, users build data pipelines to extract and load (EL) data from multiple applications into centralized data lakes and data warehouses. Using zero-ETL, you can efficiently replicate valuable data from your customer support, relationship management, and enterprise resource planning (ERP) applications to data lakes and data warehouses for analytics and AI/ML, saving you weeks of engineering effort needed to design, build, and test data pipelines.

Prerequisites

  • An Amazon SageMaker Lakehouse catalog configured through AWS Glue Data Catalog and AWS Lake Formation.
  • An AWS Glue database that is configured for Amazon S3 where the data will be stored.
  • A secret in AWS Secrets Manager to use for the connection to the data source. The credentials must contain the username and password that you use to sign in to your application.
  • An AWS Identity and Access Management (IAM) role for the Amazon SageMaker Lakehouse or Amazon Redshift job to use. The role must grant access to all resources used by the job, including Amazon S3 and AWS Secrets Manager.
  • A valid AWS Glue connection to the desired application.
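
As a rough illustration of the IAM role prerequisite, the following Python sketch builds the two policy documents such a role typically needs. It is a sketch under stated assumptions, not the exact policy from the AWS documentation: the bucket name and secret ARN are hypothetical placeholders, the glue.amazonaws.com service principal is assumed to be the one that assumes the role, and the action list is deliberately minimal.

```python
import json

# Hypothetical placeholders; replace with your own resources.
BUCKET = "example-zero-etl-bucket"
SECRET_ARN = "arn:aws:secretsmanager:us-east-1:111122223333:secret:example-salesforce"


def trust_policy() -> dict:
    """Trust policy that lets the AWS Glue service assume the role (assumption)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def permissions_policy(bucket: str, secret_arn: str) -> dict:
    """Minimal permissions for Amazon S3 and AWS Secrets Manager access."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
            },
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": [secret_arn],
            },
        ],
    }


if __name__ == "__main__":
    # Print both documents as JSON, ready to paste into the IAM console.
    print(json.dumps(trust_policy(), indent=2))
    print(json.dumps(permissions_policy(BUCKET, SECRET_ARN), indent=2))
```

Your job may need additional permissions (for example, for AWS Glue Data Catalog or AWS Lake Formation), so treat this as a starting point and check the AWS Glue User Guide for the full list.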

How it works – creating a Glue connection prerequisite
I start by creating a connection using the AWS Glue console. I opt for a Salesforce integration as the data source.

Next, I provide the location of the Salesforce instance to be used for the connection, together with the rest of the required information. Be sure to use the .salesforce.com domain instead of .force.com. You can choose between two authentication methods: JSON Web Token (JWT), which is obtained through Salesforce access tokens, or OAuth login through the browser.
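
If you script your setup, a small check like the following can catch the domain mistake before you create the connection. This is only a sketch: the function name is hypothetical, and it tests nothing beyond the host suffix mentioned above.

```python
from urllib.parse import urlparse


def is_salesforce_domain(instance_url: str) -> bool:
    """Return True if the URL's host ends with .salesforce.com.

    URLs without a scheme (for example "example.my.salesforce.com") have
    no parsed hostname and are rejected, so pass a full https:// URL.
    """
    host = (urlparse(instance_url).hostname or "").lower()
    return host.endswith(".salesforce.com")


if __name__ == "__main__":
    for url in (
        "https://example.my.salesforce.com",
        "https://example.lightning.force.com",
    ):
        print(url, is_salesforce_domain(url))
```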

I review all the information and then choose Create connection.

After I sign into the Salesforce instance through a popup (not shown here), the connection is successfully created.

How it works – creating a zero-ETL integration
Now that I have a connection, I choose zero-ETL integrations from the left navigation panel, then choose Create zero-ETL integration.

First, I choose the source type for my integration. In this case it is Salesforce, so I can use the connection I just created.

Next, I select objects from the data source that I want to replicate to the target database in AWS Glue.

While in the process of adding objects, I can quickly preview both data and metadata to confirm that I am selecting the correct object.

By default, a zero-ETL integration synchronizes data from the source to the target every 60 minutes. However, you can change this interval to reduce the cost of replication for cases that do not require frequent updates.
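
To get a feel for how the interval affects the number of synchronization runs, here is a small arithmetic sketch. The function name is hypothetical, and the calculation says nothing about actual pricing, which depends on the AWS Glue pricing page and on how much data changes between runs.

```python
def syncs_per_day(interval_minutes: int = 60) -> float:
    """Number of synchronization runs in 24 hours for a given interval."""
    if interval_minutes <= 0:
        raise ValueError("interval_minutes must be positive")
    return 24 * 60 / interval_minutes


if __name__ == "__main__":
    # Compare the default hourly interval with a six-hour interval.
    print(syncs_per_day(60))
    print(syncs_per_day(360))
```

With the default 60-minute interval this gives 24 runs per day; a 6-hour interval gives 4.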

I review and then choose Create and launch integration.

The data in the source (Salesforce instance) has now been replicated to the target database salesforcezeroETL in my AWS account. This integration has two phases. Phase 1, the initial load, ingests all the data for the selected objects and may take from 15 minutes to a few hours, depending on the size of the data in these objects. Phase 2, the incremental load, detects any changes (such as new, updated, or deleted records) and applies them to the target.
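
The two phases can be pictured with a toy in-memory model. This is a conceptual sketch only and not how the managed integration is implemented: the record shape, the "Id" key, and the "op" field are all assumptions made for illustration.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def initial_load(source_rows: Iterable[Row]) -> Dict[str, Row]:
    """Phase 1: copy every source row into the target, keyed by Id."""
    return {row["Id"]: dict(row) for row in source_rows}


def apply_changes(target: Dict[str, Row], changes: Iterable[Dict[str, Any]]) -> Dict[str, Row]:
    """Phase 2: apply inserts, updates, and deletes to the target in order."""
    for change in changes:
        op, row = change["op"], change["row"]
        if op in ("insert", "update"):
            target[row["Id"]] = dict(row)  # upsert the latest version
        elif op == "delete":
            target.pop(row["Id"], None)  # ignore rows already absent
        else:
            raise ValueError(f"unknown operation: {op}")
    return target


if __name__ == "__main__":
    target = initial_load([{"Id": "001", "Name": "Acme"}, {"Id": "002", "Name": "Globex"}])
    apply_changes(
        target,
        [
            {"op": "update", "row": {"Id": "001", "Name": "Acme Corp"}},
            {"op": "delete", "row": {"Id": "002"}},
            {"op": "insert", "row": {"Id": "003", "Name": "Initech"}},
        ],
    )
    print(target)
```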

Each of the objects that I selected earlier has been stored in its respective table within the database. From here I can view the Table data for each of the objects that have been replicated from the data source.
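
You can also list the replicated tables programmatically. The sketch below assumes the AWS SDK for Python (Boto3) and its AWS Glue get_tables paginator; the helper name is hypothetical, and the actual API call is shown in comments so the helper can be tried without AWS credentials.

```python
from typing import Any, Dict, Iterable, List


def table_names_from_pages(pages: Iterable[Dict[str, Any]]) -> List[str]:
    """Collect table names from AWS Glue GetTables response pages."""
    names = []
    for page in pages:
        for table in page.get("TableList", []):
            names.append(table["Name"])
    return sorted(names)


# Usage against a real account (requires boto3 and AWS credentials):
#   import boto3
#   glue = boto3.client("glue")
#   pages = glue.get_paginator("get_tables").paginate(DatabaseName="salesforcezeroETL")
#   print(table_names_from_pages(pages))

if __name__ == "__main__":
    # Offline demonstration with hand-written pages in the same shape.
    sample_pages = [
        {"TableList": [{"Name": "account"}, {"Name": "contact"}]},
        {"TableList": [{"Name": "opportunity"}]},
    ]
    print(table_names_from_pages(sample_pages))
```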

Lastly, here’s a view of the data in Salesforce. As new entities are created, or existing entities are updated or changed in Salesforce, the data changes will synchronize to the target in AWS Glue automatically.

Now available
Amazon SageMaker Lakehouse and Amazon Redshift support for zero-ETL integrations from applications is now available in US East (N. Virginia), US East (Ohio), US West (Oregon), Asia Pacific (Hong Kong), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm) AWS Regions. For pricing information, visit the AWS Glue pricing page.

To learn more, visit our AWS Glue User Guide. Send feedback to AWS re:Post for AWS Glue or through your usual AWS Support contacts. Get started by creating a new zero-ETL integration today.

– Veliswa