Tag Archives: Apache Iceberg

How Ancestry optimizes a 100-billion-row Iceberg table

Post Syndicated from Thomas Cardenas original https://aws.amazon.com/blogs/big-data/how-ancestry-optimizes-a-100-billion-row-iceberg-table/

This is a guest post by Thomas Cardenas, Staff Software Engineer at Ancestry, in partnership with AWS.

Ancestry, the global leader in family history and consumer genomics, uses family trees, historical records, and DNA to help people on their journeys of personal discovery. Ancestry has the largest collection of family history records, consisting of 40 billion records. They serve more than 3 million subscribers and have over 23 million people in their growing DNA network. Their customers can use this data to discover their family story.

Ancestry is proud to connect users with their families past and present. They help people learn more about their own identity by learning about their ancestors. Users build a family tree through which we surface relevant records, historical documents, photos, and stories that might contain details about their ancestors. These artifacts are surfaced through Hints. The Hints dataset is one of the most interesting datasets at Ancestry. It’s used to alert users that potential new information is available. The dataset has multiple shards, and there are currently 100 billion rows being used by machine learning models and analysts. Not only is the dataset large, it also changes rapidly.

In this post, we share the best practices that Ancestry used to implement an Apache Iceberg-based hints table capable of handling 100 billion rows with 7 million hourly changes. The optimizations covered here resulted in cost reductions of 75%.

Overview of solution

Ancestry’s Enterprise Data Management (EDM) team faced a critical challenge—how to provide a unified, performant data ecosystem that could serve diverse analytical workloads across financial, marketing, and product analytics teams. The ecosystem needed to support everything from data scientists training recommendation models to geneticists developing population studies—all requiring access to the same Hints data.

The ecosystem around Hints data had been developed organically, without a well-defined architecture. Teams independently accessed Hints data through direct service calls, Kafka topic subscriptions, or warehouse queries, creating significant data duplication and unnecessary system load. To reduce cost and improve performance, EDM implemented a centralized Apache Iceberg data lake on Amazon Simple Storage Service (Amazon S3), with Amazon EMR providing the processing power. This architecture, shown in the following image, creates a single source of truth for the Hints dataset while using Iceberg’s ACID transactions, schema evolution, and partition evolution capabilities to handle scale and update frequency.

End-to-end AWS analytics architecture showcasing data movement from Fargate through MSK, EMR, to S3 data lake with Glue Catalog

Hints table management architecture

Managing datasets exceeding one billion rows presents unique challenges, and Ancestry faced this challenge with the trees collection of 20–100 billion rows across multiple tables. At this scale, dataset updates require careful execution to control costs and prevent memory issues. To solve these challenges, EDM chose Amazon EMR on Amazon EC2 running Spark to write Iceberg tables on Amazon S3 for storage. With large and steady Amazon EMR workloads, running the clusters on Amazon EC2, as opposed to Serverless, proved cost effective. EDM has scheduled an Apache Spark job to run every hour on their Amazon EMR on EC2. This job uses the merge operation to update the Iceberg table with recently changed rows. Performing updates like this on such a large dataset can easily lead to runaway costs and out-of-memory errors.

Key optimization techniques

The engineers needed to enable fast, row-level updates without impacting query performance or incurring substantial cost. To achieve this, Ancestry used a combination of partitioning strategies, table configurations, Iceberg procedures, and incremental updates. The following is covered in detail:

  • Partitioning
  • Sorting
  • Merge-on-read
  • Compaction
  • Snapshot management
  • Storage-partitioned joins

Partitioning strategy

Developing an effective partitioning strategy was crucial for the 100-billion-row Hints table. Iceberg supports various partition transforms including column value, temporal functions (year, month, day, hour), and numerical transforms (bucket, truncate). Following AWS best practices, Ancestry carefully analyzed query patterns to identify a partitioning approach that would support these queries while balancing these two competing considerations:

  • Too few partitions would force queries to scan excessive data, degrading performance and increasing costs.
  • Too many partitions would create small files and excessive metadata, causing management overhead and slower query planning. It’s generally best to avoid Parquet files smaller than 100 MB.

Through query pattern analysis, Ancestry discovered that most analytical queries filtered on hint status (particularly pending status) and hint type. This insight led us to implement a two-level partitioning strategy, first on status and then on type, which dramatically reduced the amount of data scanned during typical queries.
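To make the effect of the two-level layout concrete, the following minimal sketch counts how many rows a query has to read with and without partition pruning. The partition values and row counts are made-up illustrations, not Ancestry's actual distribution, and the function only models pruning; it is not how Iceberg plans a scan.

```python
# Hypothetical row counts per (status, type) partition; illustrative only.
PARTITIONS = {
    ("pending", "record"): 40,
    ("pending", "photo"): 10,
    ("accepted", "record"): 30,
    ("rejected", "record"): 20,
}


def rows_scanned(partitions, status=None, hint_type=None):
    """Sum the rows in partitions that survive pruning on the given filters."""
    return sum(
        rows
        for (p_status, p_type), rows in partitions.items()
        if (status is None or p_status == status)
        and (hint_type is None or p_type == hint_type)
    )


full_scan = rows_scanned(PARTITIONS)
pruned = rows_scanned(PARTITIONS, status="pending", hint_type="record")
print(f"full scan: {full_scan} rows, pruned scan: {pruned} rows")
```

A query that filters on both partition columns touches a single partition, while a query with no partition filter reads every partition.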

Sorting

To further optimize query performance, Ancestry implemented strategic data organization within partitions using Iceberg’s sort orders. While Iceberg doesn’t maintain perfect ordering, even approximate sorting significantly improves data locality and compression ratios.

For the Hints table with 100 billion rows, Ancestry faced a unique challenge: the primary identifiers (PersonId and HintId) are high-cardinality numeric columns that would be prohibitively expensive to sort completely. The solution uses Iceberg’s truncate transform function to support sorting on just a portion of the number, effectively creating another partition by grouping a collection of IDs together. For example, we can specify truncate(100_000_000, hintId) to create groups of 100 million hint IDs, greatly improving the performance of queries that specify that column.
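The grouping that truncate produces can be sketched in a few lines. For integer values, the Iceberg specification defines truncation to width W as rounding down to the nearest multiple of W; the helper name and the sample IDs below are illustrative assumptions.

```python
def truncate_int(width: int, value: int) -> int:
    """Round value down to the nearest multiple of width (integer truncate)."""
    # Python's % returns a non-negative remainder for a positive width,
    # so this also rounds negative values down.
    return value - (value % width)


WIDTH = 100_000_000
for hint_id in (99_999_999, 100_000_000, 1_234_567_890):
    print(hint_id, "->", truncate_int(WIDTH, hint_id))
```

Every ID in the same block of 100 million maps to the same truncated value, so rows are grouped by block rather than fully ordered by ID.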

Merge on read

With 7 million changes to the Hints table occurring hourly, optimizing write performance became critical to the architecture. In addition to making sure queries performed well, Ancestry also needed to make sure our frequent updates would perform well in both time and cost. It was quickly discovered that the default copy-on-write (CoW) strategy, which copies an entire file when any part of it changes, was too slow and expensive for their use case. Ancestry was able to get the performance we needed by instead specifying the merge-on-read (MoR) update strategy, which maintains new information in diff files that are reconciled on read. The large updates that happen every hour led us to choose faster updates at the cost of slower reads.
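In Iceberg, the update strategy is chosen per operation through table properties. The sketch below builds the Spark SQL statement that switches deletes, updates, and merges to merge-on-read. The three property names are standard Iceberg table properties; the table name is a hypothetical placeholder, and the post does not say which of the three Ancestry actually changed.

```python
# Standard Iceberg write-mode properties; each defaults to copy-on-write.
MOR_PROPERTIES = {
    "write.delete.mode": "merge-on-read",
    "write.update.mode": "merge-on-read",
    "write.merge.mode": "merge-on-read",
}


def set_properties_sql(table: str, properties: dict) -> str:
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement."""
    pairs = ",\n  ".join(f"'{key}' = '{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {pairs}\n)"


# "datalake.database.hints" is a placeholder, not the real table name.
print(set_properties_sql("datalake.database.hints", MOR_PROPERTIES))
```

Merge-on-read relies on row-level delete files, which require Iceberg format version 2 or later.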

File compaction

The frequent updates mean files constantly need to be rewritten to maintain performance. Iceberg provides the rewrite_data_files procedure for compaction, but default configurations proved insufficient for our scale. With the default configuration in place, the rewrite operation processed five file groups at a time and didn’t meet our performance objective. We found that increasing the concurrent writes improved performance. We used the following set of parameters, setting a relatively high max-concurrent-file-group-rewrites value of 100 to more efficiently deal with our thousands of partitions. The default of five concurrent file-group rewrites couldn’t keep up with the frequency of our updates.

CALL datalake.system.rewrite_data_files(
  table => 'database.table',
  strategy => 'binpack',
  options => map (
    'max-concurrent-file-group-rewrites','100',
    'partial-progress.enabled','true',
    'rewrite-all','true'
  )
)

Key optimizations in Ancestry’s approach include:

  • High concurrency: We increased max-concurrent-file-group-rewrites from the default 5 to 100, enabling parallel processing of our thousands of partitions. This increased compute costs but was necessary to help ensure that the jobs finished.
  • Resilience at scale: We enabled partial-progress to create compaction checkpoints, essential when operating at our scale where failures are particularly costly.
  • Comprehensive delta elimination: Setting rewrite-all to true helps ensure that both data files and delete files are compacted, preventing the accumulation of delete files. By default, the delete files created as part of this strategy aren’t re-written and would continue to accumulate, slowing queries.

We arrived at these optimizations through successive trials and evaluations. For example, with our very large dataset, we discovered that we could use a WHERE clause to limit re-writes to a single partition. Based on the partitions, we see varied execution times and resource utilization. For some partitions, we needed to reduce concurrency to avoid running into out of memory errors.
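A per-partition rewrite can be sketched by adding the procedure's where argument and lowering concurrency for heavy partitions. The helper below only builds the CALL statement. The partition column name status, the filter value, and the concurrency of 20 are assumptions for illustration, not values taken from Ancestry's jobs.

```python
def rewrite_data_files_sql(catalog, table, where=None, max_concurrent=100):
    """Build a rewrite_data_files CALL, optionally limited by a filter."""
    args = [f"table => '{table}'", "strategy => 'binpack'"]
    if where is not None:
        # The filter is passed as a string; inner literals use double quotes.
        args.append(f"where => '{where}'")
    args.append(
        "options => map("
        f"'max-concurrent-file-group-rewrites','{max_concurrent}',"
        "'partial-progress.enabled','true',"
        "'rewrite-all','true')"
    )
    body = ",\n  ".join(args)
    return f"CALL {catalog}.system.rewrite_data_files(\n  {body}\n)"


# Hypothetical: compact only the pending partition with reduced concurrency.
print(rewrite_data_files_sql(
    "datalake", "database.table", where='status = "pending"', max_concurrent=20
))
```

Running one statement per partition makes it possible to tune concurrency separately for the partitions that would otherwise run out of memory.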

Snapshot management

Iceberg tables maintain snapshots to preserve the history of the table, allowing you to time travel through the changes. As these snapshots accrue, they add to storage costs and degrade performance. This is why maintaining an Iceberg table requires you to periodically call the expire_snapshots procedure. We found we needed to enable concurrency for snapshot management so that it would complete in a timely manner:

CALL datalake.system.expire_snapshots(
        table => '`database`.table', 
        retain_last => 1, 
        max_concurrent_deletes => 20)

Consider how to balance performance, cost, and the need to keep historical records depending on your use case. When you do so, note that there is a table-level setting for maximum snapshot age which can override the retain_last parameter and retain only the active snapshot.
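The table-level settings in question are most likely the Iceberg table properties history.expire.max-snapshot-age-ms and history.expire.min-snapshots-to-keep. The following sketch converts a retention period in days to milliseconds and builds the statement that sets both properties. The table name and the one-day retention are illustrative assumptions.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000


def snapshot_retention_sql(table: str, max_age_days: int, min_snapshots: int = 1) -> str:
    """Build the statement that sets table-level snapshot retention."""
    age_ms = max_age_days * MS_PER_DAY
    return (
        f"ALTER TABLE {table} SET TBLPROPERTIES (\n"
        f"  'history.expire.max-snapshot-age-ms' = '{age_ms}',\n"
        f"  'history.expire.min-snapshots-to-keep' = '{min_snapshots}'\n"
        ")"
    )


# "datalake.database.hints" is a placeholder table name.
print(snapshot_retention_sql("datalake.database.hints", max_age_days=1))
```

Checking these properties before scheduling expire_snapshots helps confirm how much history a given call will actually retain.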

Reducing shuffle with Storage-Partitioned Joins

We use Storage-Partitioned Joins (SPJ) in Iceberg tables to minimize expensive shuffles during data processing. SPJ is an advanced Iceberg feature (available in Spark 3.3 or later with Iceberg 1.2 or later) that uses the physical storage layout of tables to eliminate shuffle operations entirely. For our Hints update pipeline, this optimization was transformational.

SPJ is especially useful during MERGE INTO operations, where datasets have identical partitioning. Proper configuration helps ensure effective use of SPJ to optimize joins.

SPJ has a few requirements: both tables must be Iceberg tables partitioned the same way, and they must be joined on the partition key. When these conditions hold, Iceberg knows that it doesn’t have to shuffle the data when the tables are loaded. This works even when the two sides have a different number of partitions.
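The configuration involved can be sketched as a set of Spark and Iceberg session settings rendered as spark-submit flags. These keys are the ones commonly used to enable storage-partitioned joins, but some exist only in Spark 3.4 or later, and the post does not list the exact settings Ancestry used, so treat the set as an assumption to verify against your versions.

```python
# Settings commonly used to enable storage-partitioned joins (assumed set).
SPJ_CONF = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    # Lets a join proceed when one side is missing some partition values.
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
    # Iceberg-side switch that reports the table's partitioning to Spark.
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
}


def as_submit_flags(conf: dict) -> list:
    """Render a config dict as spark-submit --conf arguments."""
    flags = []
    for key, value in conf.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


print(" ".join(as_submit_flags(SPJ_CONF)))
```

Inspecting the physical plan of the MERGE for the absence of an Exchange node on the join inputs is a common way to confirm that the shuffle was eliminated.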

Updates to the Hints database are first staged in the Hint Changes database, where data is transformed from the original Kafka data format into the format of the target (Hints) table. This is a temporary Iceberg table where we are able to perform audits using the Write-Audit-Publish (WAP) pattern. In addition to using the WAP pattern, we are able to use the SPJ functionality.

Technical workflow showing AWS data processing pipeline with following sequence: Amazon MSK starting point Parallel paths to: Hint changes in S3 (Apache Iceberg) Hint backups in S3 (Apache Iceberg) Stage hourly updates via EMR Cluster Staging table in S3 (Apache Iceberg) EMR hourly table maintenance jobs Final hints table in S3 (Apache Iceberg)

The Hints data pipeline

Reducing full-table scans

Another strategy to reduce shuffle is minimizing the data involved in joins by dynamically pushing down filters. In production, these filters vary between batches, so a multi-step operation is often necessary for setting up merges. The following example code first limits its scope by setting minimum and maximum values for the ID, then performs an update or insert on the target table depending on whether a matching target row exists.

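import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, max, min}

// Compute the ID range covered by this batch of changes.
val stats: Dataset[Row] = session.read.table("catalog.database.source")
  .agg(
    min(col("id")).as("min_value"),
    max(col("id")).as("max_value")
  )

// Read both bounds as Long; Number covers INT and BIGINT id columns.
val statRow: Row = stats.head
val minId: Long = statRow.getAs[Number](0).longValue
val maxId: Long = statRow.getAs[Number](1).longValue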

session.sql(s"""
  MERGE INTO catalog.database.target t
    USING (SELECT * FROM catalog.database.source) s
  ON (t.id BETWEEN $minId AND $maxId)
    AND (t.id = s.id)
  WHEN MATCHED
    THEN UPDATE SET *
  WHEN NOT MATCHED
    THEN INSERT *
""")

This technique reduces cost in several ways: the bounded merge reduces the number of affected rows, it allows for predicate pushdown optimization, which filters at the storage layer, and it reduces shuffle operations when compared with a join.

Additional insights

Apart from the Hints table, we have implemented over 1,000 Iceberg tables in our data ecosystem. The following are some key insights that we observed:

  • Updating a table using MERGE is typically the most expensive action, so this is where we spent the most time optimizing. It was still our best option.
  • Using complex data types can help co-locate similar data in the table.
  • Monitor the costs of each pipeline. Even when you follow good practices, it is easy to miss something that is quietly driving costs up.

Conclusion

Organizations can use Apache Iceberg tables on Amazon S3 with Amazon EMR to manage massive datasets with frequent updates. Many customers will be able to achieve excellent performance with a low maintenance burden by using the AWS Glue table optimizer for automatic, asynchronous compaction. Some customers, like Ancestry, will require custom optimizations of their maintenance procedures to meet their cost and performance goals. These customers should start with a careful assessment of query patterns to develop a partitioning strategy to minimize the amount of data that needs to be read and processed. Update frequency and latency requirements will dictate other choices, like whether merge-on-read or copy-on-write is the better strategy.

If your organization faces similar challenges with high volumes of data requiring frequent updates, you can use a combination of Apache Iceberg’s advanced features with AWS services like Amazon EMR Serverless, Amazon S3, and AWS Glue to build a truly modern data lake that delivers the scale, performance, and cost-efficiency you need.


About the authors

Thomas Cardenas

Thomas is a Staff Software Engineer at Ancestry. He focuses on building data lake infrastructure and improving data quality for financial reporting and analytics. He loves building the technical foundations that help millions of people discover their family history.

Robert Fisher

Robert is an AWS Sr. Solutions Architect. He has over twenty years of experience designing software solutions and leading software engineering teams. He is passionate about helping customers use technology to achieve their business objectives.

Harsh Vardan

Harsh is an AWS Solutions Architect, specializing in big data and analytics. He has a decade of experience working in the field of data science. He is passionate about helping customers adopt best practices and discover insights from their data.

The Amazon SageMaker lakehouse architecture now automates optimization configuration of Apache Iceberg tables on Amazon S3

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/the-amazon-sagemaker-lakehouse-architecture-now-automates-optimization-configuration-of-apache-iceberg-tables-on-amazon-s3/

As organizations increasingly adopt Apache Iceberg tables for their data lake architectures on Amazon Web Services (AWS), maintaining these tables becomes crucial for long-term success. Without proper maintenance, Iceberg tables can face several challenges: degraded query performance, unnecessary retention of old data that should be removed, and a decline in storage cost efficiency. These issues can significantly impact the effectiveness and economics of your data lake. Regular table maintenance operations help ensure your Iceberg tables remain high performing, compliant with data retention policies, and cost-effective for production workloads. To help you manage your Iceberg tables at scale, AWS Glue automates these Iceberg table maintenance operations: compaction (with sort and z-order strategies), snapshot expiration, and orphan data management. Since the launch of the feature, many customers have enabled automated table optimization through AWS Glue Data Catalog to reduce operational burden.

The Amazon SageMaker lakehouse architecture now automates optimization of Iceberg tables stored in Amazon S3 with catalog-level configuration, optimizing storage in your Iceberg tables and improving query performance. Previously, optimizing Iceberg tables in AWS Glue Data Catalog required updating configurations for each table individually. Now, you can enable automatic optimization for new Iceberg tables with a one-time Data Catalog configuration. Once enabled, for any new or updated table, Data Catalog continuously optimizes tables by compacting small files and removing snapshots and unreferenced files that are no longer needed.

This post demonstrates an end-to-end flow for enabling the catalog-level table optimization setting.

Prerequisites

The following prerequisites are required to use the new catalog-level table optimizations:

Enable table optimizations at the catalog level

The data lake administrator can enable the catalog-level table optimization on the AWS Lake Formation console. Complete the following steps:

  1. On the AWS Lake Formation console, choose Catalogs in the navigation pane.
  2. Select the catalog to be enabled with catalog-level table optimizations.
  3. Choose the Table optimizations tab, and choose Edit in Table optimizations, as shown in the following screenshot.

setup-catalog-level-optimizations

  4. In Optimization options, select Compaction, Snapshot retention, and Orphan file deletion, as shown in the following screenshot.

enable-optimizations

  5. Select an IAM role. Refer to Table optimization prerequisites for permissions.
  6. Choose Grant required permissions.
  7. Choose I acknowledge that expired data will be deleted as part of the optimizers.

After you enable the table optimizations at the catalog level, the configuration is displayed on the AWS Lake Formation console, as shown in the following screenshot.

optimizations-configuration

When you select an Iceberg table registered in the catalog, you can confirm that the table optimizations configuration is inherited from the table view because Configuration source shows catalog, as shown in the following screenshot.

catalog-level-optimizations

The table optimizations history is displayed on the table view. The following result shows one of the compaction runs by the table optimizations.

binpack-compaction-result

The catalog-level table optimizations for all databases and Iceberg tables are now enabled.

Customize settings of table optimizations at both the catalog and table levels

Although the catalog-level optimization applies common settings across all databases and Iceberg tables in your catalog, you might want to apply different strategies for specific Iceberg tables. You can use AWS Glue Data Catalog to enable both catalog-level and table-level optimizations based on specific table characteristics and access patterns. For example, in addition to configuring the catalog-level compaction with the bin-pack strategy for general-purpose Iceberg tables, you can apply the sort strategy at the table-level to tables with frequent range queries on timestamp columns.

This section shows how to configure catalog-level and table-specific optimizations through a practical scenario. Imagine a real-time analytics table with frequent write operations that generates more orphan files due to constant metadata updates. Users also run selective queries filtering on specific columns, which makes the sort strategy preferable. Complete the following steps:

  1. Select another Iceberg table in the same catalog as before to configure the table-level optimizations on the AWS Lake Formation console. At this point, the catalog-level table optimizations are configured for this table.
  2. Choose Edit in Optimization configuration, as shown in the following screenshot.

new-optimizations-configuration

  3. In Optimization options, choose Compaction, Snapshot retention, and Orphan file deletion.
  4. In Optimization configuration, choose Customize settings.
  5. Select the same IAM role.
  6. In Compaction configuration, select Sort, as shown in the following screenshot. Also set Minimum input files to 80, which is the number of files that triggers compaction. To configure Sort, a sort order must be defined in your Iceberg table. You can define the sort order with Spark SQL, for example ALTER TABLE db.tbl WRITE ORDERED BY <columns>.

sort-config

  7. In Snapshot retention configuration, for Snapshot deletion run rate, select Specify a custom value in hours. Then set the interval between two deletion job runs to 12 hours, as shown in the following screenshot.

snapshot-retention

  8. In Orphan file deletion configuration, set Files under the provided Table Location with a creation time older than this number of days will be deleted if they are no longer referenced by the Apache Iceberg Table metadata to 1 day.

orphan-deletion

  9. Choose Grant required permissions.
  10. Choose I acknowledge that expired data will be deleted as part of the optimizers.
  11. Choose Save.

The Table optimization tab on the AWS Lake Formation console displays the custom settings of the table optimizers. In Compaction, Compaction strategy is set to sort and Minimum input files is set to 80 files. In Snapshot retention, Snapshot deletion run rate is set to 12 hours. In Orphan file deletion, Orphan files will be deleted after is set to 1 day, as shown in the following screenshot.

new-table-level-optimizations

The compaction history shows sort as the table-level compaction strategy even though the catalog-level strategy is configured as binpack, as shown in the following screenshot.

sort-compaction-result

In this scenario, the table-specific optimizations are configured along with the catalog-level optimizations. Combining the table and catalog-level optimizations means you can more flexibly manage your Iceberg table data deletions and compactions.
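The inheritance behavior described in this scenario can be sketched as a simple override: a table uses the catalog-level settings unless it defines its own. The dictionary keys below are hypothetical names chosen for illustration and are not AWS Glue API fields; the values are the ones configured in the walkthrough.

```python
# Catalog-level default from the walkthrough (key name is hypothetical).
CATALOG_SETTINGS = {"compaction_strategy": "binpack"}

# Table-level overrides configured in the steps above.
TABLE_OVERRIDES = {
    "compaction_strategy": "sort",
    "min_input_files": 80,
    "snapshot_deletion_run_rate_hours": 12,
    "orphan_file_retention_days": 1,
}


def effective_settings(catalog: dict, table: dict) -> dict:
    """Table-level values win; everything else is inherited from the catalog."""
    return {**catalog, **table}


customized = effective_settings(CATALOG_SETTINGS, TABLE_OVERRIDES)
inherited = effective_settings(CATALOG_SETTINGS, {})
print("customized table:", customized["compaction_strategy"])
print("other tables:", inherited["compaction_strategy"])
```

This mirrors what the console shows: the customized table reports sort, while tables without their own configuration keep reporting the catalog as their configuration source.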

Conclusion

In this post, we demonstrated how to enable and manage the catalog-level table optimization feature of AWS Glue Data Catalog for Iceberg tables in the Amazon SageMaker lakehouse architecture. This enhancement significantly simplifies the management of Iceberg tables because you can enable automated maintenance operations across all tables with a single setting. Instead of configuring optimization settings for individual tables, you can now maintain your entire data lake more efficiently, reducing operational overhead while ensuring consistent optimization policies. We recommend enabling catalog-level table optimization to help you maintain a well-organized, high-performing, and cost-effective data lake while freeing up your teams to focus on deriving value from your data.

Try out this feature for your own use case and share your feedback and questions in the comments. To learn more about AWS Glue Data Catalog table optimizer, visit Optimizing Iceberg tables.

Acknowledgment: A special thanks to everyone who contributed to the development and launch of catalog level optimization: Siddharth Padmanabhan Ramanarayanan, Dhrithi Chidananda, Noella Jiang, Sangeet Lohariwala, Shyam Rathi, Anuj Jigneshkumar Vakil, and Jeremy Song.


About the authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services (AWS). He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect with AWS Analytics services. He’s responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Sandeep Adwankar is a Senior Product Manager at Amazon Web Services (AWS). Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products customers can use to improve how they manage, secure, and access data.

Siddharth Padmanabhan Ramanarayanan is a Senior Software Engineer on the AWS Glue and AWS Lake Formation team, where he focuses on building scalable distributed systems for data analytics workloads. He is passionate about helping customers optimize their cloud infrastructure for performance and cost efficiency.

Configure cross-account access of Amazon SageMaker Lakehouse multi-catalog tables using AWS Glue 5.0 Spark

Post Syndicated from Aarthi Srinivasan original https://aws.amazon.com/blogs/big-data/configure-cross-account-access-of-amazon-sagemaker-lakehouse-multi-catalog-tables-using-aws-glue-5-0-spark/

Many organizations build and operate enterprise-wide data mesh architectures using the AWS Glue Data Catalog and AWS Lake Formation for their Amazon Simple Storage Service (Amazon S3) based data lakes. Now, with Amazon SageMaker Lakehouse, these organizations can unify their data analytics and AI/ML workflows while maintaining secure cross-account access without data replication. By centralizing access to a single copy of data and using the secure fine-grained permissions of Lake Formation, enterprises can accelerate their analytics initiatives while reducing operational complexity across business units.

SageMaker Lakehouse organizes data using logical containers called catalogs, enabling teams to seamlessly query and analyze data across their entire ecosystem—from S3 data lakes to Amazon Redshift warehouses—using familiar Apache Iceberg compatible tools. Organizations can either mount their existing data warehouse to the lakehouse or create new catalogs using Amazon Redshift managed storage. Built-in zero-ETL connectors reduce data silos by integrating various data sources, enabling unified analytics across teams. This seamless integration particularly benefits existing AWS customers who already use the Data Catalog and Lake Formation, because they can immediately take advantage of SageMaker Lakehouse capabilities.

AWS Glue is a serverless service that makes data integration simpler, faster, and cheaper. We launched AWS Glue 5.0 with upgraded Apache Spark 3.5.4 and Python 3.11. AWS Glue 5.0 adds support for SageMaker Lakehouse to unify your data across S3 data lakes and Redshift data warehouses.

In our previous blog post, we demonstrated the process of creating tables in both the Amazon Redshift managed catalog and Amazon Redshift federated catalog within a single AWS account. In this post, we show you how to share a Redshift table and Amazon S3 based Iceberg table from the account that owns the data to another account that consumes the data. In the recipient account, we run a join query on the shared data lake and data warehouse tables using Spark in AWS Glue 5.0. We walk you through the complete cross-account setup and provide the Spark configuration in a Python notebook.

Solution overview

To demonstrate the functionality of SageMaker Lakehouse multi-catalog tables using AWS Glue 5.0 Spark, let’s assume the retail company Example Retail Corp launches a campaign to understand their market and drive growth by country of operation. Their infrastructure consists of a Redshift data warehouse for structured data and an S3 data lake for structured and semi-structured data. The marketing team realizes that customer data is spread across those two systems and wants to use the support of their data engineering and analysts to analyze and provide insights. As a company, they prefer unified governance for managing data access while enabling a secure sharing mechanism for business and engineering teams.

Let’s see how they can achieve the goal using SageMaker Lakehouse. The solution is represented in the following diagram.

001-BDB 5089

The setup could be extended to enterprise data meshes where a data producer account will own the Redshift clusters, catalog the tables in a central governance account, and share with any number of consumer accounts from the central account. Multiple consumer accounts could analyze the shared Redshift tables using the SageMaker Lakehouse integrated analytics engines.

The solution also works for cross-Region table access. You would create a resource link for the catalog tables in an AWS Region where you want to run your analyses and create dashboards. For cross-Region resource link setup, refer to Setting up cross-Region table access.

Prerequisites

To implement this solution, you need the following prerequisites:

  • Two AWS accounts with Lake Formation cross-account sharing version 4 and Lake Formation administrator configured. Refer to the Lake Formation data administrator permissions and initial setup of Lake Formation.
  • Permissions from Prerequisites for managing Amazon Redshift namespaces in the AWS Glue Data Catalog granted to the Lake Formation administrator role on both accounts.
  • An S3 bucket in the producer account to host the sample Iceberg table data.
  • An AWS Identity and Access Management (IAM) role, LakeFormationS3Registration_custom, in the producer account to register your Iceberg table’s Amazon S3 location with Lake Formation. For details, refer to Registering an Amazon S3 location and Requirements for roles used to register locations.
  • An Amazon Redshift Serverless namespace in the producer account. Follow the instructions in Creating a data warehouse with Amazon Redshift Serverless to launch a serverless namespace with default settings.
  • Two sample datasets, orders and returns, in CSV format. This is Example Retail Corp’s data on their customer purchase and return trends. Their marketing team has collected these data in a Redshift table and Amazon S3 from various systems. The instructions to create these tables are provided in the appendix at the end of this post. After completing the steps in the appendix, you should have customerdb.returnstbl_iceberg in your default catalog and ordersdb.orderstbl in your Redshift Serverless application default namespace.
  • An IAM role, Glue-execution-role, in the consumer account, with the following policies:
    1. AWS managed policies AWSGlueServiceRole and AmazonRedshiftDataFullAccess.
    2. Create a new in-line policy with the following permissions and attach it:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "LFandRSserverlessAccess",
                  "Effect": "Allow",
                  "Action": [
                      "lakeformation:GetDataAccess",
                      "redshift-serverless:GetCredentials"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": "iam:PassRole",
                  "Resource": "*",
                  "Condition": {
                      "StringEquals": {
                          "iam:PassedToService": "glue.amazonaws.com"
                      }
                  }
              }
          ]
      }

    3. Add the following trust policy to Glue-execution-role, allowing AWS Glue to assume this role:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "glue.amazonaws.com"
                      ]
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }
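The two policy documents above can also be assembled in code before they are attached to the role. The following is a minimal sketch in Python, not part of the original walkthrough: the helper names `build_inline_policy` and `build_trust_policy` are hypothetical, and the boto3 IAM calls that would apply the documents appear only in comments.

```python
import json


def build_inline_policy():
    """Inline policy: Lake Formation data access, Redshift Serverless
    credentials, and permission to pass the role to AWS Glue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFandRSserverlessAccess",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:GetDataAccess",
                    "redshift-serverless:GetCredentials",
                ],
                "Resource": "*",
            },
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "*",
                "Condition": {
                    "StringEquals": {"iam:PassedToService": "glue.amazonaws.com"}
                },
            },
        ],
    }


def build_trust_policy(service="glue.amazonaws.com"):
    """Trust policy that lets the given service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": [service]},
                "Action": "sts:AssumeRole",
            }
        ],
    }


inline_json = json.dumps(build_inline_policy())
trust_json = json.dumps(build_trust_policy())

# With boto3 installed and credentials configured, the documents could be
# applied roughly as follows (the policy name is a placeholder):
#   iam = boto3.client("iam")
#   iam.create_role(RoleName="Glue-execution-role",
#                   AssumeRolePolicyDocument=trust_json)
#   iam.put_role_policy(RoleName="Glue-execution-role",
#                       PolicyName="<inline-policy-name>",
#                       PolicyDocument=inline_json)
```

Building the documents as dictionaries and serializing them with `json.dumps` avoids hand-editing JSON and makes the statements easy to check before anything is created.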

    Steps for producer account setup

    For the producer account setup, you can either use your IAM administrator role added as Lake Formation administrator or use a Lake Formation administrator role with permissions added as discussed in the prerequisites. For illustration purposes, we use the IAM admin role Admin added as Lake Formation administrator.

    Configure your catalog

    Complete the following steps to set up your catalog:

    1. Log in to AWS Management Console as Admin.
    2. On the Amazon Redshift console, follow the instructions in Registering Amazon Redshift clusters and namespaces to the AWS Glue Data Catalog.
    3. After the registration is initiated, you will see the invite from Amazon Redshift on the Lake Formation console.
    4. Select the pending catalog invitation and choose Approve and create catalog.

    1. On the Set catalog details page, configure your catalog:
      1. For Name, enter a name (for this post, redshiftserverless1-uswest2).
      2. Select Access this catalog from Apache Iceberg compatible engines.
      3. Choose the IAM role you created for the data transfer.
      4. Choose Next.

    2. On the Grant permissions – optional page, choose Add permissions.
      1. Grant the Admin user Super user permissions for Catalog permissions and Grantable permissions.
      2. Choose Add.

    3. Verify the granted permission on the next page and choose Next.
    4. Review the details on the Review and create page and choose Create catalog.

    Wait a few seconds for the catalog to show up.

    1. Choose Catalogs in the navigation pane and verify that the redshiftserverless1-uswest2 catalog is created.
    2. Explore the catalog detail page to verify the ordersdb.public database.
    3. On the database View dropdown menu, view the table and verify that the orderstbl table shows up.

    As the Admin role, you can also query the orderstbl in Amazon Athena and confirm the data is available.

    Grant permissions on the tables from the producer account to the consumer account

    In this step, we share the Amazon Redshift federated catalog database redshiftserverless1-uswest2:ordersdb.public and table orderstbl as well as the Amazon S3 based Iceberg table returnstbl_iceberg and its database customerdb from the default catalog to the consumer account. We can’t share the entire catalog to external accounts as a catalog-level permission; we just share the database and table.

    1. On the Lake Formation console, choose Data permissions in the navigation pane.
    2. Choose Grant.
    3. Under Principals, select External accounts.
    4. Provide the consumer account ID.
    5. Under LF-Tags or catalog resources, select Named Data Catalog resources.
    6. For Catalogs, choose the account ID that represents the default catalog.
    7. For Databases, choose customerdb.
    8. Under Database permissions, select Describe under Database permissions and Grantable permissions.
    9. Choose Grant.
    10. Repeat these steps and grant table-level Select and Describe permissions on returnstbl_iceberg.
    11. Repeat these steps again to grant database- and table-level permissions for the orderstbl table of the federated catalog database redshiftserverless1-uswest2/ordersdb.

    The following screenshots show the configuration for database-level permissions.

    015-BDB 5089

    016-BDB 5089

    The following screenshots show the configuration for table-level permissions.

    017-BDB 5089

    018-BDB 5089

    1. Choose Data permissions in the navigation pane and verify that the consumer account has been granted database- and table-level permissions for both orderstbl from the federated catalog and returnstbl_iceberg from the default catalog.
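The console grants above correspond to Lake Formation `GrantPermissions` requests. The following is a minimal sketch in Python that only builds the request payloads for the default catalog resources. The helper names and the account IDs `111122223333` (producer) and `444455556666` (consumer) are placeholders, and passing the same permissions as grantable on the table is an assumption that mirrors the database step.

```python
def database_grant(principal, catalog_id, database, permissions, grantable=()):
    """Request payload for a database-level Lake Formation grant."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal},
        "Resource": {"Database": {"CatalogId": catalog_id, "Name": database}},
        "Permissions": list(permissions),
        "PermissionsWithGrantOption": list(grantable),
    }


def table_grant(principal, catalog_id, database, table, permissions, grantable=()):
    """Request payload for a table-level Lake Formation grant."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal},
        "Resource": {
            "Table": {
                "CatalogId": catalog_id,
                "DatabaseName": database,
                "Name": table,
            }
        },
        "Permissions": list(permissions),
        "PermissionsWithGrantOption": list(grantable),
    }


PRODUCER = "111122223333"  # placeholder producer account ID
CONSUMER = "444455556666"  # placeholder consumer account ID

requests = [
    # Describe on the shared database, also grantable by the consumer.
    database_grant(CONSUMER, PRODUCER, "customerdb", ["DESCRIBE"], ["DESCRIBE"]),
    # Select and Describe on the Iceberg table (grantable set is an assumption).
    table_grant(
        CONSUMER, PRODUCER, "customerdb", "returnstbl_iceberg",
        ["SELECT", "DESCRIBE"], ["SELECT", "DESCRIBE"],
    ),
]

# With boto3 installed and producer credentials configured:
#   lf = boto3.client("lakeformation")
#   for request in requests:
#       lf.grant_permissions(**request)
```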

    Register the Amazon S3 location of returnstbl_iceberg with Lake Formation

    In this step, we register the Amazon S3 based Iceberg table returnstbl_iceberg data location with Lake Formation to be managed by Lake Formation permissions. Complete the following steps:

    1. On the Lake Formation console, choose Data lake locations in the navigation pane.
    2. Choose Register location.
    3. For Amazon S3 path, enter the path for your S3 bucket that you provided while creating the Iceberg table returnstbl_iceberg.
    4. For IAM role, provide the user-defined role LakeFormationS3Registration_custom that you created as a prerequisite.
    5. For Permission mode, select Lake Formation.
    6. Choose Register location.
    7. Choose Data lake locations in the navigation pane to verify the Amazon S3 registration.
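The registration steps above can be sketched as a Lake Formation `RegisterResource` request. This is an illustrative Python sketch that only builds the payload. The helper names, the bucket name, and the account ID are placeholders, and mapping the console's Lake Formation permission mode to `HybridAccessEnabled=False` is an assumption.

```python
def s3_path_to_arn(s3_path):
    """Convert an s3://bucket/prefix/ path into an S3 ARN."""
    if not s3_path.startswith("s3://"):
        raise ValueError("expected a path starting with s3://")
    return "arn:aws:s3:::" + s3_path[len("s3://"):].rstrip("/")


def register_location_request(s3_path, role_arn):
    """Payload for registering an S3 location with a user-defined role."""
    return {
        "ResourceArn": s3_path_to_arn(s3_path),
        "RoleArn": role_arn,
        "UseServiceLinkedRole": False,
        # Assumption: Lake Formation permission mode means hybrid access is off.
        "HybridAccessEnabled": False,
    }


request = register_location_request(
    "s3://example-producer-bucket/returnstbl_iceberg/",  # placeholder bucket
    "arn:aws:iam::111122223333:role/LakeFormationS3Registration_custom",
)

# With boto3 installed and producer credentials configured:
#   boto3.client("lakeformation").register_resource(**request)
```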

    With this step, the producer account setup is complete.

    Steps for consumer account setup

    For the consumer account setup, we use the IAM admin role Admin, added as a Lake Formation administrator.

    The steps in the consumer account are quite involved. In the consumer account, a Lake Formation administrator will accept the AWS Resource Access Manager (AWS RAM) shares and create the required resource links that point to the shared catalog, database, and tables. The Lake Formation admin verifies that the shared resources are accessible by running test queries in Athena. The admin further grants permissions to the role Glue-execution-role on the resource links, database, and tables. The admin then runs a join query in AWS Glue 5.0 Spark using Glue-execution-role.

    Accept and verify the shared resources

    Lake Formation uses AWS RAM shares to enable cross-account sharing with Data Catalog resource policies in the AWS RAM policies. To view and verify the shared resources from the producer account, complete the following steps:

    1. Log in to the consumer AWS console and set the AWS Region to match the producer’s shared resource Region. For this post, we use us-west-2.
    2. Open the Lake Formation console. You will see a message indicating there is a pending invite and asking you to accept it on the AWS RAM console.
    3. Follow the instructions in Accepting a resource share invitation from AWS RAM to review and accept the pending invites.
    4. When the invite status changes to Accepted, choose Shared resources under Shared with me in the navigation pane.
    5. Verify that the Redshift Serverless federated catalog redshiftserverless1-uswest2, the default catalog database customerdb, the table returnstbl_iceberg, and the producer account ID under Owner ID column display correctly.
    6. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    7. Search by the producer account ID.
      You should see the customerdb and public databases. You can further select each database and choose View tables on the Actions dropdown menu to verify the table names.

    You will not see an AWS RAM share invite for the catalog level on the Lake Formation console, because catalog-level sharing isn’t possible. You can review the shared federated catalog and Amazon Redshift managed catalog names on the AWS RAM console, or using the AWS Command Line Interface (AWS CLI) or SDK.
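Reviewing the invitations through the SDK, as mentioned above, could look like the following minimal Python sketch. It filters a response shaped like the output of the AWS RAM `GetResourceShareInvitations` API. The helper name, the sample response, and the account IDs are illustrative placeholders.

```python
def pending_invitation_arns(response, sender_account_id):
    """Return ARNs of pending invitations sent by the given account."""
    return [
        invitation["resourceShareInvitationArn"]
        for invitation in response.get("resourceShareInvitations", [])
        if invitation.get("status") == "PENDING"
        and invitation.get("senderAccountId") == sender_account_id
    ]


# Illustrative response; real ARNs and names will differ.
sample_response = {
    "resourceShareInvitations": [
        {
            "resourceShareInvitationArn": "arn:aws:ram:us-west-2:111122223333:resource-share-invitation/example-1",
            "senderAccountId": "111122223333",
            "status": "PENDING",
        },
        {
            "resourceShareInvitationArn": "arn:aws:ram:us-west-2:111122223333:resource-share-invitation/example-2",
            "senderAccountId": "111122223333",
            "status": "ACCEPTED",
        },
    ]
}

to_accept = pending_invitation_arns(sample_response, "111122223333")

# With boto3 installed and consumer credentials configured:
#   ram = boto3.client("ram", region_name="us-west-2")
#   response = ram.get_resource_share_invitations()
#   for arn in pending_invitation_arns(response, "<producer account id>"):
#       ram.accept_resource_share_invitation(resourceShareInvitationArn=arn)
```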

    Create a catalog link container and resource links

    A catalog link container is a Data Catalog object that references a local or cross-account federated database-level catalog from other AWS accounts. For more details, refer to Accessing a shared federated catalog. Catalog link containers are essentially Lake Formation resource links at the catalog level that reference or point to a Redshift cluster federated catalog or Amazon Redshift managed catalog object from other accounts.

    In the following steps, we create a catalog link container that points to the producer shared federated catalog redshiftserverless1-uswest2. Inside the catalog link container, we create a database. Inside the database, we create a resource link for the table that points to the shared federated catalog table <<producer account id>>:redshiftserverless1-uswest2/ordersdb.public.orderstbl.

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Catalogs.
    2. Choose Create catalog.

    1. Provide the following details for the catalog:
      1. For Name, enter a name for the catalog (for this post, rl_link_container_ordersdb).
      2. For Type, choose Catalog Link container.
      3. For Source, choose Redshift.
      4. For Target Redshift Catalog, enter the Amazon Resource Name (ARN) of the producer federated catalog (arn:aws:glue:us-west-2:<<producer account id>>:catalog/redshiftserverless1-uswest2/ordersdb).
      5. Under Access from engines, select Access this catalog from Apache Iceberg compatible engines.
      6. For IAM role, provide the Redshift-S3 data transfer role that you had created in the prerequisites.
      7. Choose Next.

    1. On the Grant permissions – optional page, choose Add permissions.
      1. Grant the Admin user Super user permissions for Catalog permissions and Grantable permissions.
      2. Choose Add and then choose Next.

    1. Review the details on the Review and create page and choose Create catalog.

    Wait a few seconds for the catalog to show up.

    1. In the navigation pane, choose Catalogs.
    2. Verify that rl_link_container_ordersdb is created.

    Create a database under rl_link_container_ordersdb

    Complete the following steps:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    2. On the Choose catalog dropdown menu, choose rl_link_container_ordersdb.
    3. Choose Create database.

    Alternatively, you can choose the Create dropdown menu and then choose Database.

    1. Provide details for the database:
      1. For Name, enter a name (for this post, public_db).
      2. For Catalog, choose rl_link_container_ordersdb.
      3. Leave Location – optional as blank.
      4. Under Default permissions for newly created tables, deselect Use only IAM access control for new tables in this database.
      5. Choose Create database.

    1. Choose Catalogs in the navigation pane to verify that public_db is created under rl_link_container_ordersdb.

    Create a table resource link for the shared federated catalog table

    A resource link to a shared federated catalog table can reside only inside the database of a catalog link container. A resource link for such tables will not work if created inside the default catalog. For more details on resource links, refer to Creating a resource link to a shared Data Catalog table.

    Complete the following steps to create a table resource link:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Tables.
    2. On the Create dropdown menu, choose Resource link.

    1. Provide details for the table resource link:
      1. For Resource link name, enter a name (for this post, rl_orderstbl).
      2. For Destination catalog, choose rl_link_container_ordersdb.
      3. For Database, choose public_db.
      4. For Shared table’s region, choose US West (Oregon).
      5. For Shared table, choose orderstbl.
      6. After the Shared table is selected, Shared table’s database and Shared table’s catalog ID should get automatically populated.
      7. Choose Create.

    1. In the navigation pane, choose Databases to verify that rl_orderstbl is created under public_db, inside rl_link_container_ordersdb.

    Create a database resource link for the shared default catalog database

    Now we create a database resource link in the default catalog to query the Amazon S3 based Iceberg table shared from the producer. For details on database resource links, refer to Creating a resource link to a shared Data Catalog database.

    Though we are able to see the shared database in the default catalog of the consumer, a resource link is required to query from analytics engines, such as Athena, Amazon EMR, and AWS Glue. When using AWS Glue with Lake Formation tables, the resource link needs to be named identically to the source account’s resource. For additional details on using AWS Glue with Lake Formation, refer to Considerations and limitations.

    Complete the following steps to create a database resource link:

    1. On the Lake Formation console, under Data Catalog in the navigation pane, choose Databases.
    2. On the Choose catalog dropdown menu, choose the account ID to choose the default catalog.
    3. Search for customerdb.

    You should see the shared database customerdb, with your producer account ID shown as the owner account ID.

    1. Select customerdb, and on the Create dropdown menu, choose Resource link.
    2. Provide details for the resource link:
      1. For Resource link name, enter a name (for this post, customerdb).
      2. The rest of the fields should be already populated.
      3. Choose Create.
    3. In the navigation pane, choose Databases and verify that customerdb is created under the default catalog. Resource link names will show in italicized font.
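The database resource link created in the steps above can also be expressed as an AWS Glue `CreateDatabase` request whose input carries a `TargetDatabase`. The following Python sketch only builds the payload. The helper name and the producer account ID are placeholders.

```python
def database_resource_link_input(link_name, owner_account_id, shared_database):
    """DatabaseInput for a resource link that points at a shared database."""
    return {
        "Name": link_name,
        "TargetDatabase": {
            "CatalogId": owner_account_id,    # account that owns the shared database
            "DatabaseName": shared_database,  # name of the database in that account
        },
    }


# The link keeps the same name as the source database, as the text recommends
# when AWS Glue is used with Lake Formation tables.
link_input = database_resource_link_input("customerdb", "111122223333", "customerdb")

# With boto3 installed and consumer credentials configured:
#   boto3.client("glue").create_database(DatabaseInput=link_input)
```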

    Verify access as Admin using Athena

    Now you can verify your access using Athena. Complete the following steps:

    1. Open the Athena console.
    2. Make sure an S3 bucket is provided to store the Athena query results. For details, refer to Specify a query result location using the Athena console.
    3. In the navigation pane, verify both the default catalog and federated catalog tables by previewing them.
    4. You can also run a join query as follows. Note the three-part notation (catalog.database.table) used to refer to tables from two different catalogs:
    SELECT
    returns_tb.market as Market,
    sum(orders_tb.quantity) as Total_Quantity
    FROM rl_link_container_ordersdb.public_db.rl_orderstbl as orders_tb
    JOIN awsdatacatalog.customerdb.returnstbl_iceberg as returns_tb
    ON orders_tb.order_id = returns_tb.order_id
    GROUP BY returns_tb.market;

    This verifies the new capability of SageMaker Lakehouse, which enables accessing Redshift cluster tables and Amazon S3 based Iceberg tables in the same query, across AWS accounts, through the Data Catalog, using Lake Formation permissions.
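The join query above can also be assembled programmatically, which keeps the catalog, database, and table parts explicit. The following is a minimal Python sketch. The helper `three_part_name` is hypothetical, and the Athena call that would submit the query appears only in a comment.

```python
def three_part_name(catalog, database, table):
    """Join catalog, database, and table into catalog.database.table."""
    parts = (catalog, database, table)
    if any(not part or "." in part for part in parts):
        raise ValueError("each part must be non-empty and contain no dots")
    return ".".join(parts)


orders = three_part_name("rl_link_container_ordersdb", "public_db", "rl_orderstbl")
returns = three_part_name("awsdatacatalog", "customerdb", "returnstbl_iceberg")

query = (
    "SELECT returns_tb.market AS Market, "
    "sum(orders_tb.quantity) AS Total_Quantity "
    f"FROM {orders} AS orders_tb "
    f"JOIN {returns} AS returns_tb "
    "ON orders_tb.order_id = returns_tb.order_id "
    "GROUP BY returns_tb.market"
)

# With boto3 installed and consumer credentials configured (the output
# location is a placeholder for your Athena query results bucket):
#   boto3.client("athena").start_query_execution(
#       QueryString=query,
#       ResultConfiguration={"OutputLocation": "s3://<your-results-bucket>/"},
#   )
```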

    Grant permissions to Glue-execution-role

    Now we will share the resources from the producer account with additional IAM principals in the consumer account. Usually, the data lake admin grants permissions to data analysts, data scientists, and data engineers in the consumer account to do their job functions, such as processing and analyzing the data.

    We set up Lake Formation permissions on the catalog link container, databases, tables, and resource links to the AWS Glue job execution role Glue-execution-role that we created in the prerequisites.

    Resource links allow only Describe and Drop permissions. You need to use the Grant on target configuration to provide database Describe and table Select permissions.

    Complete the following steps:

    1. On the Lake Formation console, choose Data permissions in the navigation pane.
    2. Choose Grant.
    3. Under Principals, select IAM users and roles.
    4. For IAM users and roles, enter Glue-execution-role.
    5. Under LF-Tags or catalog resources, select Named Data Catalog resources.
    6. For Catalogs, choose rl_link_container_ordersdb and the consumer account ID, which indicates the default catalog.
    7. Under Catalog permissions, select Describe for Catalog permissions.
    8. Choose Grant.

    1. Repeat these steps for the catalog rl_link_container_ordersdb:
      1. On the Databases dropdown menu, choose public_db.
      2. Under Database permissions, select Describe.
      3. Choose Grant.
    2. Repeat these steps again, but after choosing rl_link_container_ordersdb and public_db, on the Tables dropdown menu, choose rl_orderstbl.
      1. Under Resource link permissions, select Describe.
      2. Choose Grant.
    3. Repeat these steps to grant additional permissions to Glue-execution-role.
      1. For this iteration, grant Describe permissions on the default catalog databases public and customerdb.
      2. Grant Describe permission on the resource link customerdb.
      3. Grant Select permission on the tables returnstbl_iceberg and orderstbl.

    The following screenshots show the configuration for database public and customerdb permissions.

    041-BDB 5089

    042-BDB 5089

    The following screenshots show the configuration for resource link customerdb permissions.

    043-BDB 5089

    044-BDB 5089

    The following screenshots show the configuration for table returnstbl_iceberg permissions.

    045-BDB 5089

    046-BDB 5089

    The following screenshots show the configuration for table orderstbl permissions.

    047-BDB 5089

    048-BDB 5089

    1. In the navigation pane, choose Data permissions and verify permissions on Glue-execution-role.

    Run a PySpark job in AWS Glue 5.0

    Download the PySpark script LakeHouseGlueSparkJob.py. This AWS Glue PySpark script runs Spark SQL by joining the producer shared federated orderstbl table and Amazon S3 based returns table in the consumer account to analyze the data and identify the total orders placed per market.

    Replace <<consumer_account_id>> in the script with your consumer account ID. Complete the following steps to create and run an AWS Glue job:

    1. On the AWS Glue console, in the navigation pane, choose ETL jobs.
    2. Choose Create job, then choose Script editor.

    1. For Engine, choose Spark.
    2. For Options, choose Start fresh.
    3. Choose Upload script.
    4. Browse to the location where you downloaded and edited the script, select the script, and choose Open.
    5. On the Job details tab, provide the following information:
      1. For Name, enter a name (for this post, LakeHouseGlueSparkJob).
      2. Under Basic properties, for IAM role, choose Glue-execution-role.
      3. For Glue version, select Glue 5.0.
      4. Under Advanced properties, for Job parameters, choose Add new parameter.
      5. Add the parameters --datalake-formats = iceberg and --enable-lakeformation-fine-grained-access = true.
    6. Save the job.
    7. Choose Run to execute the AWS Glue job, and wait for the job to complete.
    8. Review the job run details from the Output logs
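The job settings chosen on the console above map onto an AWS Glue `CreateJob` request. The following Python sketch only builds that request. The helper name and the script location are placeholders, and it assumes the edited script has already been uploaded to Amazon S3.

```python
def glue_job_request(name, role, script_location):
    """CreateJob payload for a Glue 5.0 Spark job with Iceberg and
    Lake Formation fine-grained access enabled."""
    return {
        "Name": name,
        "Role": role,
        "GlueVersion": "5.0",
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            "--datalake-formats": "iceberg",
            "--enable-lakeformation-fine-grained-access": "true",
        },
    }


job_request = glue_job_request(
    "LakeHouseGlueSparkJob",
    "Glue-execution-role",
    "s3://<your-script-bucket>/LakeHouseGlueSparkJob.py",  # placeholder path
)

# With boto3 installed and consumer credentials configured:
#   glue = boto3.client("glue")
#   glue.create_job(**job_request)
#   glue.start_job_run(JobName=job_request["Name"])
```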

    Clean up

    To avoid incurring costs on your AWS accounts, clean up the resources you created:

    1. Delete the Lake Formation permissions, catalog link container, database, and tables in the consumer account.
    2. Delete the AWS Glue job in the consumer account.
    3. Delete the federated catalog, database, and table resources in the producer account.
    4. Delete the Redshift Serverless namespace in the producer account.
    5. Delete the S3 buckets you created as part of data transfer in both accounts and the Athena query results bucket in the consumer account.
    6. Clean up the IAM roles you created for the SageMaker Lakehouse setup as part of the prerequisites.

    Conclusion

    In this post, we illustrated how to bring your existing Redshift tables to SageMaker Lakehouse and share them securely with external AWS accounts. We also showed how to query the shared data warehouse and data lakehouse tables in the same Spark session, from a recipient account, using Spark in AWS Glue 5.0.

    We hope you find this useful to integrate your Redshift tables with an existing data mesh and access the tables using AWS Glue Spark. Test this solution in your accounts and share feedback in the comments section. Stay tuned for more updates and feel free to explore the features of SageMaker Lakehouse and AWS Glue versions.

    Appendix: Table creation

    Complete the following steps to create a returns table in the Amazon S3 based default catalog and an orders table in Amazon Redshift:

    1. Download the CSV format datasets orders and returns.
    2. Upload them to your S3 bucket under the corresponding table prefix path.
    3. Use the following SQL statements in Athena. First-time users of Athena should refer to Specify a query result location.
    CREATE DATABASE customerdb;
    CREATE EXTERNAL TABLE customerdb.returnstbl_csv(
      `returned` string, 
      `order_id` string, 
      `market` string)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY '\;' 
    LOCATION
      's3://<your-S3-bucket>/<prefix-for-returns-table-data>/'
    TBLPROPERTIES (
      'skip.header.line.count'='1'
    );
    
    select * from customerdb.returnstbl_csv limit 10; 
    

    1. Create an Iceberg format table in the default catalog and insert data from the CSV format table:
    CREATE TABLE customerdb.returnstbl_iceberg(
      `returned` string, 
      `order_id` string, 
      `market` string)
    LOCATION 's3://<your-producer-account-bucket>/returnstbl_iceberg/' 
    TBLPROPERTIES (
      'table_type'='ICEBERG'
    );
    
    INSERT INTO customerdb.returnstbl_iceberg
    SELECT *
    FROM customerdb.returnstbl_csv;
    
    SELECT * FROM customerdb.returnstbl_iceberg LIMIT 10; 
    

    1. To create the orders table in the Redshift Serverless namespace, open the Query Editor v2 on the Amazon Redshift console.
    2. Connect to the default namespace using your database admin user credentials.
    3. Run the following commands in the SQL editor to create the database ordersdb and table orderstbl in it. Copy the data from your S3 location of the orders data to the orderstbl:
    create database ordersdb;
    use ordersdb;
    
    create table orderstbl(
      row_id int, 
      order_id VARCHAR, 
      order_date VARCHAR, 
      ship_date VARCHAR, 
      ship_mode VARCHAR, 
      customer_id VARCHAR, 
      customer_name VARCHAR, 
      segment VARCHAR, 
      city VARCHAR, 
      state VARCHAR, 
      country VARCHAR, 
      postal_code int, 
      market VARCHAR, 
      region VARCHAR, 
      product_id VARCHAR, 
      category VARCHAR, 
      sub_category VARCHAR, 
      product_name VARCHAR, 
      sales VARCHAR, 
      quantity bigint, 
      discount VARCHAR, 
      profit VARCHAR, 
      shipping_cost VARCHAR, 
      order_priority VARCHAR
      );
    
    copy orderstbl
    from 's3://<your-s3-bucket>/ordersdatacsv/orders.csv' 
    iam_role 'arn:aws:iam::<producer-account-id>:role/service-role/<your-Redshift-Role>'
    CSV 
    DELIMITER ';'
    IGNOREHEADER 1
    ;
    
    select * from ordersdb.orderstbl limit 5;
    


    About the Authors

    Aarthi Srinivasan is a Senior Big Data Architect with Amazon SageMaker Lakehouse. She collaborates with the service team to enhance product features, works with AWS customers and partners to architect lakehouse solutions, and establishes best practices for data governance.

    Subhasis Sarkar is a Senior Data Engineer with Amazon. Subhasis thrives on solving complex technological challenges with innovative solutions. He specializes in AWS data architectures, particularly data mesh implementations using AWS CDK components.

Build a high-performance quant research platform with Apache Iceberg

Post Syndicated from Guy Bachar original https://aws.amazon.com/blogs/big-data/build-a-high-performance-quant-research-platform-with-apache-iceberg/

In our previous post Backtesting index rebalancing arbitrage with Amazon EMR and Apache Iceberg, we showed how to use Apache Iceberg in the context of strategy backtesting. In this post, we focus on data management implementation options such as accessing data directly in Amazon Simple Storage Service (Amazon S3), using popular data formats like Parquet, or using open table formats like Iceberg. Our experiments are based on real-world historical full order book data, provided by our partner CryptoStruct, and compare the trade-offs between these choices, focusing on performance, cost, and quant developer productivity.

Data management is the foundation of quantitative research. Quant researchers spend approximately 80% of their time on necessary but not impactful data management tasks such as data ingestion, validation, correction, and reformatting. Traditional data management choices include relational, SQL, NoSQL, and specialized time series databases. In recent years, advances in parallel computing in the cloud have made object stores like Amazon S3 and columnar file formats like Parquet a preferred choice.

This post explores how Iceberg can enhance quant research platforms by improving query performance, reducing costs, and increasing productivity, ultimately enabling faster and more efficient strategy development in quantitative finance. Our analysis shows that Iceberg can accelerate query performance by up to 52%, reduce operational costs, and significantly improve data management at scale.

Having chosen Amazon S3 as our storage layer, a key decision is whether to access Parquet files directly or use an open table format like Iceberg. Iceberg offers distinct advantages through its metadata layer over Parquet, such as improved data management, performance optimization, and integration with various query engines.

In this post, we use the term vanilla Parquet to refer to Parquet files stored directly in Amazon S3 and accessed through standard query engines like Apache Spark, without the additional features provided by table formats such as Iceberg.

Quant developer and researcher productivity

In this section, we focus on the productivity features offered by Iceberg and how it compares to directly reading files in Amazon S3. As mentioned earlier, roughly 80% of quantitative research work is attributed to data management tasks. Business impact relies heavily on quality data (“garbage in, garbage out”). Quants and platform teams have to ingest data from multiple sources with different velocities and update frequencies, and then validate and correct the data. These activities translate into the ability to run append, insert, update, and delete operations.

For simple append operations, both Parquet on Amazon S3 and Iceberg offer similar convenience and productivity. However, real-world data is never perfect and needs to be corrected. Gap filling (inserts), error corrections and restatements (updates), and duplicate removal (deletes) are the most obvious examples. When writing data in the Parquet format directly to Amazon S3 without an open table format like Iceberg, you have to write code to identify the affected partition, correct the errors, and rewrite the partition. Moreover, if the write job fails, or a downstream job reads the partition while the write is in progress, the downstream job can read inconsistent data. In contrast, Iceberg has built-in insert, update, and delete features with ACID (atomicity, consistency, isolation, durability) properties, and the framework itself manages the Amazon S3 mechanics on your behalf.

Guarding against lookahead bias is an essential capability of any quant research platform—what backtests as a profitable trading strategy can render itself useless and unprofitable in real time. Iceberg provides time travel and snapshotting capabilities out of the box to manage lookahead bias that could be embedded in the data (such as delayed data delivery).

Simplified data corrections and updates

Iceberg enhances data management for quants in capital markets through its robust insert, delete, and update capabilities. These features allow efficient data corrections, gap-filling in time series, and historical data updates without disrupting ongoing analyses or compromising data integrity.

Unlike direct Amazon S3 access, Iceberg supports these operations on petabyte-scale data lakes without requiring complex custom code. This simplifies data modification processes, which is crucial for ingesting and updating large volumes of market and trade data, quickly iterating on backtesting and reprocessing workflows, and maintaining detailed audit trails for risk and compliance requirements.

Iceberg’s table format separates data files from metadata files, enabling efficient data modifications without full dataset rewrites. This approach also reduces expensive ListObjects API calls typically needed when directly accessing Parquet files in Amazon S3.

Additionally, Iceberg offers merge on read (MoR) and copy on write (CoW) approaches, providing flexibility for different quant research needs. MoR enables faster writes, suitable for frequently updated datasets, and CoW provides faster reads, beneficial for read-heavy workflows like backtesting.
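In Iceberg, the choice between MoR and CoW is controlled per operation through the table properties `write.delete.mode`, `write.update.mode`, and `write.merge.mode`. The following Python sketch generates the Spark SQL statement that would set them. The helper name and the table name `glue_catalog.quant.order_book` are hypothetical, and setting all three properties to the same mode is a simplification.

```python
WRITE_MODE_KEYS = ("write.delete.mode", "write.update.mode", "write.merge.mode")
VALID_MODES = ("merge-on-read", "copy-on-write")


def write_mode_ddl(table, mode):
    """Build an ALTER TABLE statement that sets all three write modes."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}")
    props = ", ".join(f"'{key}'='{mode}'" for key in WRITE_MODE_KEYS)
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({props})"


# Frequently updated dataset: favor faster writes.
mor_ddl = write_mode_ddl("glue_catalog.quant.order_book", "merge-on-read")

# Read-heavy backtesting dataset: favor faster reads.
cow_ddl = write_mode_ddl("glue_catalog.quant.order_book", "copy-on-write")

# In a Spark session configured with an Iceberg catalog:
#   spark.sql(mor_ddl)
```

Because the modes are independent properties, a table could, for example, use merge on read for deletes while keeping copy on write for merges.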

For example, when a new data source or attribute is added, quant researchers can seamlessly incorporate it into their Iceberg tables and then reprocess historical data, confident they’re using correct, time-appropriate information. This capability is particularly valuable in maintaining the integrity of backtests and the reliability of trading strategies.

In scenarios involving large-scale data corrections or updates, such as adjusting for stock splits or dividend payments across historical data, Iceberg’s efficient update mechanisms significantly reduce processing time and resource usage compared to traditional methods.

These features collectively improve productivity and data management efficiency in quant research environments, allowing researchers to focus more on strategy development and less on data handling complexities.

Historical data access for backtesting and validation

Iceberg’s time travel feature can enable quant developers and researchers to access and analyze historical snapshots of their data. This capability can be useful while performing tasks like backtesting, model validation, and understanding data lineage.

Iceberg simplifies time travel workflows on Amazon S3 by introducing a metadata layer that tracks the history of changes made to the table. You can refer to this metadata layer to create a mental model of how Iceberg’s time travel capability works.

Iceberg’s time travel capability is driven by a concept called snapshots, which are recorded in metadata files. These metadata files act as a central repository that stores table metadata, including the history of snapshots. Additionally, Iceberg uses manifest files to provide a representation of data files, their partitions, and any associated deleted files. These manifest files are referenced in the metadata snapshots, allowing Iceberg to identify the relevant data for a specific point in time.

When a user requests a time travel query, the typical workflow involves querying a specific snapshot. Iceberg uses the snapshot identifier to locate the corresponding metadata snapshot in the metadata files. The time travel capability is invaluable to quants, enabling them to backtest and validate strategies against historical data, reproduce and debug issues, perform what-if analysis, comply with regulations by maintaining audit trails and reproducing past states, and roll back and recover from data corruption or errors. Quants can also gain deeper insights into current market trends and correlate them with historical patterns. Also, the time travel feature can further mitigate any risks of lookahead bias. Researchers can access the exact data snapshots that were present in the past, and then run their models and strategies against this historical data, without the risk of inadvertently incorporating future information.
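To make the snapshot lookup concrete, the following is a toy Python model of how a timestamp-based time travel query resolves to a snapshot. It is not Iceberg's implementation or API: the `Snapshot` class, the `snapshot_as_of` function, and the sample history are all illustrative assumptions.

```python
from bisect import bisect_right
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at_ms: int  # commit time in epoch milliseconds
    data_files: tuple     # files visible in this snapshot


def snapshot_as_of(history, timestamp_ms):
    """Return the latest snapshot committed at or before timestamp_ms.

    history must be sorted by committed_at_ms in ascending order.
    """
    commit_times = [snapshot.committed_at_ms for snapshot in history]
    index = bisect_right(commit_times, timestamp_ms)
    if index == 0:
        raise LookupError("no snapshot at or before the requested time")
    return history[index - 1]


history = [
    Snapshot(1, 1000, ("trades-a.parquet",)),
    Snapshot(2, 2000, ("trades-a.parquet", "trades-b.parquet")),
    Snapshot(3, 3000, ("trades-a.parquet", "trades-b-corrected.parquet")),
]

# A backtest pinned to time 1500 sees only what had been committed by then.
as_of = snapshot_as_of(history, 1500)
```

Here the backtest pinned to time 1500 resolves to snapshot 1 and never sees the file delivered at time 2000 or the correction at time 3000, which is how snapshot pinning guards against lookahead bias. With Spark and Iceberg, the equivalent lookup is typically expressed with the `TIMESTAMP AS OF` or `VERSION AS OF` clause in a query.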

Seamless integration with familiar tools

Iceberg provides a variety of interfaces that enable seamless integration with the open source tools and AWS services that quant developers and researchers are familiar with.

Iceberg provides a comprehensive SQL interface that allows quant teams to interact with their data using familiar SQL syntax. This SQL interface is compatible with popular query engines and data processing frameworks, such as Spark, Trino, Amazon Athena, and Hive. Quant developers and researchers can use their existing SQL knowledge and tools to query, filter, aggregate, and analyze their data stored in Iceberg tables.

In addition to the primary interface of SQL, Iceberg also provides the DataFrame API, which allows quant teams to programmatically interact with their data with popular distributed data processing frameworks like Spark and Flink as well as thin clients like PyIceberg. Quants can further use this API to build more programmatic approaches to access and manipulate data, allowing for the implementation of custom logic and integration of Iceberg with other AWS ecosystems like Amazon EMR.

Although accessing data directly from Amazon S3 is a viable option, Iceberg adds several advantages: metadata management, performance optimization through partition pruning, data manipulation, and integration with AWS services such as Athena and Amazon EMR, which together provide a more seamless and feature-rich data processing experience.

Undifferentiated heavy lifting

Data partitioning is one of the major contributing factors to optimizing aggregate throughput to and from Amazon S3, and therefore to the overall price-performance of a High Performance Computing (HPC) environment.

Quant researchers often face performance bottlenecks and complex data management challenges when dealing with large-scale datasets in Amazon S3. As discussed in Best practices design patterns: optimizing Amazon S3 performance, single prefix performance is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per partitioned Amazon S3 prefix. Iceberg’s metadata layer and intelligent partitioning strategies automatically optimize data access patterns, reducing the likelihood of I/O throttling and minimizing the need for manual performance tuning. This automation allows quant teams to focus on developing and refining trading strategies rather than troubleshooting data access issues or optimizing storage layouts.

In this section, we discuss situations we discovered while running our experiments at scale and solutions provided by Iceberg vs. vanilla Parquet when accessing data in Amazon S3.

As we mentioned in the introduction, the nature of quant research is “fail fast”—new ideas have to be quickly evaluated and then either prioritized for a deep dive or dismissed. This makes it impossible to come up with universal partitioning that works all the time and for all research styles.

When accessing data directly as Parquet files in Amazon S3, without using an open table format like Iceberg, partitioning and throttling issues can arise. Partitioning in this case is determined by the physical layout of files in Amazon S3, and a mismatch between the intended partitioning and the actual file layout can lead to I/O throttling exceptions. Additionally, listing directories in Amazon S3 can also result in throttling exceptions due to the high number of API calls required.

In contrast, Iceberg provides a metadata layer that abstracts away the physical file layout in Amazon S3. Partitioning is defined at the table level, and Iceberg handles the mapping between logical partitions and the underlying file structure. This abstraction helps mitigate partitioning issues and reduces the likelihood of I/O throttling exceptions. Furthermore, Iceberg’s metadata caching mechanism minimizes the number of List API calls required, addressing the directory listing throttling issue.

Although both approaches involve direct access to Amazon S3, Iceberg is an open table format that introduces a metadata layer, providing better partitioning management and reducing the risk of throttling exceptions. It doesn’t act as a database or a processing engine itself; it is a table format that query engines use on top of the underlying storage (in this case, Amazon S3).

One of the most effective techniques to address Amazon S3 API quota limits is salting (random hash prefixes)—a method that adds random partition IDs to Amazon S3 paths. This increases the probability of prefixes residing on different physical partitions, helping distribute API requests more evenly. Iceberg supports this functionality out of the box for both data ingestion and reading.

Implementing salting directly in Amazon S3 requires complex custom code to create and use partitioning schemes with random keys in the naming hierarchy. This approach necessitates a custom data catalog and metadata system to map physical paths to logical paths, allowing direct partition access without relying on Amazon S3 List API calls. Without such a system, applications risk exceeding Amazon S3 API quotas when accessing specific partitions.
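
To illustrate what such custom code involves, the following sketch prepends a short hash to each logical path and keeps a lookup from logical to physical keys. It illustrates the salting idea only and is not Iceberg's implementation; the function names, the four-character prefix length, and the example paths are assumptions. A deterministic hash is used here instead of a truly random salt so that the physical key can be recomputed.

```python
import hashlib


def salted_key(logical_path, prefix_chars=4):
    """Prepend a short hash so keys spread across many S3 prefixes."""
    digest = hashlib.sha256(logical_path.encode("utf-8")).hexdigest()
    return f"{digest[:prefix_chars]}/{logical_path}"


def build_path_index(logical_paths):
    """Map logical paths to salted physical keys.

    This mapping is the custom catalog that a hand-rolled solution has to
    maintain so that readers can find files without S3 List calls.
    """
    return {path: salted_key(path) for path in logical_paths}


# Hypothetical logical layout: exchange / instrument / file.
paths = [
    "exchange_code=XNAS/instrument=AAA/part-0000.parquet",
    "exchange_code=XNAS/instrument=BBB/part-0000.parquet",
]
for logical, physical in build_path_index(paths).items():
    print(logical, "->", physical)
```

With Iceberg, the table metadata plays the role of `build_path_index`, so readers resolve file locations from metadata instead of listing prefixes.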

At petabyte scale, Iceberg’s advantages become clear. It efficiently manages data through the following features:

  • Directory caching
  • Configurable partitioning strategies (range, bucket)
  • Data management functionality (compaction)
  • Catalog, metadata, and statistics use for optimal execution plans

These built-in features eliminate the need for custom solutions to manage Amazon S3 API quotas and data organization at scale, reducing development time and maintenance costs while improving query performance and reliability.

Performance

We highlighted a lot of the functionality of Iceberg that eliminates undifferentiated heavy lifting and improves developer and quant productivity. What about performance?

This section evaluates whether Iceberg’s metadata layer introduces overhead or delivers optimization for quantitative research use cases, comparing it with vanilla Parquet access on Amazon S3. We examine how these approaches impact common quant research queries and workflows.

We then discuss overlapping optimization techniques, such as data distribution and sorting, and explain why no single partitioning and sorting scheme fits every quant research workload. Our benchmarks show that Iceberg performs comparably to direct Amazon S3 access, with additional optimizations from its metadata and statistics usage, similar to database indexing.

Vanilla Parquet vs Iceberg: Amazon S3 read performance

We created four different datasets: two using Iceberg and two with direct Amazon S3 Parquet access, each with both sorted and unsorted write distributions. The purpose of this exercise was to compare the performance of direct Amazon S3 Parquet access vs. the Iceberg open table format, taking into account the impact of write distribution patterns when running various queries commonly used in quantitative trading research.

Query 1

We first run a simple count query to get the total number of records in the table. This query helps understand the baseline performance for a straightforward operation. For example, if the table contains tick-level market data for various financial instruments, the count can give an idea of the total number of data points available for analysis.

The following is the code for vanilla Parquet:

count = spark.read.parquet("s3://example-s3-bucket/path/to/data").count()

The following is the code for Iceberg:

from pyspark.sql import functions as f

count = spark.read.table(table_name).count()
# We used a typical count query for the performance comparison. The same total can also be read
# from Iceberg metadata, as shown below, which completes in a few seconds.
spark.read.format("iceberg").load(f"{table_name}.files").select(f.sum("record_count")).show(truncate=False)

Query 2

Our second query is a grouping and counting query to find the number of records for each combination of exchange_code and instrument. This query is commonly used in quantitative trading research to analyze market liquidity and trading activity across different instruments and exchanges.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Query 3

Next, we run a distinct query to retrieve the distinct combinations of year, month, and day from the adapterTimestamp_ts_utc column. In quantitative trading research, this query can be helpful for understanding the time range covered by the dataset. Researchers can use this information to identify periods of interest for their analysis, such as specific market events, economic cycles, or seasonal patterns.

The following is the code for vanilla Parquet:

from pyspark.sql import functions as f

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                 f.month("adapterTimestamp_ts_utc").alias("month"),
                 f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
         .distinct() \
         .show(truncate=False)

The following is the code for Iceberg:

spark.read.table(table_name) \
        .select(f.year("adapterTimestamp_ts_utc").alias("year"),
                f.month("adapterTimestamp_ts_utc").alias("month"),
                f.dayofmonth("adapterTimestamp_ts_utc").alias("day")) \
        .distinct() \
        .show(truncate=False)

Query 4

Lastly, we run a grouping and counting query with a date range filter on the adapterTimestamp_ts_utc column. This query is similar to Query 2 but focuses on a specific time period. You could use this query to analyze market activity or liquidity during specific time periods, such as periods of high volatility, market crashes, or economic events. Researchers can use this information to identify potential trading opportunities or investigate the impact of these events on market dynamics.

The following is the code for vanilla Parquet:

spark.read.parquet("s3://example-s3-bucket/path/to/data") \
         .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                 (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
         .groupBy("exchange_code", "instrument") \
         .count() \
         .orderBy("count", ascending=False) \
         .show(truncate=False)

The following is the code for Iceberg. Because Iceberg's metadata layer stores per-file column statistics, files that fall entirely outside the date range can be skipped without being opened:

spark.read.table(table_name) \
        .filter((f.col("adapterTimestamp_ts_utc") >= "2023-04-17 00:00:00") &
                (f.col("adapterTimestamp_ts_utc") <= "2023-04-18 23:59:59.999")) \
        .groupBy("exchange_code", "instrument") \
        .count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

Test results

To evaluate the performance and cost benefits of using Iceberg for our quant research data lake, we created four different datasets: two with Iceberg tables and two with direct Amazon S3 Parquet access, each using both sorted and unsorted write distributions. We first ran AWS Glue write jobs to create the Iceberg tables and then mirrored the same write processes for the Amazon S3 Parquet datasets. For the unsorted datasets, we partitioned the data by exchange and instrument, and for the sorted datasets, we added a sort key on the time column.

Next, we ran a series of queries commonly used in quantitative trading research, including simple count queries, grouping and counting, distinct value queries, and queries with date range filters. Our benchmarking process involved reading data from Amazon S3, performing various transformations and joins, and writing the processed data back to Amazon S3 as Parquet files.

By comparing runtimes and costs across different data formats and write distributions, we quantified the benefits of Iceberg’s optimized data organization, metadata management, and efficient Amazon S3 data handling. The results showed that Iceberg not only enhanced query performance without introducing significant overhead, but also reduced the likelihood of task failures, reruns, and throttling issues, leading to more stable and predictable job execution, particularly with large datasets stored in Amazon S3.

AWS Glue write jobs

In the following table, we compare the performance and the cost implications of using Iceberg vs. vanilla Parquet access on Amazon S3, taking into account the following use cases:

  • Iceberg table (unsorted) – We created an Iceberg table partitioned by exchange_code and instrument. This means that the data was physically partitioned in Amazon S3 based on the unique combinations of exchange_code and instrument values. Partitioning the data in this way can improve query performance, because Iceberg can prune out partitions that aren’t relevant to a particular query, reducing the amount of data that needs to be scanned. The data was not sorted on any column in this case, which is the default behavior.
  • Vanilla Parquet (unsorted) – For this use case, we wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by exchange_code and instrument columns using standard hash partitioning before writing it out. Repartitioning was necessary to avoid potential throttling issues when reading the data later, because accessing data directly from Amazon S3 without intelligent partitioning can lead to too many requests hitting the same S3 prefix. Like the Iceberg table, the data was not sorted on any column in this case. To make the comparison fair, we used the exact repartition count that Iceberg uses.
  • Iceberg table (sorted) – We created another Iceberg table, again partitioned by exchange_code and instrument. Additionally, we sorted the data in this table on the adapterTimestamp_ts_utc column. Sorting the data can improve query performance for certain types of queries, such as those that involve range filters or ordered outputs. Iceberg handles the sorting and partitioning of the data transparently to the user.
  • Vanilla Parquet (sorted) – For this use case, we again wrote the data directly as Parquet files to Amazon S3, without using Iceberg. We repartitioned the data by range on the exchange_code, instrument, and adapterTimestamp_ts_utc columns before writing it out, using standard range partitioning with a partition count of 1,996, because this was what Iceberg was using according to the Spark UI. Repartitioning on the time column (adapterTimestamp_ts_utc) was necessary to achieve a sorted write distribution, because Parquet files are sorted within each partition. This sorted write distribution can improve query performance for certain types of queries, similar to the sorted Iceberg table.

| Write Distribution Pattern | Iceberg Table (Unsorted) | Vanilla Parquet (Unsorted) | Iceberg Table (Sorted) | Vanilla Parquet (Sorted) |
| --- | --- | --- | --- | --- |
| DPU Hours | 899.46639 | 915.70222 | 1402 | 1365 |
| Number of S3 Objects | 7444 | 7288 | 9283 | 9283 |
| Size of S3 Parquet Objects | 567.7 GB | 629.8 GB | 525.6 GB | 627.1 GB |
| Runtime | 1h 51m 40s | 1h 53m 29s | 2h 52m 7s | 2h 47m 36s |
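
The four write patterns compared above can be sketched in PySpark as follows. This is an illustration under stated assumptions, not the benchmark code: the function names are hypothetical, the Iceberg table is assumed to already exist and to be partitioned by exchange_code and instrument, the `WRITE ORDERED BY` statement assumes that Iceberg's Spark SQL extensions are enabled, and the explicit `sortWithinPartitions` call is our assumption about how the sorted Parquet files were produced.

```python
PARTITION_COLUMNS = ["exchange_code", "instrument"]
SORT_COLUMN = "adapterTimestamp_ts_utc"


def iceberg_sort_order_sql(table_name):
    """Statement that declares a table-level sort order for future writes."""
    return f"ALTER TABLE {table_name} WRITE ORDERED BY {SORT_COLUMN}"


def write_iceberg(spark, df, table_name, sort=False):
    # Iceberg applies the table's partition spec (and sort order, if set) on write.
    if sort:
        spark.sql(iceberg_sort_order_sql(table_name))
    df.writeTo(table_name).append()


def write_parquet_unsorted(df, path, num_partitions):
    # Hash repartitioning on the two key columns before a plain Parquet write.
    (
        df.repartition(num_partitions, *PARTITION_COLUMNS)
        .write.mode("overwrite")
        .parquet(path)
    )


def write_parquet_sorted(df, path, num_partitions=1996):
    # Range repartitioning on the keys plus the time column.
    keys = PARTITION_COLUMNS + [SORT_COLUMN]
    (
        df.repartitionByRange(num_partitions, *keys)
        .sortWithinPartitions(*keys)  # assumption: explicit sort inside each range
        .write.mode("overwrite")
        .parquet(path)
    )
```

The vanilla Parquet writers have to carry the partition count and key columns in application code, whereas the Iceberg writer only appends and leaves layout decisions to the table definition.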

AWS Glue read jobs

For the AWS Glue read jobs, we ran a series of queries commonly used in quantitative trading research, such as simple counts, grouping and counting, distinct value queries, and queries with date range filters. We compared the performance of these queries between the Iceberg tables and the vanilla Parquet files read in Amazon S3. In the following table, you can see two AWS Glue jobs that show the performance and cost implications of access patterns described earlier.

| Read Queries / Runtime in Seconds | Iceberg Table | Vanilla Parquet |
| --- | --- | --- |
| COUNT(1) on unsorted | 35.76s | 74.62s |
| GROUP BY and ORDER BY on unsorted | 34.29s | 67.99s |
| DISTINCT and SELECT on unsorted | 51.40s | 82.95s |
| FILTER and GROUP BY and ORDER BY on unsorted | 25.84s | 49.05s |
| COUNT(1) on sorted | 15.29s | 24.25s |
| GROUP BY and ORDER BY on sorted | 15.88s | 28.73s |
| DISTINCT and SELECT on sorted | 30.85s | 42.06s |
| FILTER and GROUP BY and ORDER BY on sorted | 15.51s | 31.51s |
| AWS Glue DPU hours | 45.98 | 67.97 |

Test results insights

These test results offered the following insights:

  • Accelerated query performance – Iceberg improved read operations by up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly. In quantitative finance, where speed is crucial, this performance gain allows teams to uncover market insights faster, potentially gaining a competitive edge.
  • Reduced operational costs – For read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage. These efficiency gains translate to cost savings in data-intensive quant operations. With Iceberg, firms can run more comprehensive analyses within the same budget or reallocate resources to other high-value activities, optimizing their research capabilities.
  • Enhanced data management and scalability – Iceberg showed comparable write performance for unsorted data (899.47 DPU hours vs. 915.70 for vanilla Parquet) and maintained consistent object counts across unsorted and sorted scenarios (7,444 and 9,283, respectively). This consistency leads to more reliable and predictable job execution. For quant teams dealing with large-scale datasets, this reduces time spent on troubleshooting data infrastructure issues and increases focus on developing trading strategies.
  • Improved productivity – Iceberg outperformed vanilla Parquet access across various query types. Simple counts were 52.1% faster, grouping and ordering operations improved by 49.6%, and filtered queries were 47.3% faster for unsorted data. This performance enhancement boosts productivity in quant research workflows. It reduces query completion times, allowing quant developers and researchers to spend more time on model development and market analysis, leading to faster iteration on trading strategies.
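
The percentages quoted in these insights follow directly from the write and read tables. The short calculation below shows the arithmetic; the helper name `pct_reduction` is ours, and the input values are copied from those tables.

```python
def pct_reduction(baseline, improved):
    """Percentage by which `improved` is lower than `baseline`."""
    return (baseline - improved) / baseline * 100


# (Vanilla Parquet, Iceberg) runtimes in seconds, copied from the read table.
read_runtimes_sec = {
    "COUNT(1) on unsorted": (74.62, 35.76),
    "GROUP BY and ORDER BY on unsorted": (67.99, 34.29),
    "FILTER and GROUP BY and ORDER BY on unsorted": (49.05, 25.84),
    "FILTER and GROUP BY and ORDER BY on sorted": (31.51, 15.51),
}
for query, (parquet, iceberg) in read_runtimes_sec.items():
    print(f"{query}: {pct_reduction(parquet, iceberg):.1f}% faster")

# Read-job DPU hours and stored object sizes (GB), copied from the tables.
print(f"Read DPU hours: {pct_reduction(67.97, 45.98):.1f}% lower")
print(f"Storage, unsorted: {pct_reduction(629.8, 567.7):.1f}% smaller")
print(f"Storage, sorted: {pct_reduction(627.1, 525.6):.1f}% smaller")
```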

Conclusion

Quant research platforms often avoid adopting new data management solutions like Iceberg, fearing performance penalties and increased costs. Our analysis disproves these concerns, demonstrating that Iceberg not only matches or enhances performance compared to direct Amazon S3 access, but also provides substantial additional benefits.

Our tests reveal that Iceberg significantly accelerates query performance, with improvements of up to 52% for unsorted data and 51% for sorted data. This speed boost enables quant researchers to analyze larger datasets and test trading strategies more rapidly, potentially uncovering valuable market insights faster.

Iceberg streamlines data management tasks, allowing researchers to focus on strategy development. Its robust insert, update, and delete capabilities, combined with time travel features, enable effortless management of complex datasets, improving backtest accuracy and facilitating rapid strategy iteration.

The platform’s intelligent handling of partitioning and Amazon S3 API quota issues eliminates undifferentiated heavy lifting, freeing quant teams from low-level data engineering tasks. This automation redirects efforts to high-value activities such as model development and market analysis. Moreover, our tests show that for read-intensive workloads, Iceberg reduced DPU hours by 32.4% and achieved a 10–16% reduction in Amazon S3 storage, leading to significant cost savings.

Flexibility is a key advantage of Iceberg. Its various interfaces, including SQL, DataFrames, and programmatic APIs, integrate seamlessly with existing quant research workflows, accommodating diverse analysis needs and coding preferences.

By adopting Iceberg, quant research teams gain both performance enhancements and powerful data management tools. This combination creates an environment where researchers can push analytical boundaries, maintain high data integrity standards, and focus on generating valuable insights. The improved productivity and reduced operational costs enable quant teams to allocate resources more effectively, ultimately sharpening their competitive edge in quantitative finance.


About the Authors

Guy Bachar is a Senior Solutions Architect at AWS based in New York. He specializes in assisting capital markets customers with their cloud transformation journeys. His expertise encompasses identity management, security, and unified communication.

Sercan Karaoglu is a Senior Solutions Architect specializing in capital markets. He is a former data engineer and is passionate about quantitative investment research.

Boris Litvin is a Principal Solutions Architect at AWS. His job is in financial services industry innovation. Boris joined AWS from the industry, most recently Goldman Sachs, where he held a variety of quantitative roles across equity, FX, and interest rates, and was CEO and Founder of a quantitative trading FinTech startup.

Salim Tutuncu is a Senior Partner Solutions Architect Specialist on Data & AI, based in Dubai with a focus on EMEA. With a background in the technology sector that spans roles as a data engineer, data scientist, and machine learning engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses using the AWS platform, particularly in data and AI use cases.

Alex Tarasov is a Senior Solutions Architect working with Fintech startup customers, helping them to design and run their data workloads on AWS. He is a former data engineer and is passionate about all things data and machine learning.

Jiwan Panjiker is a Solutions Architect at Amazon Web Services, based in the Greater New York City area. He works with AWS enterprise customers, helping them in their cloud journey to solve complex business problems by making effective use of AWS services. Outside of work, he likes spending time with his friends and family, going for long drives, and exploring local cuisine.

Accelerate queries on Apache Iceberg tables through AWS Glue auto compaction

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/accelerate-queries-on-apache-iceberg-tables-through-aws-glue-auto-compaction/

Data lakes were originally designed to store large volumes of raw, unstructured, or semi-structured data at a low cost, primarily serving big data and analytics use cases. Over time, as organizations began to explore broader applications, data lakes have become essential for various data-driven processes beyond just reporting and analytics. Today, they play a critical role in syncing with customer applications, enabling the ability to manage concurrent data operations while maintaining the integrity and consistency of information. This shift includes not only storing batch data but also ingesting and processing near real-time data streams, allowing businesses to merge historical insights with live data to power more responsive and adaptive decision-making. However, this new data lake architecture brings challenges around managing transactional support and handling the influx of small files generated by real-time data streams. Traditionally, customers addressed these challenges by performing complex extract, transform, and load (ETL) processes, which often led to data duplication and increased complexity in data pipelines. Additionally, to cope with the proliferation of small files, organizations had to develop custom mechanisms to compact and merge these files, leading to the creation and maintenance of bespoke solutions that were difficult to scale and manage. As data lakes increasingly handle sensitive business data and transactional workloads, maintaining strong data quality, governance, and compliance becomes vital to maintaining trust and regulatory alignment.

To simplify these challenges, organizations have adopted open table formats (OTFs) like Apache Iceberg, which provide built-in transactional capabilities and mechanisms for compaction. OTFs, such as Iceberg, address key limitations in traditional data lakes by offering features like ACID transactions, which maintain data consistency across concurrent operations, and compaction, which helps manage the issue of small files by merging them efficiently. By using features like Iceberg’s compaction, OTFs streamline maintenance, making it straightforward to manage object and metadata versioning at scale. However, although OTFs reduce the complexity of maintaining efficient tables, they still require some regular maintenance to make sure tables remain in an optimal state.

In this post, we explore new features of the AWS Glue Data Catalog, which now supports improved automatic compaction of Iceberg tables for streaming data, making it straightforward for you to keep your transactional data lakes consistently performant. Enabling automatic compaction on Iceberg tables reduces metadata overhead on your Iceberg tables and improves query performance. Many customers have streaming data continuously ingested in Iceberg tables, resulting in a large number of delete files that track changes in data files. With this new feature, when you enable the Data Catalog optimizer, it constantly monitors table partitions, runs the compaction process for both data files and delta or delete files, and regularly commits partial progress. The Data Catalog also now supports heavily nested complex data and supports schema evolution as you reorder or rename columns.

Automatic compaction with AWS Glue

Automatic compaction in the Data Catalog makes sure your Iceberg tables are always in optimal condition. The data compaction optimizer continuously monitors table partitions and invokes the compaction process when specific thresholds for the number of files and file sizes are met. For example, based on the Iceberg table configuration of the target file size, the compaction process will start and continue if the table or any of the partitions within the table have more than the default configuration (for example 100 files), each smaller than 75% of the target file size.
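
One way to read the threshold rule in the example above is the following sketch. It is an illustration of the stated rule, not the optimizer's actual implementation: the function name and parameter names are hypothetical, and the 512 MB target is only an example value for the table's target file size.

```python
def needs_compaction(file_sizes_bytes, target_file_size_bytes,
                     min_small_files=100, small_file_ratio=0.75):
    """Return True when more than `min_small_files` files in a partition
    are smaller than `small_file_ratio` of the target file size."""
    threshold = small_file_ratio * target_file_size_bytes
    small_files = sum(1 for size in file_sizes_bytes if size < threshold)
    return small_files > min_small_files


MB = 1024 * 1024
target = 512 * MB  # example target file size

# A partition that received 150 small streaming commits of about 2 MB each.
streaming_partition = [2 * MB] * 150
# A partition that already holds a handful of well-sized files.
compacted_partition = [500 * MB] * 5

print(needs_compaction(streaming_partition, target))
print(needs_compaction(compacted_partition, target))
```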

Iceberg supports two table modes: Merge-on-Read (MoR) and Copy-on-Write (CoW). These table modes provide different approaches for handling data updates and play a critical role in how data lakes manage changes and maintain performance:

  • Data compaction on Iceberg CoW – With CoW, any updates or deletes are directly applied to the table files. This means the affected data files are rewritten in full when changes are made. Although this provides immediate consistency and simplifies reads (because readers only access the latest snapshot of the data), it can become costly and slow for write-heavy workloads due to the need for frequent rewrites. Announced during AWS re:Invent 2023, this feature focuses on optimizing data storage for Iceberg tables using the CoW mechanism. Compaction in CoW makes sure updates to the data result in new files being created, which are then compacted to improve query performance.
  • Data compaction on Iceberg MoR – Unlike CoW, MoR allows updates to be written separately from the existing dataset, and those changes are only merged when the data is read. This approach is beneficial for write-heavy scenarios because it avoids frequent full table rewrites. However, it can introduce complexity during reads because the system has to merge base and delta files as needed to provide a complete view of the data. MoR compaction, now generally available, allows for efficient handling of streaming data. It makes sure that while data is being continuously ingested, it’s also compacted in a way that optimizes read performance without compromising the ingestion speed.
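
The difference between the two modes can be illustrated with a toy in-memory model. This is a conceptual sketch only: data files are modeled as Python lists, delete files as sets of (file index, row index) positions, and none of the function names correspond to an Iceberg API.

```python
def cow_delete(data_files, predicate):
    """Copy-on-write: rewrite each data file that contains a matching row."""
    rewritten = []
    for rows in data_files:
        if any(predicate(row) for row in rows):
            rewritten.append([row for row in rows if not predicate(row)])
        else:
            rewritten.append(rows)  # untouched files are reused as-is
    return rewritten


def mor_delete(data_files, delete_files, predicate):
    """Merge-on-read: leave data files alone and record deleted positions."""
    positions = {
        (i, j)
        for i, rows in enumerate(data_files)
        for j, row in enumerate(rows)
        if predicate(row)
    }
    return delete_files + [positions]


def mor_read(data_files, delete_files):
    """Readers must merge data files with every delete file."""
    deleted = set().union(*delete_files)
    return [
        row
        for i, rows in enumerate(data_files)
        for j, row in enumerate(rows)
        if (i, j) not in deleted
    ]


def compact(data_files, delete_files):
    """Compaction applies the deletes once and drops the delete files."""
    return [mor_read(data_files, delete_files)], []


data = [[1, 2, 3], [4, 5, 6]]
is_even = lambda value: value % 2 == 0

deletes = mor_delete(data, [], is_even)
print(mor_read(data, deletes))
print(compact(data, deletes))
```

In this model the write path of MoR is cheap (one small set per delete), but every read pays the merge cost until compaction folds the deletes back into the data files.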

Whether you are using CoW, MoR, or a hybrid of both, one challenge remains consistent: maintenance around the growing number of small files generated by each transaction. AWS Glue automatic compaction addresses this by making sure your Iceberg tables remain efficient and performant across both table modes.

This post provides a detailed comparison of query performance between auto compacted and non-compacted Iceberg tables. By analyzing key metrics such as query latency and storage efficiency, we demonstrate how the automatic compaction feature optimizes data lakes for better performance and cost savings. This comparison will help guide you in making informed decisions on enhancing your data lake environments.

Solution overview

This blog post explores the performance benefits of the newly launched feature in AWS Glue that supports automatic compaction of Iceberg tables with MoR capabilities. We run two versions of the same architecture: one where the tables are auto compacted, and another without compaction. By comparing both scenarios, this post demonstrates the efficiency, query performance, and cost benefits of auto compacted tables vs. non-compacted tables in a simulated Internet of Things (IoT) data pipeline.

The following diagram illustrates the solution architecture.

The solution consists of the following components:

  • Amazon Elastic Compute Cloud (Amazon EC2) simulates continuous IoT data streams, sending them to Amazon MSK for processing
  • Amazon Managed Streaming for Apache Kafka (Amazon MSK) ingests and streams data from the IoT simulator for real-time processing
  • Amazon EMR Serverless processes streaming data from Amazon MSK without managing clusters, writing results to the Amazon S3 data lake
  • Amazon Simple Storage Service (Amazon S3) stores data using Iceberg’s MoR format for efficient querying and analysis
  • The Data Catalog manages metadata for the datasets in Amazon S3, enabling organized data discovery and querying through Amazon Athena
  • Amazon Athena queries data from the S3 data lake with two table options:
    • Non-compacted table – Queries raw data from the Iceberg table
    • Compacted table – Queries data optimized by automatic compaction for faster performance

The data flow consists of the following steps:

  1. The IoT simulator on Amazon EC2 generates continuous data streams.
  2. The data is sent to Amazon MSK, which acts as a streaming table.
  3. EMR Serverless processes streaming data and writes the output to Amazon S3 in Iceberg format.
  4. The Data Catalog manages the metadata for the datasets.
  5. Athena is used to query the data, either directly from the non-compacted table or from the compacted table after auto compaction.

In this post, we guide you through setting up an evaluation environment for AWS Glue Iceberg auto compaction performance using the following GitHub repository. The process involves simulating IoT data ingestion, deduplication, and querying performance using Athena.

Compaction IoT performance test

We simulated IoT data ingestion with over 20 billion events and used MERGE INTO for data deduplication across two time-based partitions, involving heavy partition reads and shuffling. After ingestion, we ran queries in Athena to compare performance between compacted and non-compacted tables using the MoR format. This test aims to have low latency on ingestion but will lead to hundreds of millions of small files.

We use the following table configuration settings:

'write.delete.mode'='merge-on-read'
'write.update.mode'='merge-on-read'
'write.merge.mode'='merge-on-read'
'write.distribution-mode'='none'

We use 'write.distribution-mode'='none' to lower the latency. However, it will increase the number of Parquet files. For other scenarios, you may want to use hash or range distribution write modes to reduce the file count.
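
As a sketch of how these settings might be applied from Spark, the helper below builds an `ALTER TABLE ... SET TBLPROPERTIES` statement from a dictionary. The helper name and the table identifier `iceberg_catalog.bigdata.employee` are assumptions for illustration, and the distribution property is written with the `write.distribution-mode` key used in the Iceberg documentation.

```python
MOR_PROPERTIES = {
    "write.delete.mode": "merge-on-read",
    "write.update.mode": "merge-on-read",
    "write.merge.mode": "merge-on-read",
    "write.distribution-mode": "none",
}


def set_properties_sql(table_name, properties):
    """Build an ALTER TABLE statement that sets the given table properties."""
    props = ", ".join(f"'{key}'='{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ({props})"


statement = set_properties_sql("iceberg_catalog.bigdata.employee", MOR_PROPERTIES)
print(statement)
# With an active SparkSession named `spark`, the statement could be run as:
# spark.sql(statement)
```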

This test performs only append operations: we append new data to the table and don't run any delete operations.

The following table shows some metrics of the Athena query performance.

 

Queries (replace <tablename> with employee or employeeauto):

Q1:
SELECT count(*) FROM "bigdata"."<tablename>"

Q2:
SELECT team, name, min(age) AS youngest_age
FROM "bigdata"."<tablename>"
GROUP BY team, name
ORDER BY youngest_age ASC

Q3:
SELECT role, team, avg(age) AS average_age
FROM bigdata."<tablename>"
GROUP BY role, team
ORDER BY average_age DESC

Q4:
SELECT name, age, start_date, role, team
FROM bigdata."<tablename>"
WHERE
CAST(start_date as DATE) > CAST('2023-01-02' as DATE) and
age > 40
ORDER BY start_date DESC
limit 100

Query | Execution time (sec): employee (without compaction) | Execution time (sec): employeeauto (with compaction) | Performance improvement (%) | Data scanned (GB): employee (without compaction) | Data scanned (GB): employeeauto (with compaction)
Q1 | 67.5896 | 3.8472 | 94.31% | 0 | 0
Q2 | 72.0152 | 50.4308 | 29.97% | 33.72 | 32.96
Q3 | 74.1430 | 37.7676 | 49.06% | 17.24 | 16.59
Q4 | 70.3376 | 37.1232 | 47.22% | 105.74 | 110.32

Because the previous test didn’t perform any delete operations on the table, we conduct a new test involving hundreds of thousands of such operations. We use the previously auto compacted table (employeeauto) as a base, noting that this table uses MoR for all operations.

We run a query that deletes the rows whose start_date falls on an even second:

DELETE FROM iceberg_catalog.bigdata.employeeauto
WHERE start_date BETWEEN 'start' AND 'end'
AND SECOND(start_date) % 2 = 0;
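
To make the predicate concrete, here is a small pure-Python sketch that mirrors SECOND(start_date) % 2 = 0 on made-up timestamps. It only illustrates which rows the filter selects; the sample data and function name are assumptions, not part of the test.

```python
from datetime import datetime, timedelta


def is_deleted(start_date: datetime) -> bool:
    """Mirror of the SQL predicate SECOND(start_date) % 2 = 0."""
    return start_date.second % 2 == 0


# Ten consecutive one-second timestamps (made-up sample data).
base = datetime(2024, 1, 1, 0, 0, 0)
sample = [base + timedelta(seconds=i) for i in range(10)]
kept = [ts for ts in sample if not is_deleted(ts)]
print(len(sample) - len(kept), "deleted,", len(kept), "kept")
```

Because the filter removes every other second rather than a contiguous range, the deleted rows are interleaved with the surviving ones, which is the kind of workload that generates many delete files under MoR.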

This query runs with table optimizations enabled, using an Amazon EMR Studio notebook. After running the queries, we roll back the table to its previous state for a performance comparison. Iceberg’s time-traveling capabilities allow us to restore the table. We then disable the table optimizations, rerun the delete query, and follow up with Athena queries to analyze performance differences. The following table summarizes our results.

 

Queries (replace <tablename> with employee or employeeauto):

Q1:
SELECT count(*) FROM "bigdata"."<tablename>"

Q2:
SELECT team, name, min(age) as youngest_age
FROM "bigdata"."<tablename>"
GROUP BY team, name
ORDER BY youngest_age ASC

Q3:
SELECT role, team, avg(age) AS average_age
FROM bigdata."<tablename>"
GROUP BY role, team
ORDER BY average_age DESC

Q4:
SELECT name, age, start_date, role, team
FROM bigdata."<tablename>"
WHERE
CAST(start_date as DATE) > CAST('2023-01-02' as DATE) and
age > 40
ORDER BY start_date DESC
limit 100

Query | Execution time (sec): employee (without compaction) | Execution time (sec): employeeauto (with compaction) | Performance improvement (%) | Data scanned (GB): employee (without compaction) | Data scanned (GB): employeeauto (with compaction)
Q1 | 29.820 | 8.71 | 70.77% | 0 | 0
Q2 | 58.0600 | 34.1320 | 41.21% | 33.27 | 19.13
Q3 | 59.2100 | 31.8492 | 46.21% | 16.75 | 9.73
Q4 | 68.4650 | 33.1720 | 51.55% | 112.64 | 61.18
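
The post doesn't show the command used to roll the table back between the two delete runs. One way to do this in Spark SQL is Iceberg's rollback_to_snapshot procedure; whether the authors used this procedure is an assumption, and the helper name and snapshot ID below are placeholders.

```python
def rollback_sql(catalog: str, table: str, snapshot_id: int) -> str:
    """Render a call to Iceberg's rollback_to_snapshot Spark procedure."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {snapshot_id})"


# The snapshot ID is a placeholder; look up the real one in the table's
# snapshots metadata table before running the statement.
print(rollback_sql("iceberg_catalog", "bigdata.employeeauto", 1234567890))
```

In a Spark session, the rendered statement would be passed to spark.sql().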

We analyze the following key metrics:

  • Query runtime – We compared the runtimes between compacted and non-compacted tables using Athena as the query engine and found significant performance improvements with both MoR for ingestion and appends and MoR for delete operations.
  • Data scanned evaluation – We compared compacted and non-compacted tables using Athena as the query engine and observed a reduction in data scanned for most queries. This reduction translates directly into cost savings.
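
The improvement percentages reported in the tables are consistent with the relative reduction in runtime, (1 - compacted / non-compacted) x 100. That formula is inferred from the reported numbers rather than stated by the authors; the sketch below recomputes the append-only results with it.

```python
def improvement_pct(without_compaction_sec: float, with_compaction_sec: float) -> float:
    """Percentage reduction in runtime relative to the non-compacted table."""
    return (1 - with_compaction_sec / without_compaction_sec) * 100


# (non-compacted, compacted) runtimes in seconds from the append-only test.
append_test = [
    (67.5896, 3.8472),
    (72.0152, 50.4308),
    (74.1430, 37.7676),
    (70.3376, 37.1232),
]
for before, after in append_test:
    print(f"{before:.4f}s -> {after:.4f}s: {improvement_pct(before, after):.2f}% improvement")
```

The same function can be applied to the data scanned columns to quantify the scan reduction per query.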

Prerequisites

To set up your own evaluation environment and test the feature, you need the following prerequisites:

  • A virtual private cloud (VPC) with at least two private subnets. For instructions, see Create a VPC.
  • An EC2 instance c5.xlarge using Amazon Linux 2023 running on one of those private subnets where you will launch the data simulator. For the security group, you can use the default for the VPC. For more information, see Get started with Amazon EC2.
  • An AWS Identity and Access Management (IAM) user with the correct permissions to create and configure all the required resources.

Set up Amazon S3 storage

Create an S3 bucket with the following structure:

s3bucket/
/jars
/employee.desc
/warehouse
/checkpoint
/checkpointAuto

Download the descriptor file employee.desc from the GitHub repo and place it in the S3 bucket.

Download the application from the releases page

Get the packaged application from the GitHub repo, then upload the JAR file to the jars directory in the S3 bucket. The warehouse directory holds the Iceberg data and metadata, and the checkpoint directory is used by the Structured Streaming checkpointing mechanism. Because we use two streaming job runs, one for compacted and one for non-compacted data, we also create a checkpointAuto folder.

Create a Data Catalog database

Create a database in the Data Catalog (for this post, we name our database bigdata). For instructions, see Getting started with the AWS Glue Data Catalog.

Create an EMR Serverless application

Create an EMR Serverless application with the following settings (for instructions, see Getting started with Amazon EMR Serverless):

  • Type: Spark
  • Version: 7.1.0
  • Architecture: x86_64
  • Java Runtime: Java 17
  • Metastore Integration: AWS Glue Data Catalog
  • Logs: Enable Amazon CloudWatch Logs if desired

Configure the network (VPC, subnets, and default security group) to allow the EMR Serverless application to reach the MSK cluster.

Take note of the application-id to use later for launching the jobs.

Create an MSK cluster

Create an MSK cluster on the Amazon MSK console. For more details, see Get started using Amazon MSK.

Use the custom create option with at least two brokers running Apache Kafka version 3.5.1 in Apache ZooKeeper mode, and the instance type kafka.m7g.xlarge. Do not use public access; choose two private subnets to deploy it (one broker per subnet or Availability Zone, for a total of two brokers). For the security group, you can choose the default of the VPC, but make sure that the EMR Serverless application and the Amazon EC2 based producer can reach the cluster. For security, use PLAINTEXT (in production, you should secure access to the cluster). Choose 200 GB as the storage size for each broker and do not enable tiered storage.

For the MSK cluster configuration, use the following settings:

auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=32
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
compression.type=zstd
log.retention.hours=2
log.retention.bytes=10073741824
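
As a quick sanity check on settings like these, the following sketch parses a subset of the properties and converts the retention cap into GiB. The parser is a hypothetical helper written for this illustration, not part of Amazon MSK or Kafka tooling.

```python
def parse_properties(text: str) -> dict:
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# A subset of the settings above.
config = parse_properties("""
default.replication.factor=2
min.insync.replicas=2
log.retention.hours=2
log.retention.bytes=10073741824
""")

# log.retention.bytes applies per partition, not per topic or per broker.
retention_gib = int(config["log.retention.bytes"]) / 2**30
print(f"per-partition retention cap: {retention_gib:.2f} GiB")
```

With 32 partitions per topic, the per-partition cap multiplies accordingly, which is worth keeping in mind when sizing the 200 GB broker volumes.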

Configure the data simulator

Log in to your EC2 instance. Because it’s running on a private subnet, you can use an instance endpoint to connect. To create one, see Connect to your instances using EC2 Instance Connect Endpoint. After you log in, issue the following commands:

sudo yum install java-17-amazon-corretto-devel
wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.12-3.5.1.tgz
tar xzvf kafka_2.12-3.5.1.tgz

Create Kafka topics

Create two Kafka topics. Replace kafkaBootstrapString with the bootstrap server string for your cluster, which you can find in the client information on the details page for your MSK cluster on the Amazon MSK console.

cd kafka_2.12-3.5.1/bin/

./kafka-topics.sh --topic protobuf-demo-topic-pure-auto --bootstrap-server kafkaBootstrapString --create
./kafka-topics.sh --topic protobuf-demo-topic-pure --bootstrap-server kafkaBootstrapString --create

Launch job runs

Issue job runs for the non-compacted and auto compacted tables using the following AWS Command Line Interface (AWS CLI) commands. You can use AWS CloudShell to run the commands.

For the non-compacted table, replace the s3bucket value, the application-id, and the kafkaBootstrapString with your own values. You also need an IAM role (execution-role-arn) with the corresponding permissions to access the S3 bucket and to access and write tables on the Data Catalog.

aws emr-serverless start-job-run --application-id application-identifier --name job-run-name --execution-role-arn arn-of-emrserverless-role --mode 'STREAMING' --job-driver '{
"sparkSubmit": {
"entryPoint": "s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
"entryPointArguments": ["true","s3://s3bucket/warehouse","s3://s3bucket/Employee.desc","s3://s3bucket/checkpoint","kafkaBootstrapString","true"],
"sparkSubmitParameters": "--class com.aws.emr.spark.iot.SparkCustomIcebergIngestMoR --conf spark.executor.cores=16 --conf spark.executor.memory=64g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.dynamicAllocation.minExecutors=3 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.dynamicAllocation.maxExecutors=5 --conf spark.sql.catalog.glue_catalog.http-client.apache.max-connections=3000 --conf spark.emr-serverless.executor.disk.type=shuffle_optimized --conf spark.emr-serverless.executor.disk=1000G --files s3://s3bucket/Employee.desc --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
}
}'
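
Hand-editing JSON inside a quoted shell argument is error-prone, so one option is to generate the --job-driver value programmatically. The sketch below does this for the non-compacted job; the helper name is hypothetical, and only a subset of the --conf options from the command above is reproduced.

```python
import json
import shlex

# A subset of the --conf options from the command above; the rest are omitted for brevity.
SPARK_CONF = {
    "spark.executor.cores": "16",
    "spark.executor.memory": "64g",
    "spark.driver.cores": "4",
    "spark.driver.memory": "16g",
    "spark.dynamicAllocation.minExecutors": "3",
    "spark.dynamicAllocation.maxExecutors": "5",
}


def job_driver(bucket: str, bootstrap: str, main_class: str, checkpoint_dir: str) -> dict:
    """Build the sparkSubmit job driver structure (hypothetical helper)."""
    conf = " ".join(f"--conf {key}={value}" for key, value in SPARK_CONF.items())
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{bucket}/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
            "entryPointArguments": [
                "true",
                f"s3://{bucket}/warehouse",
                f"s3://{bucket}/Employee.desc",
                f"s3://{bucket}/{checkpoint_dir}",
                bootstrap,
                "true",
            ],
            "sparkSubmitParameters": f"--class {main_class} {conf}",
        }
    }


driver = job_driver(
    "s3bucket",
    "kafkaBootstrapString",
    "com.aws.emr.spark.iot.SparkCustomIcebergIngestMoR",
    "checkpoint",
)
# shlex.quote makes the JSON safe to paste as a single shell argument.
print("--job-driver", shlex.quote(json.dumps(driver)))
```

The printed argument can be appended to the aws emr-serverless start-job-run command in place of the inline JSON.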

For the auto compacted table, you need to change the s3bucket value as needed, the application-id, and the kafkaBootstrapString. You also need an IAM role (execution-role-arn) with the corresponding permissions to access the S3 bucket and to access and write tables on the Data Catalog.

aws emr-serverless start-job-run --application-id application-identifier --name job-run-name --execution-role-arn arn-of-emrserverless-role --mode 'STREAMING' --job-driver '{
"sparkSubmit": {
"entryPoint": "s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar",
"entryPointArguments": ["true","s3://s3bucket/warehouse","/home/hadoop/Employee.desc","s3://s3bucket/checkpointAuto","kafkaBootstrapString","true"],
"sparkSubmitParameters": "--class com.aws.emr.spark.iot.SparkCustomIcebergIngestMoRAuto --conf spark.executor.cores=16 --conf spark.executor.memory=64g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.dynamicAllocation.minExecutors=3 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.dynamicAllocation.maxExecutors=5 --conf spark.sql.catalog.glue_catalog.http-client.apache.max-connections=3000 --conf spark.emr-serverless.executor.disk.type=shuffle_optimized --conf spark.emr-serverless.executor.disk=1000G --files s3://s3bucket/Employee.desc --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
}
}'

Enable auto compaction

Enable auto compaction for the employeeauto table in AWS Glue. For instructions, see Enabling compaction optimizer.

Launch the data simulator

Download the JAR file to the EC2 instance and run the producer:

aws s3 cp s3://s3bucket/jars/streaming-iceberg-ingest-1.0-SNAPSHOT.jar .

Now you can start the protocol buffer producers.

For non-compacted tables, use the following commands:

java -cp streaming-iceberg-ingest-1.0-SNAPSHOT.jar \
com.aws.emr.proto.kafka.producer.ProtoProducer kafkaBootstrapString

For auto compacted tables, use the following commands:

java -cp streaming-iceberg-ingest-1.0-SNAPSHOT.jar \
com.aws.emr.proto.kafka.producer.ProtoProducerAuto kafkaBootstrapString

Test the solution in EMR Studio

For the delete test, we use EMR Studio. For setup instructions, see Set up an EMR Studio. Next, you need to create an EMR Serverless interactive application to run the notebook; refer to Run interactive workloads with EMR Serverless through EMR Studio to create a Workspace.

Open the Workspace, select the interactive EMR Serverless application as the compute option, and attach it.

Download the Jupyter notebook, upload it to your environment, and run the cells using a PySpark kernel to run the test.

Clean up

This evaluation is for high-throughput scenarios and can lead to significant costs. Complete the following steps to clean up your resources:

  1. Stop the Kafka producer EC2 instance.
  2. Cancel the EMR job runs and delete the EMR Serverless application.
  3. Delete the MSK cluster.
  4. Delete the tables and database from the Data Catalog.
  5. Delete the S3 bucket.

Conclusion

The Data Catalog has improved automatic compaction of Iceberg tables for streaming data, making it straightforward for you to keep your transactional data lakes always performant. Enabling automatic compaction on Iceberg tables reduces metadata overhead on your Iceberg tables and improves query performance.

Many customers have streaming data that is continuously ingested in Iceberg tables, resulting in a large set of delete files that track changes in data files. With this new feature, when you enable the Data Catalog optimizer, it constantly monitors table partitions and runs the compaction process for both data and delta or delete files and regularly commits the partial progress. The Data Catalog also has expanded support for heavily nested complex data and supports schema evolution as you reorder or rename columns.

In this post, we assessed the ingestion and query performance of simulated IoT data using AWS Glue Iceberg with auto compaction enabled. Our setup processed over 20 billion events, managing duplicates and late-arriving events, and employed a MoR approach for both ingestion/appends and deletions to evaluate the performance improvement and efficiency.

Overall, AWS Glue Iceberg with auto compaction proves to be a robust solution for managing high-throughput IoT data streams. These enhancements lead to faster data processing, shorter query times, and more efficient resource utilization, all of which are essential for any large-scale data ingestion and analytics pipeline.

For detailed setup instructions, see the GitHub repo.


About the Authors

Navnit Shukla serves as an AWS Specialist Solutions Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled Data Wrangling on AWS. He can be reached through LinkedIn.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to data analytics and artificial intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on data and AI.

Amit Singh currently serves as a Senior Solutions Architect at AWS, specializing in analytics and IoT technologies. With extensive expertise in designing and implementing large-scale distributed systems, Amit is passionate about empowering clients to drive innovation and achieve business transformation through AWS solutions.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Build Write-Audit-Publish pattern with Apache Iceberg branching and AWS Glue Data Quality

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/build-write-audit-publish-pattern-with-apache-iceberg-branching-and-aws-glue-data-quality/

Given the importance of data in the world today, organizations face the dual challenges of managing large-scale, continuously incoming data while vetting its quality and reliability. The importance of publishing only high-quality data can’t be overstated—it’s the foundation for accurate analytics, reliable machine learning (ML) models, and sound decision-making. Equally crucial is the ability to segregate and audit problematic data, not just for maintaining data integrity, but also for regulatory compliance, error analysis, and potential data recovery.

AWS Glue is a serverless data integration service that you can use to effectively monitor and manage data quality through AWS Glue Data Quality. Today, many customers build data quality validation pipelines using its Data Quality Definition Language (DQDL), because its static rules, dynamic rules, and anomaly detection capability make this fairly straightforward.

Apache Iceberg is an open table format that brings atomicity, consistency, isolation, and durability (ACID) transactions to data lakes, streamlining data management. One of its key features is the ability to manage data using branches. Each branch has its own lifecycle, allowing for flexible and efficient data management strategies.

This post explores robust strategies for maintaining data quality when ingesting data into Apache Iceberg tables using AWS Glue Data Quality and Iceberg branches. We discuss two common strategies to verify the quality of published data. We dive deep into the Write-Audit-Publish (WAP) pattern, demonstrating how it works with Apache Iceberg.

Strategy for managing data quality

When it comes to vetting data quality in streaming environments, two prominent strategies emerge: the dead-letter queue (DLQ) approach and the WAP pattern. Each strategy offers unique advantages and considerations.

  • The DLQ approach – Segregate problematic entries from high-quality data so that only clean data makes it into your primary dataset.
  • The WAP pattern – Using branches, segregate problematic entries from high-quality data so that only clean data is published in the main branch.

The DLQ approach

The DLQ strategy focuses on efficiently segregating high-quality data from problematic entries so that only clean data makes it into your primary dataset. Here’s how it works:

  1. As data streams in, it passes through a validation process
  2. Valid data is written directly to the table referenced by downstream users
  3. Invalid or problematic data is redirected to a separate DLQ for later analysis and potential recovery

The following screenshot shows this flow.

[Figure: DLQ approach data flow]
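
The three DLQ steps can be sketched in a few lines of plain Python. This is a toy illustration, not a streaming implementation: the record shape, the validation rule, and the function name are all assumptions.

```python
from typing import Callable, Iterable


def route(records: Iterable[dict], is_valid: Callable[[dict], bool]):
    """Split records into (main, dead_letter) lists using a validation rule."""
    main, dead_letter = [], []
    for record in records:
        (main if is_valid(record) else dead_letter).append(record)
    return main, dead_letter


# Made-up readings and a made-up rule: temperatures must be within 10-50 degrees C.
readings = [
    {"room": "living", "temperature": 25.1},
    {"room": "living", "temperature": 101.3},
    {"room": "kitchen", "temperature": 24.8},
]
main, dlq = route(readings, lambda r: 10 <= r["temperature"] <= 50)
print(len(main), "valid,", len(dlq), "sent to the DLQ")
```

Note that the validation rule lives inside the writer here, which is why every writer has to carry the same logic.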

Here are its advantages:

  • Simplicity – The DLQ approach is straightforward to implement, especially when there is only one writer
  • Low latency – Valid data is instantly available in the main branch for downstream consumers
  • Separate processing for invalid data – You can have dedicated jobs to process the DLQ for auditing and recovery purposes.

The DLQ strategy can present significant challenges in complex data environments. With multiple concurrent writers to the same Iceberg table, maintaining consistent DLQ implementation becomes difficult. This issue is compounded when different engines (for example, Spark, Trino, or Python) are used for writes because the DLQ logic may vary between them, making system maintenance more complex. Additionally, storing invalid data separately can lead to management overhead.

For low-latency requirements, the validation step may also introduce delays, which creates a challenge in balancing data quality with speed of delivery.

To solve those challenges in a reasonable way, we introduce the WAP pattern in the next section.

The WAP pattern

The WAP pattern implements a three-stage process:

  1. Write – Data is initially written to a staging branch
  2. Audit – Quality checks are performed on the staging branch
  3. Publish – Validated data is merged into the main branch for consumption

The following screenshot shows this flow.

[Figure: WAP pattern data flow]
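
The following toy model walks through the write, audit, and publish stages with an in-memory table. It is only meant to show the order of operations; the class, the quality rule, and the branch handling are assumptions and do not reflect how Iceberg stores branches.

```python
class BranchedTable:
    """Toy in-memory table with named branches (not Iceberg's implementation)."""

    def __init__(self):
        self.branches = {"main": []}

    def create_branch(self, name: str, source: str = "main") -> None:
        self.branches[name] = list(self.branches[source])

    def write(self, branch: str, rows: list) -> None:
        self.branches[branch] = self.branches[branch] + rows

    def publish(self, source: str, target: str = "main") -> None:
        self.branches[target] = list(self.branches[source])


table = BranchedTable()
table.create_branch("staging")
# Write: raw rows land on the staging branch only.
table.write("staging", [{"temperature": 25.0}, {"temperature": 100.0}])
# Audit: rows that pass a made-up quality rule are collected on an audit branch.
table.create_branch("audit")
table.write("audit", [r for r in table.branches["staging"] if r["temperature"] < 50])
# Publish: the main branch only changes at this step.
table.publish("audit")
print(table.branches["main"])
```

Readers of the main branch never see the 100.0 reading, while it remains available on the staging branch for later analysis.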

Here are its advantages:

  • Flexible data latency management – In the WAP pattern, the raw data is ingested to the staging branch without data validation, and then the high-quality data is ingested to the main branch with data validation. With this characteristic, there’s flexibility to achieve urgent, low-latency data handling on the staging branch and achieve high-quality data handling on the main branch.
  • Unified data quality management – The WAP pattern separates the audit and publish logic from the writer applications. It provides a unified approach to quality management, even with multiple writers or varying data sources. The audit phase can be customized and evolved without affecting the write or publish stages.

The primary challenge of the WAP pattern is the increased latency it introduces. The multistep process inevitably delays data availability for downstream consumers, which may be problematic for near real-time use cases. Furthermore, implementing this pattern requires more sophisticated orchestration compared to the DLQ approach, potentially increasing development time and complexity.

How the WAP pattern works with Iceberg

The following sections explore how the WAP pattern works with Iceberg.

Iceberg’s branching feature

Iceberg offers a branching feature for data lifecycle management, which is particularly useful for efficiently implementing the WAP pattern. The metadata of an Iceberg table stores a history of snapshots. These snapshots, created for each change to the table, are fundamental to concurrent access control and table versioning. Branches are independent histories of snapshots branched from another branch, and each branch can be referred to and updated separately.

When a table is created, it starts with only a main branch, and all transactions are initially written to it. You can create additional branches, such as an audit branch, and configure engines to write to them. Changes on one branch can be fast-forwarded to another branch using Spark’s fast_forward procedure, as shown in the following screenshot.

[Figure: Iceberg branches and fast-forward]

How to manage Iceberg branches

In this section, we cover the essential operations for managing Iceberg branches using SparkSQL. We’ll demonstrate how to use the branches, specifically, to create a new branch, write to and read from a specific branch, and set a default branch for a Spark session. These operations form the foundation for implementing the WAP pattern with Iceberg.

To create a branch, run the following SparkSQL query:

ALTER TABLE glue_catalog.db.tbl CREATE BRANCH audit

To specify a branch to be updated, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

INSERT INTO glue_catalog.db.tbl.branch_audit VALUES (1, 'a'), (2, 'b');

To specify a branch to be queried, use the glue_catalog.<database_name>.<table_name>.branch_<branch_name> syntax:

SELECT * FROM glue_catalog.db.tbl.branch_audit;

To specify a branch for the entire Spark session scope, set the branch name in the Spark parameter spark.wap.branch. After this parameter is set, all queries refer to the specified branch without naming it explicitly:

SET spark.wap.branch = audit

-- audit branch will be updated
INSERT INTO glue_catalog.db.tbl VALUES (3, 'c');
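
When these statements are issued from PySpark, a small helper can keep the branch_<branch_name> suffix consistent. The helper name is hypothetical; the statements it produces are the ones shown above.

```python
def branch_ref(table: str, branch: str) -> str:
    """Return the branch-qualified identifier <table>.branch_<branch>."""
    return f"{table}.branch_{branch}"


TABLE = "glue_catalog.db.tbl"
statements = [
    f"ALTER TABLE {TABLE} CREATE BRANCH audit",
    f"INSERT INTO {branch_ref(TABLE, 'audit')} VALUES (1, 'a'), (2, 'b')",
    f"SELECT * FROM {branch_ref(TABLE, 'audit')}",
]
for statement in statements:
    # In a Spark session, each statement would be passed to spark.sql(statement).
    print(statement)
```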

How to implement the WAP pattern with Iceberg branches

Using Iceberg’s branching feature, we can efficiently implement the WAP pattern with a single Iceberg table. Additionally, Iceberg characteristics such as ACID transactions and schema evolution are useful for handling multiple concurrent writers and varying data.

  1. Write – The data ingestion process switches from the main branch to the audit branch and commits its updates there. At this point, these updates aren't accessible to downstream users, who can only access the main branch.
  2. Audit – The audit process runs data quality checks on the data in the audit branch. It specifies which data is clean and ready to be provided.
  3. Publish – The audit process publishes validated data to the main branch with the Iceberg fast_forward procedure, making it available for downstream users.

This flow is shown in the following screenshot.

[Figure: WAP pattern with Iceberg branches]
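
For the publish step, Iceberg's fast_forward Spark procedure takes the table, the branch to move, and the branch to move it to. The sketch below renders that call; the helper name and the table identifier are assumptions for illustration.

```python
def fast_forward_sql(catalog: str, table: str, branch: str, to: str) -> str:
    """Render a call to Iceberg's fast_forward Spark procedure."""
    return f"CALL {catalog}.system.fast_forward('{table}', '{branch}', '{to}')"


# Publish: move main forward to the state of the audit branch.
publish = fast_forward_sql("glue_catalog", "db.tbl", "main", "audit")
print(publish)  # in a Spark session: spark.sql(publish)
```

Because fast-forwarding only moves the main branch to a snapshot that already exists on the audit branch, the publish step does not rewrite any data.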

By implementing the WAP pattern with Iceberg, we can obtain several advantages:

  • Simplicity – Iceberg branches can express multiple states of a table, such as audit and main, within one table. This gives us unified data management even when multiple data contexts are handled separately.
  • Handling concurrent writers – Iceberg tables are ACID compliant, so consistent reads and writes are guaranteed even when multiple reader and writer processes run concurrently.
  • Schema evolution – If there are issues with the data being ingested, its schema may differ from the table definition. Spark supports dynamic schema merging for Iceberg tables. Iceberg tables can flexibly evolve their schema to write data with inconsistent schemas. By configuring the following parameters, when schema changes occur, new columns from the source are added to the target table with NULL values for existing rows. Columns present only in the target have their values set to NULL for new insertions or left unchanged during updates.
SET `spark.sql.iceberg.check-ordering` = false

ALTER TABLE glue_catalog.db.tbl SET TBLPROPERTIES (
    'write.spark.accept-any-schema'='true'
)
df.writeTo("glue_catalog.db.tbl").option("merge-schema","true").append()
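
The NULL-filling behavior described above can be pictured with a toy model that uses plain dictionaries as rows. This is not Spark or Iceberg code, and the column names are made up; it only illustrates what the merged result looks like.

```python
def append_with_schema_merge(table_rows: list, new_rows: list) -> list:
    """Toy model of a merge-schema append using plain dicts (not Spark code)."""
    columns = []
    for row in table_rows + new_rows:
        for name in row:
            if name not in columns:
                columns.append(name)
    # Missing columns are filled with None, the Python stand-in for NULL.
    return [{name: row.get(name) for name in columns} for row in table_rows + new_rows]


existing = [{"id": 1, "temperature": 25.0}]
incoming = [{"id": 2, "humidity": 52.0}]
merged = append_with_schema_merge(existing, incoming)
for row in merged:
    print(row)
```

The existing row gains the new humidity column with a NULL value, and the incoming row gets NULL for the temperature column it did not provide.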

As an intermediate wrap-up, the WAP pattern offers a robust approach to balancing data quality and latency. With Iceberg branches, we can implement the WAP pattern on a single Iceberg table while handling concurrent writers and schema evolution.

Example use case

Suppose that a home monitoring system tracks room temperature and humidity. The system captures and sends the data to an Iceberg based data lake built on top of Amazon Simple Storage Service (Amazon S3). The data is visualized using matplotlib for interactive data analysis. For the system, issues such as device malfunctions or network problems can lead to partial or erroneous data being written, resulting in incorrect insights. In many cases, these issues are only detected after the data is sent to the data lake. Additionally, verifying the correctness of such data is generally complicated.

To address these issues, the WAP pattern using Iceberg branches is applied to the system in this post. Through this approach, the incoming room data to the data lake is evaluated for quality before being visualized, and you make sure that only qualified room data is used for further data analysis. With the WAP pattern using the branches, you can achieve effective data management and promote data quality in downstream processes. The solution is demonstrated using an AWS Glue Studio notebook, which is a managed Jupyter Notebook for interacting with Apache Spark.

Prerequisites

The following prerequisites are necessary for this use case:

Set up resources with AWS CloudFormation

First, you use a provided AWS CloudFormation template to set up resources to build Iceberg environments. The template creates the following resources:

  • An S3 bucket for metadata and data files of an Iceberg table
  • A database for the Iceberg table in AWS Glue Data Catalog
  • An AWS Identity and Access Management (IAM) role for an AWS Glue job

Complete the following steps to deploy the resources.

  1. Choose Launch stack.
  2. For the Parameters, IcebergDatabaseName is set by default. You can also change the default value. Then, choose Next.
  3. Choose Next.
  4. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Submit.
  6. After the stack creation is complete, check the Outputs tab. The resource values are used in the following sections.

Next, configure the Iceberg JAR files to the session to use the Iceberg branch feature. Complete the following steps:

  1. Select the following JAR files from the Iceberg releases page and download these JAR files on your local machine:
    1. 1.6.1 Spark 3.3 with Scala 2.12 runtime JAR
    2. 1.6.1 aws-bundle JAR
  2. Open the Amazon S3 console and select the S3 bucket you created through the CloudFormation stack. The S3 bucket name can be found on the CloudFormation Outputs tab.
  3. Choose Create folder and create the jars path in the S3 bucket.
  4. Upload the two downloaded JAR files to s3://<IcebergS3Bucket>/jars/ from the S3 console.

Upload a Jupyter Notebook on AWS Glue Studio

After launching the CloudFormation stack, you create an AWS Glue Studio notebook to use Iceberg with AWS Glue. Complete the following steps.

  1. Download wap.ipynb.
  2. Open the AWS Glue Studio console.
  3. Under Create job, select Notebook.
  4. Select Upload Notebook, choose Choose file, and upload the notebook you downloaded.
  5. Select the IAM role name, such as IcebergWAPGlueJobRole, that you created through the CloudFormation stack. Then, choose Create notebook.
  6. For Job name at the top left of the page, enter iceberg_wap.
  7. Choose Save.

Configure Iceberg branches

Start by creating an Iceberg table that contains a room temperature and humidity dataset. After creating the Iceberg table, create branches that are used for performing the WAP practice. Complete the following steps:

  1. On the Jupyter Notebook that you created in Upload a Jupyter Notebook on AWS Glue Studio, run the following cell to use Iceberg with Glue. %additional_python_modules pandas==2.2 is used to visualize the temperature and humidity data in the notebook with pandas. Before running the cell, replace <IcebergS3Bucket> with the S3 bucket name where you uploaded the Iceberg JAR files.

[Screenshot: notebook cell with the session configuration]

  2. Initialize the SparkSession by running the following cell. The first three settings, starting with spark.sql, are required to use Iceberg with Glue. The default catalog name is set to glue_catalog using spark.sql.defaultCatalog. The configuration spark.sql.execution.arrow.pyspark.enabled is set to true and is used for data visualization with pandas.

[Screenshot: notebook cell that initializes the SparkSession]

  3. After the session is created (the notebook displays the notification "Session <Session Id> has been created."), run the following commands to copy the temperature and humidity dataset to the S3 bucket you created through the CloudFormation stack. Before running the cell, replace <IcebergS3Bucket> with the name of the S3 bucket for Iceberg, which you can find on the CloudFormation Outputs tab.
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/part-00000-fa08487a-43c2-4398-bae9-9cb912f8843c-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/current/ 
!aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-4341/data/new-part-00000-e8a06ab0-f33d-4b3b-bd0a-f04d366f067e-c000.snappy.parquet s3://<IcebergS3Bucket>/src-data/new/
  4. Configure the data source bucket name and path (DATA_SRC), Iceberg data warehouse path (ICEBERG_LOC), and database and table names for an Iceberg table (DB_TBL). Replace <IcebergS3Bucket> with the S3 bucket from the CloudFormation Outputs tab.
  5. Read the dataset and create the Iceberg table with the dataset using the Create Table As Select (CTAS) query.

[Screenshot: notebook cell with the CTAS query]

  6. Run the following code to display the temperature and humidity data for each room in the Iceberg table. Pandas and matplotlib are used to visualize the data for each room. The data from 10:05 to 10:30 is displayed in the notebook, as shown in the following screenshot, with each room showing approximately 25°C for temperature (displayed as the blue line) and 52% for humidity (displayed as the orange line).
import matplotlib.pyplot as plt
import pandas as pd

CONF = [
    {'room_type': 'myroom', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'living', 'cols':['current_temperature', 'current_humidity']},
    {'room_type': 'kitchen', 'cols':['current_temperature', 'current_humidity']}
]

fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_4_vis-1

  1. Create Iceberg branches by running the following queries before writing data into the Iceberg table. The syntax for creating a branch is ALTER TABLE db.table CREATE BRANCH <branch_name>.
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH stg
ALTER TABLE iceberg_wap_db.room_data CREATE BRANCH audit

Now, you’re ready to build the WAP pattern with Iceberg.

Build WAP pattern with Iceberg

Use the Iceberg branches created earlier to implement the WAP pattern. Start by writing the newly arrived temperature and humidity data, including erroneous values, to the stg branch of the Iceberg table.

Write phase: Write incoming data into the Iceberg stg branch

To write the incoming data into the stg branch in the Iceberg table, complete the following steps:

  1. Run the following cell to write the data into the Iceberg table.

bdb4341_5_write
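
The write cell is shown only as a screenshot. One way to express this step, as a sketch rather than the notebook's actual code, is an INSERT that addresses the branch with the same .branch_<name> suffix the read queries in this post use. The view name new_data and the variable NEW_DATA_SRC are hypothetical.

```python
def build_branch_insert_sql(db_tbl: str, branch: str, source_view: str) -> str:
    """Return an INSERT that appends rows from a view to one Iceberg branch."""
    return f"INSERT INTO {db_tbl}.branch_{branch} SELECT * FROM {source_view}"


write_sql = build_branch_insert_sql("iceberg_wap_db.room_data", "stg", "new_data")
print(write_sql)

# In the notebook, where spark is the active SparkSession and NEW_DATA_SRC
# (hypothetical) points at s3://<IcebergS3Bucket>/src-data/new/:
#   spark.read.parquet(NEW_DATA_SRC).createOrReplaceTempView("new_data")
#   spark.sql(write_sql)
```

Because the statement targets branch_stg, the main branch is left untouched until the publish phase.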

  1. After the records are written, run the following code to visualize the current temperature and humidity data in the stg branch. In the following screenshot, notice that new data was added after 10:30. The output shows incorrect readings, such as around 100°C for temperature between 10:35 and 10:52 in the living room.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_stg = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL}.branch_stg WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_stg.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 110)
    plt.yticks([tick for tick in range(10, 110, 30)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

bdb4341_6_vis-2

The new temperature data including erroneous records was written to the stg branch. This data isn’t visible to the downstream side because it hasn’t been published to the main branch. Next, you evaluate the data quality in the stg branch.

Audit phase: Evaluate the data quality in the stg branch

In this phase, you evaluate the quality of the temperature and humidity data in the stg branch using AWS Glue Data Quality. Then, the data that doesn’t meet the criteria is filtered out based on the data quality rules, and the qualified data is used to update the latest snapshot in the audit branch. Start with the data quality evaluation:

  1. Run the following code to evaluate the current data quality using AWS Glue Data Quality. The evaluation rule is defined in DQ_RULESET, where the normal temperature range is set between −10 and 50°C based on the device specifications. Any values out of this range are considered erroneous in this scenario.
from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
DQ_RULESET = """Rules = [ ColumnValues "current_temperature" between -10 and 50 ]"""


dyf = DynamicFrame.fromDF(
    dataframe=spark.sql(f"SELECT * FROM {DB_TBL}.branch_stg"),
    glue_ctx=GlueContext(spark.sparkContext),
    name='dyf')

dyfc_eval_dq = EvaluateDataQuality().process_rows(
    frame=dyf,
    ruleset=DQ_RULESET,
    publishing_options={
        "dataQualityEvaluationContext": "dyfc_eval_dq",
        "enableDataQualityCloudWatchMetrics": False,
        "enableDataQualityResultsPublishing": False,
    },
    additional_options={"performanceTuning.caching": "CACHE_NOTHING"},
)

# Show DQ results
dyfc_rule_outcomes = SelectFromCollection.apply(
    dfc=dyfc_eval_dq,
    key="ruleOutcomes")
dyfc_rule_outcomes.toDF().select('Outcome', 'FailureReason').show(truncate=False)
  1. The output shows the result of the evaluation. It displays Failed because some temperature data, such as 105°C, is out of the normal temperature range of −10 to 50°C.
+-------+------------------------------------------------------+
|Outcome|FailureReason                                         |
+-------+------------------------------------------------------+
|Failed |Value: 105.0 does not meet the constraint requirement!|
+-------+------------------------------------------------------+
  1. After the evaluation, filter out the incorrect temperature data in the stg branch, then update the latest snapshot in the audit branch with the valid temperature data.

bdb4341_7_write-to-audit
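
The filtering cell is shown only as a screenshot. To make the rule concrete, the following sketch applies the same temperature range to a few in-memory rows in plain Python. The sample readings are hypothetical, and the endpoints are treated as inclusive for simplicity, which may differ from how AWS Glue Data Quality treats them.

```python
TEMP_MIN, TEMP_MAX = -10, 50  # range taken from DQ_RULESET


def split_by_rule(rows):
    """Split rows into (passed, failed) according to the temperature range."""
    passed, failed = [], []
    for row in rows:
        ok = TEMP_MIN <= row["current_temperature"] <= TEMP_MAX
        (passed if ok else failed).append(row)
    return passed, failed


# Hypothetical sample readings, not taken from the dataset.
sample = [
    {"room_type": "living", "current_temperature": 25.0},
    {"room_type": "living", "current_temperature": 105.0},
    {"room_type": "kitchen", "current_temperature": 24.5},
]
passed, failed = split_by_rule(sample)
print(len(passed), len(failed))  # 2 1
```

In the notebook, only the rows that pass are written to the audit branch. The failed rows stay behind in the stg branch.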

Through the data quality evaluation, the audit branch in the Iceberg table now contains the valid data, which is ready for downstream use.

Publish phase: Publish the valid data to the downstream side

To publish the valid data in the audit branch to main, complete the following steps:

  1. Run the fast_forward Iceberg procedure to publish the valid data in the audit branch to the downstream side.

bdb4341_8_publish
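
The publish cell is shown only as a screenshot. A sketch of the call might look like the following, assuming the Iceberg catalog is registered in the Spark session under the name glue_catalog (the catalog name is an assumption). The fast_forward procedure takes the table, the branch to move, and the branch to move it to.

```python
def build_fast_forward_sql(catalog: str, db_tbl: str, branch: str, to: str) -> str:
    """Return a CALL statement that fast-forwards one branch to another."""
    return (
        f"CALL {catalog}.system.fast_forward("
        f"table => '{db_tbl}', branch => '{branch}', to => '{to}')"
    )


publish_sql = build_fast_forward_sql(
    "glue_catalog", "iceberg_wap_db.room_data", "main", "audit"
)
print(publish_sql)

# In the notebook, where spark is the active SparkSession:
#   spark.sql(publish_sql)
```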

  1. After the procedure is complete, review the published data by querying the main branch in the Iceberg table to simulate the query from the downstream side.
fig, axes = plt.subplots(nrows=3, ncols=1, sharex=True, sharey=True)
for ax, conf in zip(axes.ravel(), CONF):
    df_room_main = spark.sql(f"""
        SELECT current_time, current_temperature, current_humidity, room_type
        FROM {DB_TBL} WHERE room_type = '{conf['room_type']}'
        ORDER BY current_time ASC
        """)
    pdf = df_room_main.toPandas()
    pdf.set_index(pdf['current_time'], inplace=True)
    plt.xlabel('time')
    plt.ylabel('temperature/humidity')
    plt.ylim(10, 60)
    plt.yticks([tick for tick in range(10, 60, 10)])
    pdf[conf['cols']].plot.line(ax=ax, grid=True, figsize=(8, 6), title=conf['room_type'], legend=False, marker=".", markersize=2, linewidth=0)

plt.legend(['temperature', 'humidity'], loc='center', bbox_to_anchor=(0, 1, 1, 5.5), ncol=2)

%matplot plt

The query result shows only the valid temperature and humidity data that has passed the data quality evaluation.

bdb4341_9_vis-3

In this scenario, you successfully managed data quality by applying the WAP pattern with Iceberg branches. The room temperature and humidity data, including any erroneous records, was first written to the staging branch for quality evaluation. This approach prevented erroneous data from being visualized and leading to incorrect insights. After the data was validated by AWS Glue Data Quality, only valid data was published to the main branch and visualized in the notebook. Using the WAP pattern with Iceberg branches, you can make sure that only validated data is passed to the downstream side for further analysis.

Clean up resources

To clean up the resources, complete the following steps:

  1. On the Amazon S3 console, select the S3 bucket aws-glue-assets-<ACCOUNT_ID>-<REGION> where the Notebook file (iceberg_wap.ipynb) is stored. Delete the Notebook file located in the notebook path.
  2. Select the S3 bucket you created through the CloudFormation template. You can obtain the bucket name from the IcebergS3Bucket key on the CloudFormation Outputs tab. After selecting the bucket, choose Empty to delete all objects.
  3. After you confirm the bucket is empty, delete the CloudFormation stack iceberg-wap-baseline-resources.

Conclusion

In this post, we explored common strategies for maintaining data quality when ingesting data into Apache Iceberg tables. The step-by-step instructions demonstrated how to implement the WAP pattern with Iceberg branches. For use cases requiring data quality validation, the WAP pattern provides the flexibility to manage data latency even with concurrent writer applications without impacting downstream applications.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Sotaro Hikita is a Solutions Architect. He supports customers in a wide range of industries, especially the financial industry, to build better solutions. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan, and is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Implement historical record lookup and Slowly Changing Dimensions Type-2 using Apache Iceberg

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/implement-historical-record-lookup-and-slowly-changing-dimensions-type-2-using-apache-iceberg/

In today’s data-driven world, tracking and analyzing changes over time has become essential. As organizations process vast amounts of data, maintaining an accurate historical record is crucial. History management in data systems is fundamental for compliance, business intelligence, data quality, and time-based analysis. It enables organizations to maintain audit trails, perform trend analysis, identify data quality issues, and conduct point-in-time reporting. When combined with Change Data Capture (CDC), which identifies and captures database changes, history management becomes even more potent.

Common use cases for historical record management in CDC scenarios span various domains. In customer relationship management, it tracks changes in customer information over time. Financial systems use it for maintaining accurate transaction and balance histories. Inventory management benefits from historical data for analyzing sales patterns and optimizing stock levels. HR systems use it to track employee information changes. In fraud detection, historical data helps identify anomalous patterns in transactions or user behaviors.

This post will explore how to implement these functionalities using Apache Iceberg, focusing on Slowly Changing Dimensions (SCD) Type-2. This method creates new records for each data change while preserving old ones, thus maintaining a full history. By the end, you’ll understand how to use Apache Iceberg to manage historical records effectively on a typical CDC architecture.

Historical record lookup

How can we retrieve the history of given records? This is a fundamental question in data management, especially when dealing with systems that need to track changes over time. Let’s explore this concept with a practical example.

Consider a product (Heater) in an ecommerce database:

product_id | product_name | price
00001      | Heater       | 250

Now, let’s say we update the price of this product from 250 to 500. After some time, we want to retrieve the price history of this heater. In a traditional database setup, this task could be challenging, especially if we haven’t explicitly designed our system to track historical changes.

This is where the concept of historical record lookup becomes crucial. We need a system that not only stores the current state of our data but also maintains a log of all changes made to each record over time. This allows us to answer questions like:

  • What was the price of the heater at a specific point in time?
  • How many times has the price changed, and when did these changes occur?
  • What was the price trend of the heater over the past year?

Implementing such a system can be complex, requiring careful consideration of data storage, retrieval mechanisms, and query optimization. This is where Apache Iceberg comes into play, offering a feature known as the change log view.

The change log view in Apache Iceberg provides a view of all changes made to a table over time, making it straightforward to query and analyze the history of any record. With the change log view, we can easily track insertions, updates, and deletions, giving us a complete picture of how our data has evolved.

For our heater example, Iceberg’s change log view would allow us to effortlessly retrieve a timeline of all price changes, complete with timestamps and other relevant metadata, as shown in the following table.

product_id | product_name | price | _change_type
00001      | Heater       | 250   | INSERT
00001      | Heater       | 250   | UPDATE_BEFORE
00001      | Heater       | 500   | UPDATE_AFTER

This capability not only simplifies historical analysis but also opens possibilities for advanced time-based analytics, auditing, and data governance.
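
To illustrate how such a change log can be read, here is a minimal sketch in plain Python that derives the price timeline for the heater from the rows in the preceding table. It assumes the rows are already ordered from oldest to newest change. The function name price_history is hypothetical.

```python
def price_history(changelog_rows, product_id):
    """Return the prices a product has had, oldest first.

    UPDATE_BEFORE rows repeat the old value, so only INSERT and
    UPDATE_AFTER rows contribute a new price.
    """
    return [
        row["price"]
        for row in changelog_rows
        if row["product_id"] == product_id
        and row["_change_type"] in ("INSERT", "UPDATE_AFTER")
    ]


changelog = [
    {"product_id": "00001", "product_name": "Heater", "price": 250, "_change_type": "INSERT"},
    {"product_id": "00001", "product_name": "Heater", "price": 250, "_change_type": "UPDATE_BEFORE"},
    {"product_id": "00001", "product_name": "Heater", "price": 500, "_change_type": "UPDATE_AFTER"},
]
print(price_history(changelog, "00001"))  # [250, 500]
```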

Historical table lookup with SCD Type-2

SCD Type-2 is a key concept in data warehousing and historical data management and is particularly relevant to Change Data Capture (CDC) scenarios. SCD Type-2 creates new rows for changed data instead of overwriting existing records, allowing for comprehensive tracking of changes over time.

SCD Type-2 requires additional fields such as effective_start_date, effective_end_date, and current_flag to manage historical records. This approach has been widely used in data warehouses to track changes in various dimensions such as customer information, product details, and employee data. Continuing the example from the previous section, here’s what the SCD Type-2 table looks like, assuming the update is performed on December 11, 2024.

product_id | product_name | price | effective_start_date | effective_end_date | current_flag
00001      | Heater       | 250   | 2024-12-10           | 2024-12-11         | FALSE
00001      | Heater       | 500   | 2024-12-11           | NULL               | TRUE

SCD Type-2 is particularly valuable in CDC use cases, where capturing all data changes over time is crucial. It enables point-in-time analysis, provides detailed audit trails, aids in data quality management, and helps meet compliance requirements by preserving historical data.
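
As a small illustration of point-in-time analysis, the following plain Python sketch looks up the heater price on a given date from the two SCD Type-2 rows shown earlier. It assumes each row is valid from its start date up to, but not including, its end date. That interval convention and the function name price_as_of are assumptions for this example.

```python
from datetime import date

scd2_rows = [
    {"product_id": "00001", "price": 250,
     "effective_start_date": date(2024, 12, 10),
     "effective_end_date": date(2024, 12, 11), "current_flag": False},
    {"product_id": "00001", "price": 500,
     "effective_start_date": date(2024, 12, 11),
     "effective_end_date": None, "current_flag": True},
]


def price_as_of(rows, product_id, as_of):
    """Return the price valid on as_of, or None if no row covers that date."""
    for row in rows:
        if row["product_id"] != product_id:
            continue
        start = row["effective_start_date"]
        end = row["effective_end_date"]  # None means the row is still open
        if start <= as_of and (end is None or as_of < end):
            return row["price"]
    return None


print(price_as_of(scd2_rows, "00001", date(2024, 12, 10)))  # 250
print(price_as_of(scd2_rows, "00001", date(2024, 12, 11)))  # 500
```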

In traditional implementations on data warehouses, SCD Type-2 requires specific handling in all INSERT, UPDATE, and DELETE operations that affect those additional columns. For example, to update the price of the product, you need to run the following queries.

UPDATE product SET effective_end_date = '2024-12-11', current_flag = false
WHERE product_id = '00001' AND current_flag = true;

INSERT INTO product (product_id, product_name, price, effective_start_date, effective_end_date, current_flag)
VALUES ('00001', 'Heater', 500, '2024-12-11', NULL, true);
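
To see the two statements working together, the following sketch runs the same pattern against an in-memory SQLite database from Python. SQLite stands in for the data warehouse here only for illustration. The flag is stored as 1 or 0, and the helper name apply_price_change is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE product (
        product_id TEXT, product_name TEXT, price INTEGER,
        effective_start_date TEXT, effective_end_date TEXT, current_flag INTEGER)"""
)
conn.execute(
    "INSERT INTO product VALUES ('00001', 'Heater', 250, '2024-12-10', NULL, 1)"
)


def apply_price_change(conn, product_id, name, new_price, change_date):
    """Close the current row and add a new current row in one transaction."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE product SET effective_end_date = ?, current_flag = 0 "
            "WHERE product_id = ? AND current_flag = 1",
            (change_date, product_id),
        )
        conn.execute(
            "INSERT INTO product VALUES (?, ?, ?, ?, NULL, 1)",
            (product_id, name, new_price, change_date),
        )


apply_price_change(conn, "00001", "Heater", 500, "2024-12-11")
rows = conn.execute(
    "SELECT price, effective_start_date, effective_end_date, current_flag "
    "FROM product ORDER BY effective_start_date"
).fetchall()
print(rows)
# [(250, '2024-12-10', '2024-12-11', 0), (500, '2024-12-11', None, 1)]
```

Every writer that changes the table has to follow this two-step pattern, which is the overhead the change log view approach avoids.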

For modern data lakes, we propose a new approach to implementing SCD Type-2. With Iceberg, you can create a dedicated SCD Type-2 view on top of the change log view, eliminating the need for special handling when making changes to SCD Type-2 tables. You can keep managing Iceberg tables as usual, without the added complexity of the SCD Type-2 specification, and create an SCD Type-2 representation of a table whenever you need one. This approach combines the power of Iceberg’s efficient data management with the historical tracking capabilities of SCD Type-2. By using the change log view, Iceberg can dynamically generate the SCD Type-2 structure without the overhead of maintaining additional tables or manually managing effective dates and flags.

This streamlined method not only makes the implementation of SCD Type-2 more straightforward, but also offers improved performance and scalability for handling large volumes of historical data in CDC scenarios. It represents a significant advancement in historical data management, merging traditional data warehousing concepts with modern big data capabilities.

As we delve deeper into Iceberg’s features, we’ll explore how this approach can be implemented, showcasing the efficiency and flexibility it brings to historical data analysis and CDC processes.

Prerequisites

The following prerequisites are required for the use cases:

Set up resources with AWS CloudFormation

Use a provided AWS CloudFormation template to set up resources to build Iceberg environments. The template creates the following resources:

Complete the following steps to deploy the resources.

  1. Choose Launch stack.

  1. For the parameters, IcebergDatabaseName is set by default. You can change the default value. Then, choose Next.
  2. Choose Next.
  3. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Submit.
  5. After the stack creation is complete, check the Outputs tab and make a note of the resource values, which are used in the following sections.

Next, add the Iceberg JAR files to the session so that you can use the Iceberg change log view feature. Complete the following steps.

  1. Select the following JAR files from the Iceberg releases page and download these JAR files on your local machine:
    1. 1.6.1 Spark 3.3 with Scala 2.12 runtime JAR.
    2. 1.6.1 aws-bundle JAR.
  2. Open the Amazon S3 console and select the S3 bucket you created using the CloudFormation stack. The S3 bucket name can be found on the CloudFormation Outputs tab.
  3. Choose Create folder and create the jars path in the S3 bucket.
  4. Upload the two downloaded JAR files to s3://<IcebergS3Bucket>/jars/ from the S3 console.

Upload a Jupyter Notebook on AWS Glue Studio

After launching the CloudFormation stack, create an AWS Glue Studio notebook to use Iceberg with AWS Glue.

  1. Download history.ipynb.
  2. Open the AWS Glue Studio console.
  3. Under Create job, select Notebook.
  4. Select Upload Notebook, choose Choose file and upload the Notebook you downloaded.
  5. Select the IAM role name such as IcebergHistoryGlueJobRole that you created using the CloudFormation template. Then, choose Create notebook.

1_upload-notebook

  1. For Job name at the top left of the page, enter iceberg_history.
  2. Choose Save.

Create an Iceberg table

To create an Iceberg table using a product dataset, complete the following steps.

  1. On the Jupyter Notebook that you created in Upload a Jupyter Notebook on AWS Glue Studio, run the following cell to use Iceberg with AWS Glue. Before running the cell, replace <IcebergS3Bucket> with the S3 bucket name where you uploaded the Iceberg JAR files.

2_session-config

  1. Initialize the SparkSession with Iceberg settings.

3_ss-init
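
The initialization cell is shown only as a screenshot. As a sketch of the kind of settings involved, the following helper collects commonly used Iceberg options for Spark with the AWS Glue Data Catalog. The catalog name glue_catalog, the warehouse path, and the use of spark.sql.defaultCatalog (so that tables can be referenced as db.table) are assumptions, not values confirmed by the notebook.

```python
def iceberg_spark_conf(warehouse: str, catalog: str = "glue_catalog") -> dict:
    """Return Spark settings that register an Iceberg catalog backed by Glue."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
        "spark.sql.defaultCatalog": catalog,  # assumed, allows db.table names
    }


conf = iceberg_spark_conf("s3://<IcebergS3Bucket>/iceberg/")
for key, value in conf.items():
    print(key, "=", value)

# In the notebook:
#   from pyspark.sql import SparkSession
#   builder = SparkSession.builder
#   for key, value in conf.items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```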

  1. Configure database and table names for an Iceberg table (DB_TBL) and data warehouse path (ICEBERG_LOC). Replace <IcebergS3Bucket> with the S3 bucket from the CloudFormation Outputs tab.
  2. Run the following code to create the Iceberg table using the Spark DataFrame based on the product dataset.
from pyspark.sql import Row
import time
ut = time.time()
product = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}
]
df_products = spark.createDataFrame(Row(**x) for x in product)
df_products.createOrReplaceTempView('tmp')

spark.sql(f"""
CREATE TABLE {DB_TBL} USING iceberg LOCATION '{ICEBERG_LOC}'
AS SELECT * FROM tmp
""")
  1. After creating the Iceberg table, run SELECT * FROM iceberg_history_db.products ORDER BY product_id to show the product data in the Iceberg table. Currently the following five products are stored in the Iceberg table.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  250|Electronics|1.7297845122056053E9|
|     00002|  Thermostat|  400|Electronics|1.7297845122056053E9|
|     00003|  Television|  600|Electronics|1.7297845122056053E9|
|     00004|     Blender|  100|Electronics|1.7297845122056053E9|
|     00005| USB charger|   50|Electronics|1.7297845122056053E9|
+----------+------------+-----+-----------+--------------------+

Next, look up the historical changes for a product using Iceberg’s change log view feature.

Implement historical record lookup with Iceberg’s change log view

Suppose that there’s a source table whose table records are replicated to the Iceberg table through a Change Data Capture (CDC) process. When the records in the source table are updated, these changes are then mirrored in the Iceberg table. In this section, you look up the history of a given record for such a system to capture the history of product updates. For example, the following updates occur in the source table. Through the CDC process, these changes are applied to the Iceberg table.

  • Upsert (update and insert) the two records:
    • The price of Heater (product_id: 00001) is updated from 250 to 500.
    • A new product Chair (product_id: 00006) is added.
  • Television (product_id: 00003) is deleted.

To simulate the CDC workflow, you manually apply these changes to the Iceberg table in the notebook.

  1. Use the MERGE INTO query to upsert records. If an input record in the Spark DataFrame has the same product_id as an existing record, the existing record is updated. If no matching product_id is found, the input record is inserted into the Iceberg table.

4-merge-into
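
The MERGE INTO cell is shown only as a screenshot. The matching rule it describes can be sketched in plain Python as follows, using a subset of the product rows. The function name merge_upsert is hypothetical, and the sketch replaces the whole row on a match, which corresponds to an UPDATE SET * clause.

```python
def merge_upsert(table, updates, key="product_id"):
    """Return the rows after upserting updates into table by key."""
    merged = {row[key]: row for row in table}
    for row in updates:
        # Matched key: the existing row is replaced. New key: the row is inserted.
        merged[row[key]] = row
    return [merged[k] for k in sorted(merged)]


table = [
    {"product_id": "00001", "product_name": "Heater", "price": 250, "category": "Electronics"},
    {"product_id": "00002", "product_name": "Thermostat", "price": 400, "category": "Electronics"},
]
updates = [
    {"product_id": "00001", "product_name": "Heater", "price": 500, "category": "Electronics"},
    {"product_id": "00006", "product_name": "Chair", "price": 50, "category": "Furniture"},
]
result = merge_upsert(table, updates)
print([(r["product_id"], r["price"]) for r in result])
# [('00001', 500), ('00002', 400), ('00006', 50)]
```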

  1. Delete Television from the Iceberg table by running the DELETE query.
DELETE FROM iceberg_history_db.products WHERE product_id = '00003'
  1. Then, run SELECT * FROM iceberg_history_db.products ORDER BY product_id to show the product data in the Iceberg table. You can confirm that the price of Heater is updated to 500, Chair is added and Television is deleted.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  500|Electronics|    1.729790106579E9|
|     00002|  Thermostat|  400|Electronics|1.7297845122056053E9|
|     00004|     Blender|  100|Electronics|1.7297845122056053E9|
|     00005| USB charger|   50|Electronics|1.7297845122056053E9|
|     00006|       Chair|   50|  Furniture|    1.729790106579E9|
+----------+------------+-----+-----------+--------------------+

For the Iceberg table, where changes from the source table are replicated, you can track the record changes using Iceberg’s change log view. To start, you first create a change log view from the Iceberg table.

  1. Run the create_changelog_view Iceberg procedure to create a change log view.

5-clv
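
The procedure call is shown only as a screenshot. A sketch of such a call might look like the following, assuming the catalog is named glue_catalog and that product_id identifies a record. Both are assumptions. Setting compute_updates and identifier_columns is what lets the view pair a delete and an insert of the same product_id into UPDATE_BEFORE and UPDATE_AFTER rows.

```python
def build_changelog_view_sql(catalog, db_tbl, view_name, identifier_columns):
    """Return a CALL statement for Iceberg's create_changelog_view procedure."""
    cols = ", ".join(f"'{c}'" for c in identifier_columns)
    return (
        f"CALL {catalog}.system.create_changelog_view("
        f"table => '{db_tbl}', "
        f"changelog_view => '{view_name}', "
        f"compute_updates => true, "
        f"identifier_columns => array({cols}))"
    )


clv_sql = build_changelog_view_sql(
    "glue_catalog", "iceberg_history_db.products", "products_clv", ["product_id"]
)
print(clv_sql)

# In the notebook, where spark is the active SparkSession:
#   spark.sql(clv_sql)
```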

  1. Run the following query to retrieve the historical changes for Heater.
SELECT product_id, product_name, price, category, updated_at, _change_type
FROM products_clv WHERE product_id = '00001'
ORDER BY _change_ordinal, _change_type DESC
  1. The query result shows the historical changes to Heater. You can confirm that the price of Heater was updated from 250 to 500 from the output.
+----------+------------+-----+-----------+--------------------+-------------+
|product_id|product_name|price|   category|          updated_at| _change_type|
+----------+------------+-----+-----------+--------------------+-------------+
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|       INSERT|
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|UPDATE_BEFORE|
|     00001|      Heater|  500|Electronics|1.7297903836233025E9| UPDATE_AFTER|
+----------+------------+-----+-----------+--------------------+-------------+

Using Iceberg’s change log view, you can obtain the history of a given record directly from the Iceberg table’s history, without needing to create a separate table for managing record history. Next, you implement Slowly Changing Dimension (SCD) Type-2 using the change log view.

Implement SCD Type-2 with Iceberg’s change log view

The SCD Type-2 based table retains the full history of record changes and it can be used in multiple cases such as historical tracking, point-in-time analysis, regulatory compliance, and so on. In this section, you implement SCD Type-2 using the change log view (products_clv) that was created in the previous section. The change log view has a schema that’s similar to the schema defined in the SCD Type-2 specifications. For this change log view, you add effective_start, effective_end, and is_current columns. To add these columns and then implement SCD Type-2, complete the following steps.

  1. Run the following query to implement SCD Type-2. In the WITH ... AS (...) clause, the change log view is joined with the Iceberg table snapshots on the snapshot ID to attach the commit time to each record change. You can obtain the table snapshots by querying db.table.snapshots. The rest of the query identifies current and non-current entries by comparing the commit times for each product. It then sets the effective time for each product and marks whether each entry is current based on the effective time and the change type from the change log view.
WITH clv_snapshots AS (
    SELECT
        clv.*,
        s.snapshot_id,
        s.committed_at,
        s.committed_at as effective_start
    FROM products_clv clv
    JOIN iceberg_history_db.products.snapshots s
    ON clv._commit_snapshot_id = s.snapshot_id
) 
SELECT
    product_id, 
    product_name, 
    price, 
    category, 
    updated_at,
    effective_start,
    CASE
        WHEN effective_start != l_part_committed_at 
            OR _change_type = 'UPDATE_BEFORE' THEN l_part_committed_at
        ELSE CAST(null as timestamp)
    END as effective_end,
    CASE
        WHEN effective_start != l_part_committed_at
            OR _change_type = 'UPDATE_BEFORE' 
            OR _change_type = 'DELETE' THEN CAST(false as boolean)
        ELSE CAST(true as boolean)
    END as is_current
FROM (SELECT *, MAX(committed_at) OVER (PARTITION BY product_id, updated_at) as l_part_committed_at FROM clv_snapshots)
WHERE _change_type != 'UPDATE_BEFORE'
ORDER BY product_id,  _change_ordinal
  1. The query result shows the SCD Type-2 based schema and records.

7-output

After the query result is displayed, this SCD Type-2 based table is stored as scdt2 to allow access for further analysis.
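
To make the CASE logic easier to follow, here is a plain Python sketch of the same rules applied to a handful of change rows. The commit times are small integers standing in for timestamps, and the updated_at values are placeholders. Both are hypothetical, as is the function name to_scd2.

```python
def to_scd2(changes):
    """Apply the effective_end and is_current rules of the preceding query."""
    # MAX(committed_at) OVER (PARTITION BY product_id, updated_at)
    latest = {}
    for c in changes:
        key = (c["product_id"], c["updated_at"])
        latest[key] = max(latest.get(key, c["committed_at"]), c["committed_at"])

    result = []
    for c in changes:
        if c["_change_type"] == "UPDATE_BEFORE":
            continue  # WHERE _change_type != 'UPDATE_BEFORE'
        last = latest[(c["product_id"], c["updated_at"])]
        superseded = c["committed_at"] != last
        result.append((
            c["product_id"],
            c["price"],
            c["committed_at"],                # effective_start
            last if superseded else None,     # effective_end
            not superseded and c["_change_type"] != "DELETE",  # is_current
        ))
    return result


changes = [
    {"product_id": "00001", "price": 250, "updated_at": 100, "_change_type": "INSERT", "committed_at": 1},
    {"product_id": "00001", "price": 250, "updated_at": 100, "_change_type": "UPDATE_BEFORE", "committed_at": 2},
    {"product_id": "00001", "price": 500, "updated_at": 200, "_change_type": "UPDATE_AFTER", "committed_at": 2},
    {"product_id": "00003", "price": 600, "updated_at": 100, "_change_type": "INSERT", "committed_at": 1},
    {"product_id": "00003", "price": 600, "updated_at": 100, "_change_type": "DELETE", "committed_at": 3},
]
for row in to_scd2(changes):
    print(row)
# ('00001', 250, 1, 2, False)
# ('00001', 500, 2, None, True)
# ('00003', 600, 1, 3, False)
# ('00003', 600, 3, None, False)
```

The UPDATE_BEFORE row is never emitted, but it still matters: it shares the old updated_at value, so its later commit time is what closes the original INSERT row.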

SCD Type-2 is useful for many use cases. To explore how this SCD Type-2 implementation can be used to track the history of table records, run the following example queries.

  1. Run the following query to retrieve deleted or updated records in a specific period. This query captures which records were changed during that timeframe, allowing you to audit changes for further use-cases such as trend analysis, regulatory compliance checks, and so on. Before running the query, replace <START_DATETIME> and <END_DATETIME> with specific time ranges such as 2024-10-24 17:18:00 and 2024-10-24 17:20:00.
SELECT product_id, product_name, price, category, updated_at, effective_start, effective_end, is_current 
FROM scdt2 WHERE product_id IN ( SELECT product_id FROM scdt2 
WHERE (_change_type = 'DELETE' or _change_type = 'UPDATE_AFTER') 
AND effective_start BETWEEN '<START_DATETIME>' AND '<END_DATETIME>') 
ORDER BY product_id, effective_start
  1. The query result shows the deleted and updated records in the specified period. You can confirm that the price of Heater was updated and Television was deleted from the table.
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
|product_id|product_name|price|   category|          updated_at|     effective_start|       effective_end|is_current|
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
|     00001|      Heater|  250|Electronics|1.7297902833360643E9|2024-10-24 17:18:...|2024-10-24 17:19:...|     false|
|     00001|      Heater|  500|Electronics|1.7297903836233025E9|2024-10-24 17:19:...|                null|      true|
|     00003|  Television|  600|Electronics|1.7297902833360643E9|2024-10-24 17:18:...|2024-10-24 17:19:...|     false|
|     00003|  Television|  600|Electronics|1.7297902833360643E9|2024-10-24 17:19:...|                null|     false|
+----------+------------+-----+-----------+--------------------+--------------------+--------------------+----------+
  1. As another example, run the following query to retrieve the current records from the SCD Type-2 table by filtering on is_current = true, which is useful for current data reporting.
SELECT product_id, product_name, price, category, updated_at
FROM scdt2 WHERE is_current = true ORDER BY product_id
  1. The query result shows the current table records, reflecting the updated price of Heater, the deletion of Television, and the addition of Chair after the initial records.
+----------+------------+-----+-----------+--------------------+
|product_id|product_name|price|   category|          updated_at|
+----------+------------+-----+-----------+--------------------+
|     00001|      Heater|  500|Electronics|1.7297903836233025E9|
|     00002|  Thermostat|  400|Electronics|1.7297902833360643E9|
|     00004|     Blender|  100|Electronics|1.7297902833360643E9|
|     00005| USB charger|   50|Electronics|1.7297902833360643E9|
|     00006|       Chair|   50|  Furniture|1.7297903836233025E9|
+----------+------------+-----+-----------+--------------------+

You have now successfully implemented SCD Type-2 using the change log view. This SCD Type-2 implementation allows you to track the history of table records. For example, you can use it to search for products that were updated or deleted in a specific period, such as Heater and Television. Additionally, you can retrieve the current table records by querying the SCD Type-2 table with is_current = true. Using Iceberg’s change log view enables you to implement SCD Type-2 without making any changes to the Iceberg table itself. It also eliminates the need for creating or managing an additional table for SCD Type-2.

Clean up

To clean up the resources used in this post, complete the following steps:

  1. Open the Amazon S3 console.
  2. Select the S3 bucket aws-glue-assets-<ACCOUNT_ID>-<REGION> where the Notebook file (iceberg_history.ipynb) is stored. Delete the Notebook file that’s in the notebook path.
  3. Select the S3 bucket you created using the CloudFormation template. You can obtain the bucket name from the IcebergS3Bucket key on the CloudFormation Outputs tab. After selecting the bucket, choose Empty to delete all objects.
  4. After you confirm the bucket is empty, delete the CloudFormation stack iceberg-history-baseline-resources.

Considerations

Here are important considerations:

Conclusion

In this post, we explored how to look up the history of records and tables using Apache Iceberg. The instructions demonstrated how to use the change log view to look up the history of records, and how to build the history of tables with SCD Type-2 on top of it. With this method, you can manage the history of records and tables without maintaining additional tables.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan, and is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg

Post Syndicated from Shaheer Mansoor original https://aws.amazon.com/blogs/big-data/modernize-your-legacy-databases-with-aws-data-lakes-part-2-build-a-data-lake-using-aws-dms-data-on-apache-iceberg/

This is part two of a three-part series where we show how to build a data lake on AWS using a modern data architecture. This post shows how to load data from a legacy database (SQL Server) into a transactional data lake (Apache Iceberg) using AWS Glue. We show how to build data pipelines using AWS Glue jobs, optimize them for both cost and performance, and implement schema evolution to automate manual tasks. To review the first part of the series, where we load SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), see Modernize your legacy databases with AWS data lakes, Part 1: Migrate SQL Server using AWS DMS.

Solution overview

In this post, we go over the process of building a data lake, providing the rationale behind the different decisions, and share best practices when building such a solution.

The following diagram illustrates the different layers of the data lake.

Overall Architecture

To load data into the data lake, AWS Step Functions can define a workflow, Amazon Simple Queue Service (Amazon SQS) can track the order of incoming files, and AWS Glue jobs and the Data Catalog can be used to create the data lake silver layer. AWS DMS produces files and writes these files to the bronze bucket (as we explained in Part 1).

We can turn on Amazon S3 notifications and push the new arriving file names to an SQS first-in-first-out (FIFO) queue. A Step Functions state machine can consume messages from this queue to process the files in the order they arrive.

For processing the files, we need to create two types of AWS Glue jobs:

  • Full load – This job loads the entire table data dump into an Iceberg table. Data types from the source are mapped to an Iceberg data type. After the data is loaded, the job updates the Data Catalog with the table schemas.
  • CDC – This job loads the change data capture (CDC) files into the respective Iceberg tables. The AWS Glue job implements the schema evolution feature of Iceberg to handle schema changes such as addition or deletion of columns.
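The type mapping mentioned for the full load job can be sketched as a small lookup. The mapping entries and the function name to_iceberg_type are illustrative assumptions, not code from this post; extend the table to match the types in your own source schema.

```python
# Hypothetical helper: map a SQL Server type name to a Spark SQL type
# usable in Iceberg DDL. The entries below are assumptions for illustration.
SQLSERVER_TO_ICEBERG = {
    "varchar": "string",
    "nvarchar": "string",
    "char": "string",
    "nchar": "string",
    "int": "int",
    "smallint": "int",
    "bigint": "bigint",
    "bit": "boolean",
    "real": "float",
    "float": "double",
    "date": "date",
    "datetime": "timestamp",
    "datetime2": "timestamp",
}


def to_iceberg_type(sqlserver_type, default="string"):
    """Return the mapped type, falling back to `default` for unknown types."""
    return SQLSERVER_TO_ICEBERG.get(sqlserver_type.strip().lower(), default)
```

Falling back to string for unmapped types keeps the load from failing, at the cost of losing type information; you might prefer to raise an error instead.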

As in Part 1, the AWS DMS jobs will place the full load and CDC data from the source database (SQL Server) in the raw S3 bucket. Now we process this data using AWS Glue and save it to the silver bucket in Iceberg format. AWS Glue has a plugin for Iceberg; for details, see Using the Iceberg framework in AWS Glue.

Along with moving data from the bronze to the silver bucket, we also create and update the Data Catalog for further processing the data for the gold bucket.

The following diagram illustrates how the full load and CDC jobs are defined inside the Step Functions workflow.

Step Functions for loading data into the lake

In this post, we discuss the AWS Glue jobs for defining the workflow. We recommend using AWS Step Functions Workflow Studio, and setting up Amazon S3 event notifications and an SQS FIFO queue to receive the file names as messages.

Prerequisites

To follow the solution, you need the following prerequisites set up as well as certain access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM role to run Glue jobs
  • IAM privileges to create AWS DMS resources (this role was created in Part 1 of this series; you can use the same role here)
  • The AWS DMS job from Part 1 working and producing files for the source database on Amazon S3.

Create an AWS Glue connection for the source database

We need to create a connection between AWS Glue and the source SQL Server database so the AWS Glue job can query the source for the latest schema while loading the data files. To create the connection, follow these steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create custom connector.
  3. Give the connection a name and choose JDBC as the connection type.
  4. In the JDBC URL section, enter the following string, replacing the placeholders with the source database endpoint and database name that were set up in Part 1: jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}.
  5. Select Require SSL connection, then choose Create connector.

Glue Connections

Create and configure the full load AWS Glue job

Complete the following steps to create the full load job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Script editor and select Spark.
  3. Choose Start fresh and select Create script.
  4. Enter a name for the full load job and choose the IAM role (mentioned in the prerequisites) for running the job.
  5. Finish creating the job.
  6. On the Job details tab, expand Advanced properties.
  7. In the Connections section, add the connection you created.
  8. Under Job parameters, pass the following arguments to the job:
    1. target_s3_bucket – The silver S3 bucket name.
    2. source_s3_bucket – The raw S3 bucket name.
    3. secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
    4. dbname – The source database name.
    5. datalake-formats – This sets the data format to iceberg.

Glue Job Parameters

The full load AWS Glue job starts after the AWS DMS task reaches 100%. The job loops over the files located in the raw S3 bucket and processes them one at a time. For each file, the job infers the table name from the file name and gets the source table schema, including column names and primary keys.

If the table has one or more primary keys, the job creates an equivalent Iceberg table. If the table has no primary key, the file is not processed. In our use case, all the tables have primary keys, so we enforce this check. Depending on your data, you might need to handle this scenario differently.

You can use the following code to process the full load files. To start the job, choose Run.

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

#Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket'])
dbname = "AdventureWorks"
schema = "HumanResources"

#Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

#Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

#Helper Function: Load Iceberg table with Primary key(s)
def load_table(full_load_data_df, dbname, table_name):

    try:
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')

        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION "s3://{2}/{0}/{1}"
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)
        spark.sql(query)
        
        #Update Table property to accept Schema Changes
        spark.sql("""ALTER TABLE glue_catalog.{0}.{1} SET TBLPROPERTIES (
                      'write.spark.accept-any-schema'='true'
                    )""".format(dbname, table_name))
        
    except Exception as ex:
        print(ex)
        failed_table = {"table_name": table_name, "Reason": ex}
        unprocessed_tables.append(failed_table)
        
def get_table_key(host, port, username, password, dbname):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    df_table_pkeys = spark.sql("select c.TABLE_NAME, C.COLUMN_NAME as primary_key FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY'")
    return df_table_pkeys


#Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)


#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Initialize primary keys for all tables
df_table_pkeys = get_table_key(host, port, username, password, dbname)

#Read Full load csv files from s3
s3 = boto3.client('s3')
full_load_tables = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix="raw/{0}/{1}".format(dbname, schema))

#Loop over files
for item in full_load_tables['Contents']:
    pkey_list = []
    table_name = item["Key"].split("/")[3].lower()
    print("Table name {0}".format(table_name))
    # table_name is lowercased, so compare against the source table name case-insensitively
    current_table_df = df_table_pkeys.where("lower(TABLE_NAME) = '{0}'".format(table_name))

    # Only Process tables with at least 1 Primary key
    if current_table_df.isEmpty():
        failed_table = {"table_name": table_name, "Reason": "No primary key"}
        unprocessed_tables.append(failed_table)
        # ToDo Handle these cases
        continue

    for i in current_table_df.collect():
        pkey_list.append(i["primary_key"])

    full_data_path = "s3://{0}/{1}".format(source_s3_bucket, item['Key'])
    full_load_data_df = (spark
                        .read
                        .option("header", True)
                        .option("inferSchema", True)
                        .option("recursiveFileLookup", "true")
                        .csv(full_data_path)
                        )

    primary_key = ",".join(pkey_list)

    load_table(full_load_data_df, dbname, table_name)

When the job is complete, it creates the database and tables in the Data Catalog, as shown in the following screenshot.

Data lake silver layer data

Create and configure the CDC AWS Glue job

The CDC AWS Glue job is created similarly to the full load job. As with the full load AWS Glue job, you need to use the source database connection and pass the job parameters with one additional parameter, cdc_file, which contains the location of the CDC file to be processed. Because a CDC file can contain data for multiple tables, the job loops over the tables in a file and loads the table metadata (the column names) from the source table in Amazon RDS.

If the CDC operation is DELETE, the job deletes the records from the Iceberg table. If the CDC operation is INSERT or UPDATE, the job merges the data into the Iceberg table.

You can use the following code to process the CDC files. To start the job, choose Run.

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'cdc_file'])
dbname = "AdventureWorks"
schema = "HumanResources"
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
cdc_file = args['cdc_file']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns
source_s3_cdc_file_key = "raw/AdventureWorks/cdc/" + cdc_file



# Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper Function: Column names from RDS
def get_table_colums(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    columns = list((row.COLUMN_NAME) for (index, row) in spark.sql("select TABLE_NAME, TABLE_CATALOG, COLUMN_NAME from TABLE_COLUMNS where TABLE_NAME = '{0}' and TABLE_CATALOG = '{1}'".format(table, dbname)).select("COLUMN_NAME").toPandas().iterrows())
    return columns

# Helper Function: Get column names and data types from RDS
def get_table_colum_datatypes(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    return spark.sql("select TABLE_NAME, COLUMN_NAME, DATA_TYPE from TABLE_COLUMNS WHERE TABLE_NAME ='{0}'".format(table))

# Helper Function: Setup the primary key condition
def get_iceberg_table_condition(database, tablename):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    
    # Build "target.k1 = source.k1 and target.k2 = source.k2" from the primary key columns
    keys = spark.sql("select C.COLUMN_NAME FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY' AND lower(C.TABLE_NAME) = '{0}'".format(tablename.lower())).collect()
    return " and ".join("target.{0} = source.{0}".format(key.COLUMN_NAME) for key in keys)

    
# Read incoming data from Amazon S3
def read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key):
    
    inputDf = (spark
                    .read
                    .option("header", False)
                    .option("inferSchema", True)
                    .option("recursiveFileLookup", "true")
                    .csv("s3://" + source_s3_bucket + "/" + source_s3_cdc_file_key)
                    )
    return inputDf

# Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Read the cdc file 
cdc_df = read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key)

tables = cdc_df.toPandas()._c1.unique().tolist()

#Loop over tables in the cdc file
for table in tables:
    #Create dataframes for deletes and for inserts and updates
    table_df_deletes = cdc_df.where((cdc_df._c1 == table) & (cdc_df._c0 == "D")).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    table_df_upserts = cdc_df.where((cdc_df._c1 == table) & ((cdc_df._c0 == "I") | (cdc_df._c0 == "U"))).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    
    #Update column names for the dataframes
    columns = get_table_colums(table, host, port, username, password, dbname) 
    selectExpr = [] 

    for column in columns: 
        selectExpr.append(cdc_df.where((cdc_df._c1 == table)).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3]).columns[columns.index(column)] + " as " + column)

    table_df_deletes = table_df_deletes.selectExpr(selectExpr) 
    table_df_upserts = table_df_upserts.selectExpr(selectExpr)
    
    #Process Deletes
    if table_df_deletes.count() > 0:
        
        print("Delete Triggered")
        table_df_deletes.createOrReplaceTempView('deleted_rows')
        
        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM deleted_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN DELETE""".format(dbname.lower(), table.lower(), get_iceberg_table_condition(dbname.lower(), table.lower()))
        spark.sql(sql_string)
    
    if table_df_upserts.count() > 0:
        print("Upsert triggered")

        #Upsert Records when there are Schema Changes
        if len(table_df_upserts.columns) != len(columns):

            #Handle column deletes
            if len(table_df_upserts.columns) < len(columns):

                drop_columns = list(set(columns) - set(table_df_upserts.columns))

                for drop_column in drop_columns:
                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    DROP COLUMN {2}""".format(dbname.lower(), table.lower(), drop_column)
                    spark.sql(sql_string)

            #Handle column additions
            elif len(table_df_upserts.columns) > len(columns):

                column_datatype_df = get_table_colum_datatypes(table, host, port, username, password, dbname)
                add_columns = list(set(table_df_upserts.columns) - set(columns))

                for add_column in add_columns:

                    #Set Iceberg data type
                    data_type = list((row.DATA_TYPE) for (index, row) in column_datatype_df.filter("COLUMN_NAME='{0}'".format(add_column)).select("DATA_TYPE").toPandas().iterrows())[0]

                    # Convert MSSQL Datatypes to Iceberg supported datatypes
                    if data_type.lower() in ["varchar", "char"]:
                        data_type = "string"

                    if data_type.lower() in ["bigint"]:
                        data_type = "long"

                    if data_type.lower() in ["array"]:
                        data_type = "list"

                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    ADD COLUMN {2} {3}""".format(dbname.lower(), table.lower(), add_column, data_type)
                    spark.sql(sql_string)
                    
        #Create statement to update columns (runs for every upsert batch, with or without schema changes)
        update_table_column_list = ""
        insert_column_list = ""
        columns = get_table_colums(table, host, port, username, password, dbname)

        for column in columns:

            update_table_column_list+="""target.{0}=source.{0},""".format(column)
            insert_column_list+="""source.{0},""".format(column)

        table_df_upserts.createOrReplaceTempView('updated_rows')

        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM updated_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN UPDATE SET {3} 
                        WHEN NOT MATCHED THEN INSERT ({4}) VALUES ({5})""".format(dbname.lower(), 
                                                                                  table.lower(), 
                                                                                  get_iceberg_table_condition(dbname.lower(), table.lower()), 
                                                                                  update_table_column_list.rstrip(","), 
                                                                                  ",".join(columns), 
                                                                                  insert_column_list.rstrip(","))

        spark.sql(sql_string)

    
print("CDC job complete")

The Iceberg MERGE INTO syntax can handle cases where a new column is added. For more details on this feature, see the Iceberg MERGE INTO syntax documentation. If the CDC job needs to process many tables in the CDC file, the job can be multi-threaded to process the file in parallel.
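As a rough sketch of the multi-threading idea, the per-table work can be submitted to a thread pool. Here process_table is a hypothetical function that wraps the body of the per-table loop in the CDC job; it is not defined in this post. If you take this approach, the temporary views (deleted_rows and updated_rows) would need a unique name per table, because all threads share one Spark session.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_tables_in_parallel(tables, process_table, max_workers=4):
    """Run process_table(table) for each table on a thread pool.

    Returns (succeeded, failed), where failed maps table name -> error text.
    """
    succeeded, failed = [], {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its table so failures can be reported by name
        futures = {pool.submit(process_table, t): t for t in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                future.result()
                succeeded.append(table)
            except Exception as ex:  # keep going if one table fails
                failed[table] = str(ex)
    return sorted(succeeded), failed
```

Threads are sufficient here because the heavy lifting happens in Spark, not in the Python driver; the pool only overlaps the submission of independent MERGE statements.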

 

Configure EventBridge notifications, SQS queue, and Step Functions state machine

You can configure Amazon S3 to send notifications to Amazon EventBridge when certain events occur in S3 buckets, such as when objects are created or deleted. For this post, we’re interested in the events when new CDC files from AWS DMS arrive in the bronze S3 bucket. You can create event notifications for new objects and insert the file names into an SQS queue. A Lambda function within Step Functions would consume from the queue, extract the file name, start a CDC Glue job, and pass the file name as a parameter to the job.
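The file-name extraction step could look roughly like the following. This is a sketch that assumes each queue message body is the EventBridge "Object Created" event as JSON, with the object key at detail.object.key, and that CDC files live under the raw/AdventureWorks/cdc/ prefix used by the CDC job. The function names are hypothetical.

```python
import json

# Matches the prefix used to build source_s3_cdc_file_key in the CDC job
CDC_PREFIX = "raw/AdventureWorks/cdc/"


def glue_arguments_from_message(message_body):
    """Build the Glue job arguments for one queue message.

    Assumes the body is an EventBridge "Object Created" event in JSON.
    """
    event = json.loads(message_body)
    key = event["detail"]["object"]["key"]
    if not key.startswith(CDC_PREFIX):
        raise ValueError("Not a CDC file: {0}".format(key))
    # Glue job arguments are passed with a leading "--"
    return {"--cdc_file": key[len(CDC_PREFIX):]}


def start_cdc_job(glue_client, job_name, message_body):
    """Start the CDC job for the file named in the message."""
    return glue_client.start_job_run(
        JobName=job_name,
        Arguments=glue_arguments_from_message(message_body),
    )
```

Passing the Glue client in as a parameter (for example, boto3.client('glue')) keeps the parsing logic easy to test without AWS access.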

AWS DMS CDC files contain database insert, update, and delete statements. We need to process these in order, so we use an SQS FIFO queue, which preserves the order in which messages arrive. You can also configure the Amazon SQS message retention period, referred to here as time to live (TTL); this parameter defines how long a message stays in the queue before it expires.

Another important parameter to consider when configuring an SQS queue is the message visibility timeout value. While a message is being processed, it disappears from the queue to make sure that the message isn’t consumed by multiple consumers (AWS Glue jobs in our case). If the message is consumed successfully, it should be deleted from the queue before the visibility timeout. However, if the visibility timeout expires and the message isn’t deleted, the message reappears in the queue. In our solution, this timeout must be greater than the time it takes for the CDC job to process a file.
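To make the two queue settings concrete, the following sketch derives the SQS attributes from an estimate of the longest CDC job run. The function name, the safety factor, and the defaults are assumptions for illustration; the caps reflect the SQS limits of 12 hours for the visibility timeout and 14 days for message retention.

```python
def fifo_queue_attributes(max_job_seconds, retention_days=14, safety_factor=2):
    """Return SQS attribute strings for a FIFO queue feeding the CDC job."""
    # Visibility timeout must exceed the job runtime; SQS caps it at 12 hours
    visibility = min(int(max_job_seconds * safety_factor), 43200)
    # Retention must be between 60 seconds and 14 days
    retention = min(max(int(retention_days * 86400), 60), 1209600)
    return {
        "FifoQueue": "true",
        "VisibilityTimeout": str(visibility),
        "MessageRetentionPeriod": str(retention),
    }
```

For example, you could pass the result as the Attributes argument of the boto3 SQS create_queue call, with a queue name that ends in .fifo.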

Lastly, we recommend using Step Functions to define a workflow for handling the full load and CDC files. Step Functions has built-in integrations to other AWS services like Amazon SQS, AWS Glue, and Lambda, which makes it a good candidate for this use case.

The Step Functions state machine starts by checking the status of the AWS DMS task. The AWS DMS task can be queried to check the status of the full load, and we check the value of the parameter FullLoadProgressPercent. When this value reaches 100%, we can start processing the full load files. After the AWS Glue job processes the full load files, we start polling the SQS queue to check the size of the queue. If the queue size is greater than 0, new CDC files have arrived and we can start the AWS Glue CDC job to process these files. The AWS Glue job processes the CDC files and deletes the messages from the queue. When the queue size reaches 0, the AWS Glue job exits and we loop in the Step Functions workflow to check the SQS queue size again.
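The checks described above can be sketched in Python as follows, for example inside Lambda functions called by the state machine. The function names and the step labels returned by next_step are hypothetical; the clients are expected to be boto3 DMS and SQS clients passed in by the caller.

```python
def full_load_complete(dms_client, task_arn):
    """Return True when the DMS task reports 100% full load progress."""
    response = dms_client.describe_replication_tasks(
        Filters=[{"Name": "replication-task-arn", "Values": [task_arn]}]
    )
    stats = response["ReplicationTasks"][0].get("ReplicationTaskStats", {})
    return stats.get("FullLoadProgressPercent", 0) >= 100


def pending_cdc_files(sqs_client, queue_url):
    """Return the approximate number of messages waiting in the queue."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
    )
    return int(response["Attributes"]["ApproximateNumberOfMessages"])


def next_step(full_load_done, full_load_processed, queue_size):
    """Decide what the workflow should do next (labels are illustrative)."""
    if not full_load_done:
        return "wait_for_dms"
    if not full_load_processed:
        return "run_full_load_job"
    if queue_size > 0:
        return "run_cdc_job"
    return "poll_queue_again"
```

Note that the queue size reported by SQS is approximate, so the workflow should tolerate an occasional CDC job run that finds no messages.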

Because the Step Functions state machine is meant to run indefinitely, keep the service quotas in mind: a maximum runtime of 1 year and a maximum run history size of 25,000 events for a single state machine run. We recommend adding a step at the end of the workflow that checks whether either limit is close to being reached and, if so, stops the current state machine run and starts a new one.
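One possible shape for that final check is shown below. The function name and the safety margins (80% of the event limit, 90% of the runtime limit) are assumptions; choose margins that leave enough room for one more loop iteration in your workflow.

```python
from datetime import datetime, timedelta, timezone

MAX_HISTORY_EVENTS = 25000          # limit on events in one run's history
MAX_RUNTIME = timedelta(days=365)   # limit on the duration of one run


def should_start_new_execution(event_count, started_at, now=None,
                               event_margin=0.8, runtime_margin=0.9):
    """Return True when the current run is close to either limit."""
    now = now or datetime.now(timezone.utc)
    near_event_limit = event_count >= MAX_HISTORY_EVENTS * event_margin
    near_runtime_limit = (now - started_at) >= MAX_RUNTIME * runtime_margin
    return near_event_limit or near_runtime_limit
```

When the function returns True, the workflow can start a new run of the same state machine and then end the current one.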

The following diagram illustrates how you can use Step Functions state machine history size to monitor and start a new Step Functions state machine run.

Step Functions Workflow

Configure the pipeline

The pipeline needs to be configured to address cost, performance, and resilience goals. You might want a pipeline that can load fresh data into the data lake and make it available quickly, and you might also want to optimize costs by loading large chunks of data into the data lake. At the same time, you should make the pipeline resilient and be able to recover in case of failures. In this section, we cover the different parameters and recommended settings to achieve these goals.

Step Functions is designed to process incoming AWS DMS CDC files by running AWS Glue jobs. AWS Glue jobs can take a couple of minutes to boot up, and when they’re running, it’s efficient to process large chunks of data. You can configure AWS DMS to write CSV files to Amazon S3 by configuring the following AWS DMS task parameters:

  • CdcMaxBatchInterval – Defines the maximum time limit AWS DMS will wait before writing a batch to Amazon S3
  • CdcMinFileSize – Defines the minimum file size AWS DMS will write to Amazon S3

Whichever condition is met first will invoke the write operation. If you want to prioritize data freshness, you should have a short CdcMaxBatchInterval value (10 seconds) and a small CdcMinFileSize value (1–5 MB). This will result in many small CSV files being written to Amazon S3 and will invoke a lot of AWS Glue jobs to process the data, making the extract, transform, and load (ETL) process faster. If you want to optimize costs, you should have a moderate CdcMaxBatchInterval (minutes) and a large CdcMinFileSize value (100–500 MB). In this scenario, we start a few AWS Glue jobs that will process large chunks of data, making the ETL flow more efficient. In a real-world use case, the required values for these parameters might fall somewhere that’s a good compromise between throughput and cost. You can configure these parameters when creating a target endpoint using the AWS DMS console, or by using the create-endpoint command in the AWS Command Line Interface (AWS CLI).
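The two profiles described above can be written down as endpoint settings. This is a sketch: the profile names and the exact values are illustrative picks within the ranges mentioned, with CdcMaxBatchInterval expressed in seconds and CdcMinFileSize in kilobytes.

```python
def cdc_batch_settings(profile):
    """Return S3 target endpoint settings for a named profile.

    CdcMaxBatchInterval is in seconds; CdcMinFileSize is in kilobytes.
    """
    profiles = {
        # Prioritize data freshness: flush often, small files (about 1 MB)
        "freshness": {"CdcMaxBatchInterval": 10, "CdcMinFileSize": 1000},
        # Prioritize cost: flush every 5 minutes or at about 256 MB
        "cost": {"CdcMaxBatchInterval": 300, "CdcMinFileSize": 256000},
    }
    if profile not in profiles:
        raise ValueError("Unknown profile: {0}".format(profile))
    return dict(profiles[profile])
```

The returned dictionary can be merged into the S3Settings argument when you create or modify the AWS DMS target endpoint with boto3.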

For the full list of parameters, see Using Amazon S3 as a target for AWS Database Migration Service.

Choosing the right AWS Glue worker types for the full load and CDC jobs is also crucial for performance and cost optimization. The AWS Glue (Spark) workers range from G.1X to G.8X, which have an increasing number of data processing units (DPUs). Full load files are usually much larger than CDC files, and therefore it’s more cost- and performance-effective to select a larger worker. For CDC files, it would be more cost-effective to select a smaller worker because file sizes are smaller.

You should design the Step Functions state machine in such a way that if anything fails, the pipeline can be redeployed after repair and resume processing from where it left off. One important parameter here is the TTL (message retention period) for the messages in the SQS queue. This parameter defines how long a message stays in the queue before expiring. In case of failures, we want this parameter to be long enough for us to deploy a fix. Amazon SQS has a maximum of 14 days for a message’s TTL. We recommend setting this to a large enough value to reduce the risk of messages expiring during pipeline failures.

Clean up

Complete the following steps to clean up the resources you created in this post:

  1. Delete the AWS Glue jobs:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Select the full load and CDC jobs and on the Actions menu, choose Delete.
    3. Choose Delete to confirm.
  2. Delete the Iceberg tables:
    1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
    2. Choose the database in which the Iceberg tables reside.
    3. Select the tables to delete, choose Delete, and confirm the deletion.
  3. Delete the S3 bucket:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Choose the silver bucket and empty the files in the bucket.
    3. Delete the bucket.

Conclusion

In this post, we showed how to use AWS Glue jobs to load AWS DMS files into a transactional data lake framework such as Iceberg. In our setup, AWS Glue provided highly scalable and simple-to-maintain ETL jobs. Furthermore, we shared a proposed solution using Step Functions to create an ETL pipeline workflow, with Amazon S3 notifications and an SQS queue to capture newly arriving files. We shared how to design this system to be resilient to failures and to automate one of the most time-consuming tasks in maintaining a data lake: schema evolution.

In Part 3, we will share how to process the data lake to create data marts.


About the Authors

Shaheer Mansoor is a Senior Machine Learning Engineer at AWS, where he specializes in developing cutting-edge machine learning platforms. His expertise lies in creating scalable infrastructure to support advanced AI solutions. His focus areas are MLOps, feature stores, data lakes, model hosting, and generative AI.

Anoop Kumar K M is a Data Architect at AWS with focus in the data and analytics area. He helps customers in building scalable data platforms and in their enterprise data strategy. His areas of interest are data platforms, data analytics, security, file systems and operating systems. Anoop loves to travel and enjoys reading books in the crime fiction and financial domains.

Sreenivas Nettem is a Lead Database Consultant at AWS Professional Services. He has experience working with Microsoft technologies with a specialization in SQL Server. He works closely with customers to help migrate and modernize their databases to AWS.

The AWS Glue Data Catalog now supports storage optimization of Apache Iceberg tables

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/the-aws-glue-data-catalog-now-supports-storage-optimization-of-apache-iceberg-tables/

The AWS Glue Data Catalog now enhances managed table optimization of Apache Iceberg tables by automatically removing data files that are no longer needed. Along with the Glue Data Catalog’s automated compaction feature, these storage optimizations can help you reduce metadata overhead, control storage costs, and improve query performance.

Iceberg creates a new version called a snapshot for every change to the data in the table. Iceberg has features like time travel and rollback that allow you to query data lake snapshots or roll back to previous versions. As more table changes are made, more data files are created. In addition, any failures during writing to Iceberg tables will create data files that aren’t referenced in snapshots, also known as orphan files. Time travel features, though useful, may conflict with regulations like GDPR that require permanent data deletion. Because time travel allows accessing data through historical snapshots, additional safeguards are needed to maintain compliance with data privacy laws. To control storage costs and comply with regulations, many organizations have created custom data pipelines that periodically expire snapshots in a table that are no longer needed and remove orphan files. However, building these custom pipelines is time-consuming and expensive.

With this launch, you can enable Glue Data Catalog table optimization to include snapshot and orphan data management along with compaction. You can enable this by providing configurations such as a default retention period and maximum days to keep orphan files. The Glue Data Catalog monitors tables daily, removes snapshots from table metadata, and removes the data files and orphan files that are no longer needed. The Glue Data Catalog honors retention policies for Iceberg branches and tags referencing snapshots. You can now get an always-optimized Amazon Simple Storage Service (Amazon S3) layout by automatically removing expired snapshots and orphan files. You can view the history of data, manifest, manifest lists, and orphan files deleted from the table optimization tab on the AWS Glue Data Catalog console.

In this post, we show how to enable managed retention and orphan file deletion on an Apache Iceberg table for storage optimization.

Solution overview

For this post, we use a table called customer in the iceberg_blog_db database, where data is added continuously by a streaming application—around 10,000 records (file size less than 100 KB) every 10 minutes, which includes change data capture (CDC) as well. The customer table data and metadata are stored in the S3 bucket. Because the data is updated and deleted as part of CDC, new snapshots are created for every change to the data in the table.

Managed compaction is enabled on this table for query optimization, which results in new snapshots being created when compaction rewrites several small files into a few compacted files, leaving the old small files in storage. This results in data and metadata in Amazon S3 growing at a rapid pace, which can become cost-prohibitive.

Snapshots are timestamped versions of an Iceberg table. Snapshot retention configurations let you control how long snapshots are kept and how many are retained. Configuring a snapshot retention optimizer helps manage storage overhead by removing older, unnecessary snapshots and their underlying files.
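
The interaction between the two retention settings can be sketched in Python. This is a simplified model and not the AWS Glue implementation: it assumes a snapshot is expired only when it is older than the retention period and is not among the most recent snapshots to retain. The function name and the sample history are hypothetical.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, now, retention_days, min_to_retain):
    """Return ids of snapshots older than the retention period that are
    not among the `min_to_retain` most recent snapshots.

    `snapshots` is a list of (snapshot_id, committed_at) tuples.
    """
    cutoff = now - timedelta(days=retention_days)
    newest_first = sorted(snapshots, key=lambda s: s[1], reverse=True)
    # The newest snapshots are always kept, regardless of their age.
    protected = {sid for sid, _ in newest_first[:min_to_retain]}
    return [sid for sid, ts in newest_first if ts < cutoff and sid not in protected]


now = datetime(2024, 1, 10, 12, 0)
history = [
    ("s1", datetime(2024, 1, 7, 12, 0)),
    ("s2", datetime(2024, 1, 8, 12, 0)),
    ("s3", datetime(2024, 1, 10, 11, 0)),
]
expired = snapshots_to_expire(history, now, retention_days=1, min_to_retain=1)
print(expired)
```

In this model the newest snapshot is always kept, so the table stays readable even if no commit happened within the retention period.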

Orphan files are files that are no longer referenced by the Iceberg table metadata. These files can accumulate over time, especially after operations like table deletions or failed ETL jobs. Enabling orphan file deletion allows AWS Glue to periodically identify and remove these unnecessary files, freeing up storage.
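
Conceptually, orphan detection is a set difference between the files present under the table location and the files that table metadata still references, combined with an age threshold. The following Python sketch illustrates that idea under stated assumptions; it is not how AWS Glue implements the feature, and the function name and paths are hypothetical.

```python
from datetime import datetime, timedelta


def find_orphans(listed_files, referenced_files, now, retention_days):
    """Return paths that no metadata references and that are older than
    the retention period.

    listed_files:     {path: last_modified} for every object under the table location
    referenced_files: set of paths reachable from table metadata
    """
    cutoff = now - timedelta(days=retention_days)
    return sorted(
        path
        for path, modified in listed_files.items()
        if path not in referenced_files and modified < cutoff
    )


now = datetime(2024, 1, 10, 12, 0)
listed = {
    "data/a.parquet": datetime(2024, 1, 5),           # referenced by a snapshot
    "data/failed-job.parquet": datetime(2024, 1, 6),  # left behind by a failed write
    "data/in-flight.parquet": datetime(2024, 1, 10, 11, 30),  # may still be committed
}
referenced = {"data/a.parquet"}
orphans = find_orphans(listed, referenced, now, retention_days=1)
print(orphans)
```

The age threshold matters because a file written by a job that has not committed yet looks unreferenced; keeping recent files avoids deleting data that is about to become part of a snapshot.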

The following diagram illustrates the architecture.

architecture

In the following sections, we demonstrate how to enable managed retention and orphan file deletion on the AWS Glue managed Iceberg table.

Prerequisite

Have an AWS account. If you don’t have an account, you can create one.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An S3 bucket to store the dataset, Glue job scripts, and so on
  • Data Catalog database
  • An AWS Glue job that creates and modifies sample customer data in your S3 bucket, with a trigger that runs every 10 minutes
  • AWS Identity and Access Management (AWS IAM) roles and policies – glueroleoutput

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
    Launch Cloudformation Stack
  3. Choose Next.
  4. Leave the parameters as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 5-10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

CFN

Note the value of the glueroleoutput role; you will use it when you enable the optimizers.

On the Amazon S3 console, note the S3 bucket. There you can monitor how the AWS Glue job continuously updates the data every 10 minutes.

S3 buckets

Enable snapshot retention

We want to remove the metadata and data files of snapshots older than 1 day and retain 1 snapshot. To do so, enable snapshot retention on the customer table with the retention configuration shown in the following steps. AWS Glue then runs background operations to perform this table maintenance, enforcing the settings one time per day.

  1. Sign in to the AWS Glue console as an administrator.
  2. Under Data Catalog in the navigation pane, choose Tables.
  3. Search for and select the customer table.
  4. On the Actions menu, choose Enable under Optimization.
    GDC table
  5. Specify your optimization settings by selecting Snapshot retention.
  6. Under Optimization configuration, select Customize settings and provide the following:
    1. For IAM role, choose the role created by the CloudFormation stack.
    2. Set Snapshot retention period as 1 day.
    3. Set Minimum snapshots to retain as 1.
    4. Choose Yes for Delete expired files.
  7. Select the acknowledgement check box and choose Enable.

optimization enable

Alternatively, you can install or update the latest AWS Command Line Interface (AWS CLI) version to run the AWS CLI to enable snapshot retention. For instructions, refer to Installing or updating the latest version of the AWS CLI. Use the following code to enable snapshot retention:

aws glue create-table-optimizer \
  --catalog-id 112233445566 \
  --database-name iceberg_blog_db \
  --table-name customer \
  --table-optimizer-configuration '{
    "roleArn": "arn:aws:iam::112233445566:role/<glueroleoutput>",
    "enabled": true,
    "retentionConfiguration": {
      "icebergConfiguration": {
        "snapshotRetentionPeriodInDays": 1,
        "numberOfSnapshotsToRetain": 1,
        "cleanExpiredFiles": true
      }
    }
  }' \
  --type retention \
  --region us-east-1
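
The same request can also be issued from Python with the boto3 Glue client's create_table_optimizer call. The sketch below mirrors the values used in this post; the helper function names are hypothetical, and the account ID and role name are the placeholders used above, so replace them with your own before running enable_retention.

```python
def retention_optimizer_request(catalog_id, database, table, role_arn,
                                retention_days=1, snapshots_to_retain=1):
    """Build the keyword arguments for glue.create_table_optimizer."""
    return {
        "CatalogId": catalog_id,
        "DatabaseName": database,
        "TableName": table,
        "Type": "retention",
        "TableOptimizerConfiguration": {
            "roleArn": role_arn,
            "enabled": True,
            "retentionConfiguration": {
                "icebergConfiguration": {
                    "snapshotRetentionPeriodInDays": retention_days,
                    "numberOfSnapshotsToRetain": snapshots_to_retain,
                    "cleanExpiredFiles": True,
                }
            },
        },
    }


def enable_retention(request, region="us-east-1"):
    """Send the request to AWS Glue (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so building the request needs no AWS setup

    glue = boto3.client("glue", region_name=region)
    return glue.create_table_optimizer(**request)


request = retention_optimizer_request(
    catalog_id="112233445566",
    database="iceberg_blog_db",
    table="customer",
    role_arn="arn:aws:iam::112233445566:role/<glueroleoutput>",  # placeholder
)
print(request["Type"])
```

Keeping the request builder separate from the call makes it easy to review or log the configuration before anything is changed in the account.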

Enable orphan file deletion

We want to remove metadata and data files that are no longer referenced by the table and are older than 1 day. Complete the following steps to enable orphan file deletion on the customer table. AWS Glue then runs background operations to perform this table maintenance, enforcing the setting one time per day.

  1. Under Optimization configuration, select Customize settings and provide the following:
    1. For IAM role, choose the role created by the CloudFormation stack.
    2. Set Delete orphan file period as 1 day.
  2. Select the acknowledgement check box and choose Enable.

Alternatively, you can use the AWS CLI to enable orphan file deletion:

aws glue create-table-optimizer \
  --catalog-id 112233445566 \
  --database-name iceberg_blog_db \
  --table-name customer \
  --table-optimizer-configuration '{
    "roleArn": "arn:aws:iam::112233445566:role/<glueroleoutput>",
    "enabled": true,
    "orphanFileDeletionConfiguration": {
      "icebergConfiguration": {
        "orphanFileRetentionPeriodInDays": 1
      }
    }
  }' \
  --type orphan_file_deletion \
  --region us-east-1

Based on the optimizer configuration, you will start seeing the optimization history in the AWS Glue Data Catalog.

runs

Validate the solution

To validate the snapshot retention and orphan file deletion configuration, complete the following steps:

  1. Sign in to the AWS Glue console as an administrator.
  2. Under Data Catalog in the navigation pane, choose Tables.
  3. Search for and choose the customer table.
  4. Choose the Table optimization tab to view the optimization job run history.

runs

Alternatively, you can use the AWS CLI to verify snapshot retention:

aws glue get-table-optimizer --catalog-id 112233445566 --database-name iceberg_blog_db --table-name customer --type retention

You can also use the AWS CLI to verify orphan file deletion:

aws glue get-table-optimizer --catalog-id 112233445566 --database-name iceberg_blog_db --table-name customer --type orphan_file_deletion

Monitor CloudWatch metrics for Amazon S3

The following metrics show a steep increase in the bucket size as customer data streams in along with CDC, which increases the number of metadata and data objects as snapshots are created. When snapshot retention ("snapshotRetentionPeriodInDays": 1, "numberOfSnapshotsToRetain": 50) and orphan file deletion ("orphanFileRetentionPeriodInDays": 1) are enabled, the total bucket size for the customer prefix and the total number of objects drop as the maintenance takes place, eventually leading to optimized storage.

metrics

Clean up

To avoid incurring future charges, delete the resources you created in AWS Glue and the Data Catalog, as well as the S3 bucket used for storage.

Conclusion

Two of the key features of Iceberg are time travel and rollbacks, allowing you to query data at previous points in time and roll back unwanted changes to your tables. This is facilitated through the concept of Iceberg snapshots, which are a complete set of data files in the table at a point in time. With these new releases, the Data Catalog now provides storage optimizations that can help you reduce metadata overhead, control storage costs, and improve query performance.

To learn more about using the AWS Glue Data Catalog, refer to Optimizing Iceberg Tables.

A special thanks to everyone who contributed to the launch: Sangeet Lohariwala, Arvin Mohanty, Juan Santillan, Sandya Krishnanand, Mert Hocanin, Yanting Zhang and Shyam Rathi.


About the Authors

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Paul Villena is a Senior Analytics Solutions Architect in AWS with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interests are infrastructure as code, serverless technologies, and coding in Python.

Understanding Apache Iceberg on AWS with the new technical guide

Post Syndicated from Carlos Rodrigues original https://aws.amazon.com/blogs/big-data/understanding-apache-iceberg-on-aws-with-the-new-technical-guide/

We’re excited to announce the launch of the Apache Iceberg on AWS technical guide. Whether you are new to Apache Iceberg on AWS or already running production workloads on AWS, this comprehensive technical guide offers detailed guidance on foundational concepts to advanced optimizations to build your transactional data lake with Apache Iceberg on AWS.

Apache Iceberg is an open source table format that simplifies data processing on large datasets stored in data lakes. It does so by bringing the familiarity of SQL tables to big data and capabilities such as ACID transactions, row-level operations (merge, update, delete), partition evolution, data versioning, incremental processing, and advanced query scanning. Apache Iceberg seamlessly integrates with popular open source big data processing frameworks like Apache Spark, Apache Hive, Apache Flink, Presto, and Trino. It is natively supported by AWS analytics services such as AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift.

The following diagram illustrates a reference architecture of a transactional data lake with Apache Iceberg on AWS.

AWS customers and data engineers use the Apache Iceberg table format for its many benefits, as well as for its high performance and reliability at scale to build transactional data lakes and write-optimized solutions with Amazon EMR, AWS Glue, Athena, and Amazon Redshift on Amazon Simple Storage Service (Amazon S3).

We believe Apache Iceberg adoption on AWS will continue to grow rapidly, and you can benefit from this technical guide that delivers productive guidance on working with Apache Iceberg on supported AWS services, best practices on cost-optimization and performance, and effective monitoring and maintenance policies.


About the Authors

Carlos Rodrigues is a Big Data Specialist Solutions Architect at AWS. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Iceberg and Apache Hudi. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He is an expert on data engineering and enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Shana Schipers is an Analytics Specialist Solutions Architect at AWS, focusing on big data. She supports customers worldwide in building transactional data lakes using open table formats like Apache Hudi, Apache Iceberg, and Delta Lake on AWS.

Incremental Processing using Netflix Maestro and Apache Iceberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/incremental-processing-using-netflix-maestro-and-apache-iceberg-b8ba072ddeeb

by Jun He, Yingyi Zhang, and Pawan Dixit

Incremental processing is an approach to process new or changed data in workflows. The key advantage is that it processes only data that is newly added to or updated in a dataset, instead of re-processing the complete dataset. This reduces both the cost of compute resources and the execution time significantly. When workflow execution has a shorter duration, the chances of failure and manual intervention go down. It also improves engineering productivity by simplifying existing pipelines and unlocking new patterns.

In this blog post, we talk about the landscape and the challenges in workflows at Netflix. We will show how we are building a clean and efficient incremental processing solution (IPS) by using Netflix Maestro and Apache Iceberg. IPS provides the incremental processing support with data accuracy, data freshness, and backfill for users and addresses many of the challenges in workflows. IPS enables users to continue to use the data processing patterns with minimal changes.

Introduction

Netflix relies on data to power its business in all phases. Whether in analyzing A/B tests, optimizing studio production, training algorithms, investing in content acquisition, detecting security breaches, or optimizing payments, well structured and accurate data is foundational. As our business scales globally, the demand for data is growing and the needs for scalable low latency incremental processing begin to emerge. There are three common issues that the dataset owners usually face.

  • Data Freshness: Large datasets from Iceberg tables need to be processed quickly and accurately to generate insights that enable faster product decisions. The hourly processing semantics, along with the valid-through-timestamp watermark or data signals provided by the Data Platform toolset today, satisfy many use cases but are not the best fit for low-latency batch processing. Before IPS, the Data Platform did not have a solution for tracking the state and progression of data sets as a single easy-to-use offering. This has led to a few internal solutions such as Psyberg. These internal libraries process data by capturing the changed partitions, which works only for specific use cases. Additionally, the libraries are tightly coupled to the user business logic, which often incurs higher migration and maintenance costs and requires heavy coordination with the Data Platform team.
  • Data Accuracy: Late arriving data causes datasets processed in the past to become incomplete and as a result inaccurate. To compensate for that, ETL workflows often use a lookback window, based on which they reprocess the data in that certain time window. For example, a job would reprocess aggregates for the past 3 days because it assumes that there would be late arriving data, but data prior to 3 days isn’t worth the cost of reprocessing.
  • Backfill: Backfilling datasets is a common operation in big data processing. This requires repopulating data for a historical time period that precedes the scheduled processing. The need for backfilling could be due to a variety of factors, e.g. (1) upstream data sets got repopulated due to changes in the business logic of their data pipeline, (2) business logic was changed in a data pipeline, (3) a new metric was created that needs to be populated for historical time ranges, (4) historical data was found missing, etc.

These challenges are currently addressed by individual local teams in suboptimal and less cost-efficient ways, such as:

  • Lookback: This is a generic and simple approach that data engineers use to solve the data accuracy problem. Users configure the workflow to read the data in a window (e.g. past 3 hours or 10 days). The window is set based on users’ domain knowledge so that users have a high confidence that the late arriving data will be included or will not matter (i.e. data arrives too late to be useful). It ensures the correctness with a high cost in terms of time and compute resources.
  • Foreach pattern: Users build backfill workflows using Maestro foreach support. It works well to backfill data produced by a single workflow. If the pipeline has multiple stages or many downstream workflows, users have to manually create backfill workflows for each of them and that requires significant manual work.

The incremental processing solution (IPS) described here has been designed to address the above problems. The design goal is to provide a clean and easy-to-adopt solution for incremental processing that ensures data freshness and data accuracy and provides easy backfill support.

  • Data Freshness: provide the support for scheduling workflows in a micro batch fashion (e.g. 15 min interval) with state tracking functionality
  • Data Accuracy: provide the support to process all late arriving data to achieve the data accuracy needed by the business, with multifold improvements in time and cost efficiency
  • Backfill: provide managed backfill support to build, monitor, and validate the backfill, including automatically propagating changes from upstream to downstream workflows, to greatly improve engineering productivity (i.e. a few days or weeks of engineering work to build backfill workflows vs one click for managed backfill)

Approach Overview

General Concept

Incremental processing is an approach to process data in batch, but only new or changed data. To support incremental processing, we need an approach for not only capturing incremental data changes but also tracking their states (i.e. whether a change is processed by a workflow or not). The approach must be able to detect and capture changes from the source table(s) and then keep tracking them. Here, changes mean more than just the new data itself. For example, a row in an aggregation target table needs all the rows from the source table associated with the aggregation row. Also, if there are multiple source tables, usually the union of the changed data ranges from all input tables gives the full change data set. Thus, the change information captured must include all related data, including unchanged rows in the source table. Due to the complexities mentioned above, change tracking cannot be achieved simply by using a single watermark. IPS has to track the captured changes at a finer granularity.

The changes from the source tables might affect the transformed result in the target table in various ways.

  • If one row in the target table is derived from one row in the source table, newly captured data change will be the complete input dataset for the workflow pipeline.
  • If one row in the target table is derived from multiple rows in the source table, capturing new data will only tell us the rows have to be re-processed. But the dataset needed for ETL is beyond the change data itself. For example, an aggregation based on account id requires all rows from the source table about an account id. The change dataset will tell us which account ids are changed and then the user business logic needs to load all data associated with those account ids found in the change data.
  • If one row in the target table is derived based on the data beyond the changed data set, e.g. joining source table with other tables, newly captured data is still useful and can indicate a range of data to be affected. Then the workflow will re-process the data based on the range. For example, assuming we have a table that keeps the accumulated view time for a given account partitioned by the day. If the view time 3-days ago is updated right now due to late arriving data, then the view time for the following two days has to be re-calculated for this account. In this case, the captured late arriving data will tell us the start of the re-calculation, which is much more accurate than recomputing everything for the past X days by guesstimate, where X is a cutoff lookback window decided by business domain knowledge.
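
The accumulated view time example in the last case can be sketched as follows. This is an illustrative Python model, not IPS code: it assumes the captured change data yields the earliest changed day, and it recomputes the running totals only from that day onward. The function name and sample values are hypothetical.

```python
from datetime import date


def recompute_from(daily_view_time, accumulated, earliest_changed_day):
    """Recompute accumulated view time from `earliest_changed_day` onward.

    daily_view_time: {day: seconds viewed on that day}, already including late data
    accumulated:     {day: stored running total up to and including that day}
    """
    updated = dict(accumulated)
    running = 0
    for day in sorted(daily_view_time):
        if day < earliest_changed_day:
            running = accumulated[day]  # unchanged, reuse the stored total
        else:
            running += daily_view_time[day]
            updated[day] = running
    return updated


d1, d2, d3, d4 = (date(2023, 11, d) for d in (17, 18, 19, 20))
accumulated = {d1: 10, d2: 30, d3: 60, d4: 100}
# Late arriving data raises the view time on d2 from 20 to 25.
daily = {d1: 10, d2: 25, d3: 30, d4: 40}
updated = recompute_from(daily, accumulated, earliest_changed_day=d2)
print(updated)
```

Only the days from the earliest changed day onward are recomputed, which is the saving over a fixed lookback window that reprocesses the past X days regardless of what changed.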

Once the change information (data or range) is captured, a workflow has to write the data to the target table in a slightly more complicated way because the simple INSERT OVERWRITE mechanism won’t work well. There are two alternatives:

  • Merge pattern: Some compute frameworks, e.g. Spark 3, support MERGE INTO, which allows new data to be merged into the existing data set. That solves the write problem for incremental processing. Note that the workflow/step can be safely restarted without worrying about duplicate data being inserted when using MERGE INTO.
  • Append pattern: Users can also use append only write (e.g. INSERT INTO) to add the new data to the existing data set. Once the processing is completed, the append data is committed to the table. If users want to re-run or re-build the data set, they will run a backfill workflow to completely overwrite the target data set (e.g. INSERT OVERWRITE).
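
The difference in restart behavior between the two patterns can be illustrated with a small Python model. It stands in for the SQL statements and assumes rows are matched on a single key column; the function names and sample rows are hypothetical.

```python
def merge_into(target, changes, key):
    """Upsert: a change row replaces the target row with the same key."""
    by_key = {row[key]: row for row in target}
    for row in changes:
        by_key[row[key]] = row
    return list(by_key.values())


def append_into(target, changes):
    """Append-only write: every change row is added as a new row."""
    return target + changes


target = [{"id": 1, "views": 10}, {"id": 2, "views": 5}]
changes = [{"id": 2, "views": 7}, {"id": 3, "views": 1}]

merged_once = merge_into(target, changes, "id")
merged_twice = merge_into(merged_once, changes, "id")  # simulated restart
appended_twice = append_into(append_into(target, changes), changes)
print(len(merged_twice), len(appended_twice))
```

Re-running the merge leaves the result unchanged, while re-running the append duplicates rows, which is why the append pattern relies on a full overwrite when the data set has to be rebuilt.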

Additionally, IPS naturally supports backfill in many cases. Downstream workflows (if there is no business logic change) will be triggered by the data change due to backfill. This enables auto propagation of backfill data in multi-stage pipelines. Note that backfill support is not covered in this blog. We will talk about IPS backfill support in a following blog post.

Netflix Maestro

Maestro is the Netflix data workflow orchestration platform built to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform users at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, in various use cases. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users.

Since the last blog on Maestro, we have migrated all the workflows to it on behalf of users with minimal interruption. Maestro has been fully deployed in production with 100% of the workload running on it.

IPS is built upon Maestro as an extension by adding two building blocks, i.e. a new trigger mechanism and step job type, to enable incremental processing for all workflows. It is seamlessly integrated into the whole Maestro ecosystem with minimal onboarding cost.

Apache Iceberg

Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time. It supports expressive SQL, full schema evolution, hidden partitioning, data compaction, and time travel & rollback. In the IPS, we leverage the rich features provided by Apache Iceberg to develop a lightweight approach to capture the table changes.

Incremental Change Capture Design

Using Netflix Maestro and Apache Iceberg, we created a novel solution for incremental processing, which provides incremental change (data and range) capture in a super lightweight way without copying any data. During our exploration, we saw a huge opportunity to improve cost efficiency and engineering productivity using incremental processing.

Here is our solution to achieve incremental change capture built upon Apache Iceberg features. As we know, an Iceberg table contains a list of snapshots with a set of metadata. Snapshots include references to the actual immutable data files. A snapshot can contain data files from different partitions.

Design to achieve incremental change capture built upon Apache Iceberg features

The graph above shows that s0 contains data for Partition P0 and P1 at T1. Then at T2, a new snapshot s1 is committed to the table with a list of new data files, which includes late arriving data for partition P0 and P1 and data for P2.

We implemented a lightweight approach to create an Iceberg table (called the ICDC table), which has its own snapshot but only includes the new data file references from the original table, without copying the data files. It is highly efficient and low cost. Workflow pipelines can then just load the ICDC table to process only the change data from partitions P0, P1, and P2, without reprocessing the unchanged data in P0 and P1. Meanwhile, the change range is also captured for the specified data field, because the Iceberg table metadata contains the upper and lower bound information of each data field for each data file. Moreover, IPS tracks the changes at data file granularity for each workflow.
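
The idea of capturing only new data file references, plus a change range from per-file bounds, can be modeled in a few lines of Python. This is a conceptual sketch and not the IPS or Iceberg implementation: the DataFile class, the file paths, and the integer bounds are hypothetical, and the model ignores deleted or rewritten files.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: str
    lower: int  # lower bound of the tracked field within this file
    upper: int  # upper bound of the tracked field within this file


def capture_changes(previous, current):
    """Return (new_files, change_range) between two snapshots' file lists.

    Only file references are compared; no data file is read or copied.
    """
    seen = {f.path for f in previous}
    new_files = [f for f in current if f.path not in seen]
    if not new_files:
        return [], None
    change_range = (min(f.lower for f in new_files), max(f.upper for f in new_files))
    return new_files, change_range


s0 = [DataFile("p0/a.parquet", "P0", 100, 199), DataFile("p1/b.parquet", "P1", 200, 299)]
s1 = s0 + [
    DataFile("p0/c.parquet", "P0", 150, 180),  # late arriving data for P0
    DataFile("p1/d.parquet", "P1", 210, 250),  # late arriving data for P1
    DataFile("p2/e.parquet", "P2", 300, 399),  # data for the new partition P2
]
new_files, change_range = capture_changes(s0, s1)
print([f.path for f in new_files], change_range)
```

Because the change range is derived from the bounds stored with each file reference, it comes at no extra scan cost in this model.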

This lightweight approach is seamlessly integrated with Maestro to allow all (thousands) scheduler users to use this new building block (i.e. incremental processing) in their tens of thousands of workflows. Each workflow using IPS will be injected with a table parameter, which is the table name of the lightweight ICDC table. The ICDC table contains only the change data. Additionally, if the workflow needs the change range, a list of parameters will be injected to the user workflow to include the change range information. The incremental processing can be enabled by a new step job type (ICDC) and/or a new incremental trigger mechanism. Users can use them together with all existing Maestro features, e.g. foreach patterns, step dependencies based on valid-through-timestamp watermark, write-audit-publish templatized pattern, etc.

Main Advantages

With this design, user workflows can adopt incremental processing with very little effort. The user business logic is also decoupled from the IPS implementation. Multi-stage pipelines can also mix the incremental processing workflows with existing normal workflows. We also found that user workflows can be simplified after using IPS by removing additional steps to handle the complexity of the lookback window or calling some internal libraries.

Adding incremental processing features into Netflix Maestro as new features/building blocks for users will enable users to build their workflows in a much more efficient way and bridge the gaps to solve many challenging problems (e.g. dealing with late arriving data) in a much simpler way.

Emerging Incremental Processing Patterns

While onboarding user pipelines to IPS, we have discovered a few incremental processing patterns:

Incrementally process the captured incremental change data and directly append them to the target table

This is the straightforward incremental processing use case, where the change data carries all the information needed for the data processing. Upstream changes (usually from a single source table) are propagated downstream (usually to another target table), and the workflow pipeline only needs to process the change data (possibly joining with other dimension tables) and then merge it into (usually append to) the target table. This pattern replaces lookback window patterns for taking care of late arriving data. Instead of completely overwriting the past X days of data by using a lookback window pattern, user workflows just need to MERGE the change data (including late arriving data) into the target table by processing the ICDC table.

Use captured incremental change data as the row level filter list to remove unnecessary transformation

ETL jobs usually need to aggregate data based on certain group-by keys. Change data will disclose all the group-by keys that require a re-aggregation due to the new landing data from the source table(s). Then ETL jobs can join the original source table with the ICDC table on those group-by keys by using ICDC as a filter to speed up the processing to enable calculations of a much smaller set of data. There is no change to business transform logic and no re-design of ETL workflow. ETL pipelines keep all the benefits of batch workflows.
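
A minimal Python sketch of this filter pattern follows. It assumes the source table already contains the newly landed rows and that the ICDC table holds only those new rows; the function name, column names, and sample rows are hypothetical, and a real job would express the same thing as a join in SQL.

```python
from collections import defaultdict


def reaggregate_changed_keys(source_rows, icdc_rows, key, value):
    """Re-aggregate only the group-by keys that appear in the change data."""
    changed_keys = {row[key] for row in icdc_rows}
    totals = defaultdict(int)
    for row in source_rows:
        if row[key] in changed_keys:  # the ICDC table acts as a row-level filter
            totals[row[key]] += row[value]
    return dict(totals)


source = [
    {"account_id": "a", "view_time": 10},
    {"account_id": "b", "view_time": 20},
    {"account_id": "a", "view_time": 5},   # newly landed row
    {"account_id": "c", "view_time": 30},
]
icdc = [{"account_id": "a", "view_time": 5}]
result = reaggregate_changed_keys(source, icdc, "account_id", "view_time")
print(result)
```

The aggregation logic itself is untouched; only the set of keys it runs over shrinks to those that actually changed.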

Use the captured range parameters in the business logic

This pattern is usually used in complicated use cases, such as joining multiple tables and doing complex processing. In this case, the change data does not give the full picture of the input needed by the ETL workflow. Instead, the change data indicates a range of changed data sets for a specific set of fields (which might be partition keys) in a given input table or, usually, multiple input tables. Then, the union of the change ranges from all input tables gives the full change data set needed by the workflow. Additionally, the whole range of data usually has to be overwritten because the transformation is not stateless and depends on the outcome from the previous ranges. Another example is that the aggregated record in the target table or a window function in the query has to be updated based on the whole data set in the partition (e.g. calculating a median across the whole partition). Basically, the range derived from the change data indicates the dataset to be re-processed.
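
Taking the union of change ranges from several input tables amounts to merging overlapping intervals. The sketch below shows one way to do that in Python, assuming each range is an inclusive (start, end) pair on a comparable field such as a day number; the function name and sample ranges are hypothetical.

```python
def union_ranges(ranges):
    """Merge overlapping (start, end) ranges into a sorted list of disjoint ranges."""
    merged = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1]:
            # Overlaps the previous range: extend it instead of adding a new one.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


table_a_changes = [(1, 3), (8, 9)]
table_b_changes = [(2, 5)]
to_reprocess = union_ranges(table_a_changes + table_b_changes)
print(to_reprocess)
```

The workflow would then overwrite the target data for each resulting range rather than for a fixed lookback window.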

Use cases

Data workflows at Netflix usually have to deal with late arriving data, which is commonly handled with the lookback window pattern because it is simple and easy to implement. In the lookback pattern, the ETL pipeline always consumes the past X partitions of data from the source table and then overwrites the target table in every run. Here, X is a number decided by the pipeline owners based on their domain expertise. The drawback is the cost of computation and execution time: it usually costs almost X times more than a pipeline that does not account for late arriving data. Because late arriving data is sparse, the majority of the processing is done on data that has already been processed, which is unnecessary. Also, note that this approach is based on domain knowledge and is sometimes subject to changes in the business environment or the domain expertise of data engineers. In certain cases, it is challenging to come up with a good constant number.
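
The cost gap can be made concrete by counting partitions. The following Python sketch assumes daily partitions and a lookback window that includes the run day; the function names and the sample change days are hypothetical.

```python
from datetime import date, timedelta


def lookback_partitions(run_day, window_days):
    """Daily partitions a lookback pipeline reprocesses on every run."""
    return [run_day - timedelta(days=i) for i in range(window_days)]


def changed_partitions(change_days):
    """Daily partitions that actually received new or late arriving data."""
    return sorted(set(change_days))


run_day = date(2023, 11, 20)
lookback = lookback_partitions(run_day, window_days=14)
# Assume new data landed for the run day and late data for 3 days earlier.
changed = changed_partitions([run_day, run_day - timedelta(days=3), run_day])
print(len(lookback), len(changed))
```

With a 14-day window, the lookback pipeline touches 14 partitions per run, while only 2 partitions changed in this example.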

Below, we use a two-stage data pipeline to illustrate how to rebuild it using IPS to improve cost efficiency. We observe a significant cost reduction (> 80%) with few changes to the business logic. In this use case, we set the lookback window size X to 14 days; the value varies across real pipelines.

Original Data Pipeline with Lookback Window

Original data pipeline with lookback window
  • playback_table: an Iceberg table holding playback events from user devices, ingested by streaming pipelines. Late arriving data is sparse; only a few percent of the data arrives late.
  • playback_daily_workflow: a daily scheduled workflow that processes the past X days of playback_table data and writes the transformed data to the target table for the past X days
  • playback_daily_table: the target table of the playback_daily_workflow, which gets overwritten every day for the past X days
  • playback_daily_agg_workflow: a daily scheduled workflow that processes the past X days of playback_daily_table data and writes the aggregated data to the target table for the past X days
  • playback_daily_agg_table: the target table of the playback_daily_agg_workflow, which gets overwritten every day for the past 14 days

We ran this pipeline on a sample dataset using the real business logic. Here are the average execution results of the sample runs:

  • The first stage workflow takes about 7 hours to process playback_table data
  • The second stage workflow takes about 3.5 hours to process playback_daily_table data

New Data Pipeline with Incremental Processing

Using IPS, we rewrite the pipeline to avoid re-processing data as much as possible. The new pipeline is shown below.

New data pipeline with incremental processing

Stage 1:

  • ips_playback_daily_workflow: the updated version of playback_daily_workflow.
  • The workflow’s Spark SQL job reads an incremental change data capture (ICDC) Iceberg table (i.e. playback_icdc_table), which only includes the new data added to playback_table. It includes the late arriving data but does not include any unchanged data from playback_table.
  • The business logic replaces INSERT OVERWRITE with a MERGE INTO SQL query, and the new data is then merged into playback_daily_table.
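The switch from INSERT OVERWRITE to MERGE INTO can be sketched as follows. This is only an illustration: the key columns (event_date, account_id, title_id) are hypothetical, and in a real pipeline the source would be the transformed output of the ICDC table rather than the raw table itself.

```python
def build_merge_sql(target: str, source: str, key_columns: list[str]) -> str:
    """Build a Spark SQL MERGE INTO statement that upserts source rows into target."""
    on_clause = " AND ".join(f"t.{col} = s.{col}" for col in key_columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source} s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical key columns; the table names are the ones used in this post.
merge_sql = build_merge_sql(
    "playback_daily_table",
    "playback_icdc_table",
    ["event_date", "account_id", "title_id"],
)
print(merge_sql)
```

Because only rows present in the ICDC table are matched, partitions without new or late arriving data are left untouched instead of being rewritten.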

Stage 2:

  • IPS captures the changed data of playback_daily_table and also keeps the change data in an ICDC source table (playback_daily_icdc_table), so we don’t need to hard code the lookback window in the business logic. If only Y days have changed data in playback_daily_table, then the workflow only needs to load data for Y days.
  • In ips_playback_daily_agg_workflow, the business logic stays the same for the current day’s partition. We then need to update the business logic to take care of late arriving data as follows:
    • JOIN playback_daily_table with playback_daily_icdc_table on the aggregation group-by keys for the past 2 to X days, excluding the current day (i.e. day 1).
    • Because late arriving data is sparse, the JOIN narrows down the playback_daily_table data set so that only a very small portion of it is processed.
    • The business logic uses a MERGE INTO SQL query, and the change is then propagated to the downstream target table.
  • For the current day, the business logic stays the same: it consumes the data from playback_daily_table and then writes the outcome to the target table playback_daily_agg_table using INSERT OVERWRITE, because there is no need to join with the ICDC table.
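The narrowing effect of the JOIN in Stage 2 can be illustrated in plain Python. This is a toy model, not the Spark job itself: the row layout and the (day, title_id) group-by keys are hypothetical.

```python
def rows_to_reaggregate(daily_rows: list[dict], changed_keys: list[tuple]) -> list[dict]:
    """Keep only the rows whose group-by key appears in the change data (an inner join on keys)."""
    changed = set(changed_keys)
    return [row for row in daily_rows if (row["day"], row["title_id"]) in changed]


# Hypothetical contents of playback_daily_table for past days.
daily_rows = [
    {"day": "2023-11-18", "title_id": 1, "minutes": 30},
    {"day": "2023-11-18", "title_id": 2, "minutes": 45},
    {"day": "2023-11-19", "title_id": 1, "minutes": 20},
]
# Hypothetical keys recorded in playback_daily_icdc_table: one group received late data.
changed_keys = [("2023-11-18", 2)]

narrowed = rows_to_reaggregate(daily_rows, changed_keys)
```

Only the group that actually received late arriving data is re-aggregated and merged downstream; the other groups are never read past the join.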

With these small changes, the data pipeline efficiency is greatly improved. In our sample run,

  • The first stage workflow takes just about 30 minutes to process X days of change data from playback_table.
  • The second stage workflow takes about 15 minutes to process change data from day 2 to day X from playback_daily_table by joining with playback_daily_icdc_table data, and takes another 15 minutes to process the current day (i.e. day 1) playback_daily_table change data.

Here the Spark job settings are the same in the original and new pipelines. So in total, the new IPS-based pipeline needs around 10% of the resources (measured by execution time) to finish.
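The "around 10%" figure follows directly from the run times reported above, as this short calculation shows (execution time is used as the resource proxy, as in the text):

```python
# Original pipeline: stage 1 (7 h) + stage 2 (3.5 h).
original_hours = 7.0 + 3.5
# IPS pipeline: stage 1 (30 min) + stage 2 late data (15 min) + stage 2 current day (15 min).
new_hours = 0.5 + 0.25 + 0.25

resource_ratio = new_hours / original_hours
print(f"{resource_ratio:.1%}")  # prints 9.5%
```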

Looking Forward

We will improve IPS to support more complicated cases beyond append-only cases. IPS will be able to keep track of the progress of the table changes and support multiple Iceberg table change types (e.g. append, overwrite, etc.). We will also add managed backfill support into IPS to help users to build, monitor, and validate the backfill.

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large scale orchestration problems, please join us.

Acknowledgements

Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andy Chu, Kyoko Shimada, Abhinaya Shetty, Bharath Mummadisetty, John Zhuge, Rakesh Veeramacheneni, and other stunning colleagues at Netflix for their suggestions and feedback while developing IPS. We’d also like to thank Prashanth Ramdas, Eva Tse, Charles Smith, and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the IPS architecture and design.


Incremental Processing using Netflix Maestro and Apache Iceberg was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Migrate an existing data lake to a transactional data lake using Apache Iceberg

Post Syndicated from Rajdip Chaudhuri original https://aws.amazon.com/blogs/big-data/migrate-an-existing-data-lake-to-a-transactional-data-lake-using-apache-iceberg/

A data lake is a centralized repository that you can use to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data and then run different types of analytics for better business insights. Over the years, data lakes on Amazon Simple Storage Service (Amazon S3) have become the default repository for enterprise data and are a common choice for a large set of users who query data for a variety of analytics and machine learning use cases. Amazon S3 allows you to access diverse data sets, build business intelligence dashboards, and accelerate the consumption of data by adopting a modern data architecture or data mesh pattern on Amazon Web Services (AWS).

Analytics use cases on data lakes are always evolving. Oftentimes, you want to continuously ingest data from various sources into a data lake and query the data concurrently through multiple analytics tools with transactional capabilities. But traditionally, data lakes built on Amazon S3 are immutable and don’t provide the transactional capabilities needed to support changing use cases. With changing use cases, customers are looking for ways to not only move new or incremental data to data lakes as transactions, but also to convert existing data based on Apache Parquet to a transactional format. Open table formats, such as Apache Iceberg, provide a solution to this issue. Apache Iceberg enables transactions on data lakes and can simplify data storage, management, ingestion, and processing.

In this post, we show you how you can convert existing data in an Amazon S3 data lake in Apache Parquet format to Apache Iceberg format to support transactions on the data using Jupyter Notebook based interactive sessions over AWS Glue 4.0.

Existing Parquet to Iceberg migration

There are two broad methods to migrate the existing data in a data lake in Apache Parquet format to Apache Iceberg format to convert the data lake to a transactional table format.

In-place data upgrade

In an in-place data migration strategy, existing datasets are upgraded to Apache Iceberg format without first reprocessing or restating existing data. This means the data files in the data lake aren’t modified during the migration and all Apache Iceberg metadata files (manifest files, manifest lists, and table metadata files) are generated outside the purview of the data. In this method, the metadata is recreated in an isolated environment and colocated with the existing data files. This can be a much less expensive operation compared to rewriting all the data files. The existing data file format must be Apache Parquet, Apache ORC, or Apache Avro. An in-place migration can be performed in either of two ways:

  1. Using add_files: This procedure adds existing data files to an existing Iceberg table with a new snapshot that includes the files. Unlike migrate or snapshot, add_files can import files from a specific partition or partitions and doesn’t create a new Iceberg table. This procedure doesn’t analyze the schema of the files to determine if they match the schema of the Iceberg table. Upon completion, the Iceberg table treats these files as if they are part of the set of files owned by Apache Iceberg.
  2. Using migrate: This procedure replaces a table with an Apache Iceberg table loaded with the source’s data files. The table’s schema, partitioning, properties, and location are copied from the source table. Supported formats are Avro, Parquet, and ORC. By default, the original table is retained with the name table_BACKUP_. However, to leave the original table intact during the process, you must use snapshot to create a new temporary table that has the same source data files and schema.

In this post, we show you how you can use the Iceberg add_files procedure for an in-place data upgrade. Note that the migrate procedure isn’t supported in AWS Glue Data Catalog.

The following diagram shows a high-level representation.

CTAS migration of data

The create table as select (CTAS) migration approach is a technique where all the metadata information for Iceberg is generated along with restating all the data files. This method shadows the source dataset in batches. When the shadow is caught up, you can swap the shadowed dataset with the source.

The following diagram showcases the high-level flow.

Prerequisites

To follow along with the walkthrough, you must have the following:

You can check the data size using the following code in the AWS CLI or AWS CloudShell:

# Run this command to check the data size

aws s3 ls --summarize --human-readable --recursive s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023

As of writing this post, there are 107 objects with total size of 70 MB for year 2023 in the Amazon S3 path.

Note that to implement the solution, you must complete a few preparatory steps.

Deploy resources using AWS CloudFormation

Complete the following steps to create the S3 bucket and the AWS IAM role and policy for the solution:

  1. Sign in to your AWS account and then choose Launch Stack to launch the CloudFormation template.
  2. For Stack name, enter a name.
  3. Leave the parameters at the default values. Note that if the default values are changed, then you must make corresponding changes throughout the following steps.
  4. Choose Next to create your stack.

This AWS CloudFormation template deploys the following resources:

  • An S3 bucket named demo-blog-post-XXXXXXXX (XXXXXXXX represents the AWS account ID used).
  • Two folders named parquet and iceberg under the bucket.
  • An IAM role and a policy named demoblogpostrole and demoblogpostscoped respectively.
  • An AWS Glue database named ghcn_db.
  • An AWS Glue Crawler named demopostcrawlerparquet.

After the AWS CloudFormation template is successfully deployed:

  1. Copy the data into the created S3 bucket using the following command in the AWS CLI or AWS CloudShell. Replace XXXXXXXX appropriately in the target S3 bucket name.
    Note: In the example, we copy data only for the year 2023. However, you can work with the entire dataset, following the same instructions.

    aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023/ s3://demo-blog-post-XXXXXXXX/parquet/year=2023

  2. Open the AWS Management Console and go to the AWS Glue console.
  3. On the navigation pane, select Crawlers.
  4. Run the crawler named demopostcrawlerparquet.
  5. After the AWS Glue crawler demopostcrawlerparquet is successfully run, the metadata information of the Apache Parquet data will be cataloged under the ghcn_db AWS Glue database with the table name source_parquet. Notice that the table is partitioned over year and element columns (as in the S3 bucket).

  6. Use the following query to verify the data from the Amazon Athena console. If you’re using Amazon Athena for the first time in your AWS Account, set up a query result location in Amazon S3.
    SELECT * FROM ghcn_db.source_parquet limit 10;

Launch an AWS Glue Studio notebook for processing

For this post, we use an AWS Glue Studio notebook. Follow the steps in Getting started with notebooks in AWS Glue Studio to set up the notebook environment. Launch the notebooks hosted under this link and unzip them on a local workstation.

  1. Open AWS Glue Studio.
  2. Choose ETL Jobs.
  3. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select the required ipynb file and choose Open, then choose Create.
  4. On the Notebook setup page, for Job name, enter a logical name.
  5. For IAM role, select demoblogpostrole. Choose Create job. After a minute, the Jupyter notebook editor appears. Clear all the default cells.

The preceding steps launch an AWS Glue Studio notebook environment. Make sure you Save the notebook every time it’s used.

In-place data upgrade

In this section, we show you how you can use the add_files procedure to achieve an in-place data upgrade. This section uses the ipynb file named demo-in-place-upgrade-addfiles.ipynb. To use the add_files procedure, complete the following:

  1. On the Notebook setup page, for Job name, enter demo-in-place-upgrade for the notebook session as explained in Launch Glue notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under Execute add_files procedure section performs the metadata creation in the mentioned path.
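For orientation, the call made in that cell has roughly the following shape. This is a sketch rather than the notebook's exact code: the catalog name glue_catalog is an assumption, and the table name is taken from the Amazon S3 metadata path used in this post.

```python
def build_add_files_sql(catalog: str, target_table: str, parquet_path: str) -> str:
    """Build a call to Iceberg's add_files Spark procedure for a path-based Parquet source."""
    return (
        f"CALL {catalog}.system.add_files(\n"
        f"  table => '{target_table}',\n"
        f"  source_table => '`parquet`.`{parquet_path}`'\n"
        ")"
    )


add_files_sql = build_add_files_sql(
    "glue_catalog",  # assumed catalog name configured in the Spark session
    "ghcn_db.target_iceberg_add_files",
    "s3://demo-blog-post-XXXXXXXX/parquet/",  # replace XXXXXXXX as in the prerequisites
)
print(add_files_sql)
# In a Spark session with the Iceberg extensions enabled: spark.sql(add_files_sql)
```

The target Iceberg table must already exist with a matching schema, because add_files only registers the files and does not validate their schema.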

Review the data file paths for the new Iceberg table. To show an Iceberg table’s current data files, you can query the .files metadata table to get details such as file_path, partition, and others. The registered files point to the source path under Amazon S3.

Note the metadata file location after the transformation. It points to the new folder named iceberg under Amazon S3. You can confirm this by querying the .snapshots metadata table, which shows the location of each snapshot’s manifest list, or by browsing the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_add_files/metadata/. Also notice that two versions of the manifest list are created after the add_files procedure has been run. The first corresponds to an empty table with the data schema and the second to adding the files.
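The two metadata-table checks described above can be written as Spark SQL queries. The sketch below only builds the query strings; the glue_catalog prefix is an assumed catalog name, while the selected columns are standard columns of Iceberg's files and snapshots metadata tables.

```python
def metadata_table_queries(table: str) -> dict[str, str]:
    """Return Spark SQL queries against an Iceberg table's files and snapshots metadata tables."""
    return {
        # Where the data files live (expected: the original Parquet location).
        "files": f"SELECT file_path, partition, record_count FROM {table}.files",
        # One row per snapshot, including the manifest list location (expected: the iceberg folder).
        "snapshots": (
            "SELECT committed_at, snapshot_id, operation, manifest_list "
            f"FROM {table}.snapshots ORDER BY committed_at"
        ),
    }


queries = metadata_table_queries("glue_catalog.ghcn_db.target_iceberg_add_files")
# In a Spark session: spark.sql(queries["files"]).show(truncate=False)
```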

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, you can use the add_files procedure to convert existing data files in Apache Parquet format in a data lake to Apache Iceberg format by adding the metadata files and without recreating the table from scratch. Following are some pros and cons of this method.

Pros

  • Avoids full table scans to read the data as there is no restatement. This can save time.
  • If there are any errors while writing the metadata, only a metadata rewrite is required and not a rewrite of the entire data.
  • Lineage of the existing jobs is maintained because the existing catalog still exists.

Cons

  • If data is processed (inserts, updates, and deletes) in the dataset during the metadata writing process, the process must be run again to include the new data.
  • There must be write downtime to avoid having to run the process a second time.
  • If a data restatement is required, this workflow will not work as source data files aren’t modified.

CTAS migration of data

This section uses the ipynb file named demo-ctas-upgrade.ipynb. Complete the following:

  1. On the Notebook setup page, for Job name, enter demo-ctas-upgrade for the notebook session as explained under Launch Glue notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under Create Iceberg table from Parquet section performs the shadow upgrade to Iceberg format. Note that Iceberg requires sorting the data according to table partitions before writing to the Iceberg table. Further details can be found in Writing Distribution Modes.
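A CTAS statement with the sort on partition columns might look like the following sketch. The catalog name glue_catalog and the way the source table is referenced are assumptions; the partition columns year and element are the ones the crawler detected for source_parquet.

```python
def build_ctas_sql(target: str, source: str, partition_columns: list[str]) -> str:
    """Build an Iceberg CTAS statement that sorts rows by the partition columns before writing."""
    cols = ", ".join(partition_columns)
    return (
        f"CREATE TABLE {target}\n"
        "USING iceberg\n"
        f"PARTITIONED BY ({cols})\n"
        f"AS SELECT * FROM {source}\n"
        f"ORDER BY {cols}"
    )


ctas_sql = build_ctas_sql(
    "glue_catalog.ghcn_db.target_iceberg_ctas",  # assumed catalog name
    "ghcn_db.source_parquet",
    ["year", "element"],
)
print(ctas_sql)
# In a Spark session with the Iceberg extensions enabled: spark.sql(ctas_sql)
```

Sorting by the partition columns clusters each partition's rows together, so each write task keeps only a few files open at a time.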

Notice the data and metadata file paths for the new Iceberg table. It’s pointing to the new path under Amazon S3. Also, check under the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_ctas/ used for this post.

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, the CTAS method creates a new table by generating all the metadata files along with restating the actual data. Following are some pros and cons of this method:

Pros

  • It allows you to audit and validate the data during the process because data is restated.
  • If there are any runtime issues during the migration process, rollback and recovery can be easily performed by deleting the Apache Iceberg table.
  • You can test different configurations when migrating a source. You can create a new table for each configuration and evaluate the impact.
  • Shadow data is renamed to a different directory in the source (so it doesn’t collide with old Apache Parquet data).

Cons

  • Storage of the dataset is doubled during the process as both the original Apache Parquet and new Apache Iceberg tables are present during the migration and testing phase. This needs to be considered during cost estimation.
  • The migration can take much longer (depending on the volume of the data) because both data and metadata are written.
  • It’s difficult to keep tables in sync if there are changes to the source table during the process.

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the datasets, CloudFormation stack, S3 bucket, AWS Glue job, AWS Glue database, and AWS Glue table.

Conclusion

In this post, you learned strategies for migrating existing Apache Parquet formatted data to Apache Iceberg in Amazon S3 to convert to a transactional data lake using interactive sessions in AWS Glue 4.0 to complete the processes. If you have an evolving use case where an existing data lake needs to be converted to a transactional data lake based on Apache Iceberg table format, follow the guidance in this post.

The path you choose for this upgrade, an in-place upgrade or CTAS migration, or a combination of both, will depend on careful analysis of the data architecture and data integration pipeline. Both pathways have pros and cons, as discussed. At a high level, this upgrade process should go through multiple well-defined phases to identify the patterns of data integration and use cases. Choosing the correct strategy will depend on your requirements—such as performance, cost, data freshness, acceptable downtime during migration, and so on.


About the author

Rajdip Chaudhuri is a Senior Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer and movies.

Apache Iceberg optimization: Solving the small files problem in Amazon EMR

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/apache-iceberg-optimization-solving-the-small-files-problem-in-amazon-emr/

In our previous post Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes, we discussed how you can implement solutions to improve operational efficiencies of your Amazon Simple Storage Service (Amazon S3) data lake that is using the Apache Iceberg open table format and running on the Amazon EMR big data platform. Iceberg tables store metadata in manifest files. As the number of data files increases, the amount of metadata stored in these manifest files also increases, leading to longer query planning time. The query runtime also increases because it’s proportional to the number of data or metadata file read operations. Compaction is the process of combining these small data and metadata files to improve performance and reduce cost. Compaction also gets rid of delete files by applying the deletes and rewriting new data files without the deleted records. Currently, Iceberg provides a compaction utility that compacts small files at a table or partition level. But this approach requires you to implement the compaction job using your preferred job scheduler or to trigger the compaction job manually.

In this post, we discuss the new Iceberg feature that you can use to automatically compact small files while writing data into Iceberg tables using Spark on Amazon EMR or Amazon Athena.

Use cases for processing small files

Streaming applications are prone to creating a large number of small files, which can negatively impact the performance of subsequent processing times. For example, consider a critical Internet of Things (IoT) sensor from a cold storage facility that is continuously sending temperature and health data into an S3 data lake for downstream data processing and triggering actions like emergency maintenance. Systems of this nature generate a huge number of small objects and need attention to compact them to a more optimal size for faster reading, such as 128 MB, 256 MB, or 512 MB. In this post, we show you a streaming sensor data use case with a large number of small files and the mitigation steps using the Iceberg open table format. For more information on streaming applications on AWS, refer to Real-time Data Streaming and Analytics.

Streaming Architecture

Solution overview

To compact the small files for improved performance, in this example, Amazon EMR triggers a compaction job after the write commit as a post-commit hook when defined thresholds (for example, number of commits) are met. By default, Amazon EMR waits for 10 commits to trigger the post-commit hook compaction utility.
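The threshold behavior can be pictured with a small counter. This is a conceptual model only, not the code of the event-based table management library; the class name is hypothetical, and resetting the counter after a trigger is an assumption made for the illustration.

```python
class CommitThresholdTrigger:
    """Toy model: signal a compaction once a number of commits has accumulated."""

    def __init__(self, commit_threshold: int = 10) -> None:
        # 10 mirrors the default number of commits mentioned in this post.
        self.commit_threshold = commit_threshold
        self.commits_since_compaction = 0

    def on_commit(self) -> bool:
        """Record one successful write commit; return True when compaction should run."""
        self.commits_since_compaction += 1
        if self.commits_since_compaction >= self.commit_threshold:
            self.commits_since_compaction = 0  # assumed: counting restarts after a trigger
            return True
        return False


trigger = CommitThresholdTrigger()
results = [trigger.on_commit() for _ in range(20)]
```

In this model, 20 commits with the default threshold signal compaction twice, on the 10th and 20th commit; a threshold of 1 signals it after every commit.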

This Iceberg event-based table management feature lets you monitor table activities during writes to make better decisions about how to manage each table differently based on events. As of this writing, only the optimize-data optimization is supported. To learn more about the available optimize data executors and catalog properties, refer to the README file in the GitHub repo.

To use the feature, you can use the iceberg-aws-event-based-table-management source code and provide the built JAR in the engine’s class-path. The following bootstrap action can place the JAR in the engine’s class-path:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Note that the Iceberg AWS event-based table management feature works with Iceberg v1.2.0 and above (available from Amazon EMR 6.11.0).

In some use cases, you may want to run the event-based compaction jobs in a different EMR cluster in order to avoid any impact to the ETL jobs running in their current EMR cluster. You can get the metadata, including the cluster ID of your current ETL workflows, from the /mnt/var/lib/info/job-flow.json file and then use a different cluster to process the event-based compactions.
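Reading the cluster ID from that file can be sketched as below. The sketch assumes the file stores the ID under a jobFlowId key; verify this against the file on your own cluster. The sample document is hypothetical and reuses the cluster ID shown in the configuration later in this post.

```python
import json


def cluster_id_from_job_flow(job_flow_json: str) -> str:
    """Extract the EMR cluster ID from the contents of job-flow.json (assumed key: jobFlowId)."""
    return json.loads(job_flow_json)["jobFlowId"]


# Hypothetical minimal sample; on a cluster node you would read
# /mnt/var/lib/info/job-flow.json instead.
sample = '{"jobFlowId": "j-1N8J5NZI0KEU3"}'
cluster_id = cluster_id_from_job_flow(sample)
```

The returned value identifies the cluster running the ETL workflow, so you can point the compaction executor at a different cluster ID.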

The notebook examples shown in the following sections are also available in the aws-samples GitHub repo.

Prerequisite

For this performance comparison exercise between a Spark external table and an Iceberg table and Iceberg with compaction, we generate a significant number of small files in Parquet format and store them in an S3 bucket. We used the Amazon Kinesis Data Generator (KDG) tool to generate sample sensor data information using the following template:

{"sensorId": {{random.number(5000)}},
 "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
  )}},
 "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}",
 "date_ts": "{{date.now("YYYY-MM-DD HH:mm:ss")}}"
}

We configured an Amazon Kinesis Data Firehose delivery stream and sent the generated data into a staging S3 bucket. Then we ran an AWS Glue extract, transform, and load (ETL) job to convert the JSON files into Parquet format. For our testing, we generated about 58,176 small objects with total size of 2 GB.

For running the Amazon EMR tests, we used Amazon EMR version emr-6.11.0 with Spark 3.3.2, and JupyterEnterpriseGateway 2.6.0. The cluster used had one primary node (r5.2xlarge) and two core nodes (r5.xlarge). We used a bootstrap action during cluster creation to enable event-based table management:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Also, refer to our guidance on how to use an Iceberg cluster with Spark, which is a prerequisite for this exercise.

As part of the exercise, we see new steps are being added to the EMR cluster to trigger the compaction jobs. To enable adding new steps to the running cluster, we add the elasticmapreduce:AddJobFlowSteps action to the cluster’s default role, EMR_EC2_DefaultRole, as a prerequisite.

Performance of Iceberg reads with the compaction utility on Amazon EMR

In the following steps, we demonstrate how to use the compaction utility and what performance benefits you can achieve. We use an EMR notebook to demonstrate the benefits of the compaction utility. For instructions to set up an EMR notebook, refer to Amazon EMR Studio overview.

First, you configure your Spark session using the %%configure magic command. We use the AWS Glue Data Catalog as the catalog for Iceberg tables.

  1. Before you run the following step, create an Amazon S3 bucket in your AWS account called <your-iceberg-storage-blog>. To check how to create an Amazon S3 bucket, follow the instructions given here. Update the your-iceberg-storage-blog bucket name in the following configuration with the actual bucket name you created to test this example:
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/"
        }
    }

  2. Create a new database named db for the Iceberg table in the AWS Glue Data Catalog and provide the S3 URI specified in the Spark config as s3://<your-iceberg-storage-blog>/iceberg/db. Also, create another database named iceberg_db in AWS Glue for the Parquet tables. Follow the instructions given in Working with databases on the AWS Glue console to create your AWS Glue databases. Then create a new Spark table in Parquet format pointing to the bucket containing small objects in your AWS account. See the following code:
    spark.sql(""" CREATE TABLE iceberg_db.sensor_data_parquet_table (
        sensorid int,
        currenttemperature int,
        status string,
        date_ts timestamp)
    USING parquet
    location 's3://<your-bucket-with-parquet-files>/'
    """)

  3. Run an aggregate SQL to measure the performance of Spark SQL on the Parquet table with 58,176 small objects:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from iceberg_db.sensor_data_parquet_table
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()

In the following steps, we create a new Iceberg table from the Spark/Parquet table using CTAS (Create Table As Select). Then we show how the automated compaction job can help improve query performance.

  1. Create a new Iceberg table using CTAS from the earlier AWS Glue table with the small files:
    spark.sql(""" CREATE TABLE dev.db.sensor_data_iceberg_format USING iceberg AS (SELECT * FROM iceberg_db.sensor_data_parquet_table)""")

  2. Validate that a new Iceberg snapshot was created for the new table:
    spark.sql(""" Select * from dev.db.sensor_data_iceberg_format.snapshots limit 5""").show()

We have confirmed that our S3 folder corresponds to the newly created Iceberg table. It shows that during the CTAS statement, it added 1,879 objects in the new folder with a total size of 1.3 GB. We can conclude that Iceberg did some optimization while loading data from the Parquet table.

  1. Now that you have data in the Iceberg table, run the previous aggregation SQL to check the runtime:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from dev.db.sensor_data_iceberg_format
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()

The preceding query ran on the Iceberg table with 1,879 objects in 1 minute, 39 seconds. Converting the external Parquet table to an Iceberg table already yields a significant performance improvement.

  1. Now let’s add the configurations needed to apply the automatic compaction of small files in the Iceberg tables. Note the last four newly added configurations in the following statement. Setting optimize-data.commit-threshold to 1 means that the compaction takes place after the first successful commit. By default, compaction is triggered after 10 successful commits.
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/",
        "spark.sql.catalog.dev.metrics-reporter-impl":"org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator",
        "spark.sql.catalog.dev.optimize-data.impl":"org.apache.iceberg.aws.manage.EmrOnEc2OptimizeDataExecutor",
        "spark.sql.catalog.dev.optimize-data.emr.cluster-id":"j-1N8J5NZI0KEU3",
        "spark.sql.catalog.dev.optimize-data.commit-threshold":"1"
        }
    }

  2. Run a quick sanity check to confirm that the configurations are working fine with Spark SQL.

  1. To activate the automatic compaction process, add a new record to the existing Iceberg table using a Spark insert:
    spark.sql(""" Insert into dev.db.sensor_data_iceberg_format values(999123, 86, 'PASS', timestamp'2023-07-26 12:50:25') """)

  2. Navigate to the Amazon EMR console to check the cluster steps.

You should see a new step added that goes from Pending to Running and finally the Completed state. Every time the data in the Iceberg table is updated or inserted, based on configuration optimize-data.commit-threshold, the optimize job will automatically trigger to compact the underlying data.

  1. Validate that the record insert was successful.

  1. Check the snapshot table to see that a new snapshot is created for the table with the operation replace.

For every successful run of the background optimize job, a new entry will be added to the snapshot table.

  1. On the Amazon S3 console, navigate to the folder corresponding to the Iceberg table and see that the data files are compacted.

In our case, it was compacted from the previous smaller sizes to approximately 437 MB. The folder will still contain the previous smaller files for time travel unless you issue an expire snapshot command to remove them.
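Removing the old files is done with Iceberg's expire_snapshots Spark procedure. The sketch below builds such a call for the dev catalog and table used in this post; the cutoff timestamp is a hypothetical value.

```python
def build_expire_snapshots_sql(catalog: str, table: str, older_than: str) -> str:
    """Build a call to Iceberg's expire_snapshots procedure for snapshots older than a timestamp."""
    return (
        f"CALL {catalog}.system.expire_snapshots(\n"
        f"  table => '{table}',\n"
        f"  older_than => TIMESTAMP '{older_than}'\n"
        ")"
    )


expire_sql = build_expire_snapshots_sql(
    "dev",
    "db.sensor_data_iceberg_format",
    "2023-08-01 00:00:00",  # hypothetical cutoff
)
print(expire_sql)
# In the notebook session: spark.sql(expire_sql).show()
```

Expiring snapshots gives up time travel to those snapshots, so choose the cutoff with your retention requirements in mind.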

  1. Now you can run the same aggregate query and record the performance after the compaction.

Summary of Amazon EMR testing

The runtime for the preceding aggregation query on the compacted Iceberg table dropped to approximately 59 seconds from the previous runtime of 1 minute, 39 seconds. That is about a 40% improvement. The more small files you have in your source bucket, the bigger the performance boost you can achieve with this post-commit hook compaction implementation. The examples shown in this post were run on a small Amazon EMR cluster with only two core nodes (r5.xlarge). To improve the performance of your Spark applications, Amazon EMR provides multiple optimization features that you can implement for your production workloads.
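The 40% figure can be checked from the two measured runtimes:

```python
# Runtime before compaction: 1 minute, 39 seconds; after compaction: about 59 seconds.
before_seconds = 1 * 60 + 39
after_seconds = 59

improvement = (before_seconds - after_seconds) / before_seconds
print(f"{improvement:.1%}")  # prints 40.4%
```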

Performance of Iceberg reads with the compaction utility on Athena

To manage the Iceberg table based on events, you can start the Spark 3.3 SQL shell as shown in the following code. Make sure that the athena:StartQueryExecution and athena:GetQueryExecution permission policies are enabled.

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
          --conf spark.sql.catalog.my_catalog.warehouse=<s3-bucket> \
          --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator \
          --conf spark.sql.catalog.my_catalog.optimize-data.impl=org.apache.iceberg.aws.manage.AthenaOptimizeDataExecutor \
          --conf spark.sql.catalog.my_catalog.optimize-data.athena.output-bucket=<s3-bucket>

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

In this post, we showed how Iceberg event-based table management lets you manage each table differently based on events and compact small files to boost application performance. This event-based process significantly reduces the operational overhead of using the Iceberg rewrite_data_files procedure, which needs manual or scheduled operation.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Perform upserts in a data lake using Amazon Athena and Apache Iceberg

Post Syndicated from Ranjit Rajan original https://aws.amazon.com/blogs/big-data/perform-upserts-in-a-data-lake-using-amazon-athena-and-apache-iceberg/

Amazon Athena supports the MERGE command on Apache Iceberg tables, which allows you to perform inserts, updates, and deletes in your data lake at scale using familiar SQL statements that are compliant with ACID (Atomic, Consistent, Isolated, Durable). Apache Iceberg is an open table format for data lakes that manages large collections of files as tables. It supports modern analytical data lake operations such as create table as select (CTAS), upsert and merge, and time travel queries. Athena also supports the ability to create views and perform VACUUM (snapshot expiration) on Apache Iceberg tables to optimize storage and performance. With these features, you can now build data pipelines completely in standard SQL that are serverless, simpler to build, and able to operate at scale. This enables developers to:

  • Focus on writing business logic and not worry about setting up and managing the underlying infrastructure
  • Perform data transformations with Athena
  • Help comply with certain data deletion requirements
  • Apply change data capture (CDC) from source databases

With data lakes, data pipelines are typically configured to write data into a raw zone, which is an Amazon Simple Storage Service (Amazon S3) bucket or folder that contains data as is from source systems. Data is accumulated in this zone, such that inserts, updates, or deletes on the source database appear as records in new files as transactions occur on the source. Although the raw zone can be queried, any downstream processing or analytical queries typically need to deduplicate data to derive a current view of the source table. For example, if a single record is updated multiple times in the source database, these versions need to be deduplicated and the most recent record selected.
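To make the deduplication step concrete, here is a minimal Python sketch that keeps only the most recent change per key. The field names (id, op, cdc_timestamp, sold_out) and the sample rows are assumptions for illustration, not data from the source system.

```python
from datetime import datetime


def latest_per_key(records):
    """Keep only the most recent change for each id."""
    latest = {}
    for rec in records:
        current = latest.get(rec["id"])
        # A later (or equal) commit timestamp replaces the earlier version.
        if current is None or rec["cdc_timestamp"] >= current["cdc_timestamp"]:
            latest[rec["id"]] = rec
    return latest


# Hypothetical raw-zone records: id 1 was updated twice.
changes = [
    {"id": 1, "op": "U", "cdc_timestamp": datetime(2022, 9, 22, 18, 0), "sold_out": 0},
    {"id": 1, "op": "U", "cdc_timestamp": datetime(2022, 9, 22, 18, 5), "sold_out": 1},
    {"id": 2, "op": "I", "cdc_timestamp": datetime(2022, 9, 22, 18, 1), "sold_out": 0},
]
current_view = latest_per_key(changes)
```

In this sketch, current_view holds one row per id, and the row for id 1 is the later of its two updates.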

Typically, data transformation processes are used to perform this operation, and a final consistent view is stored in an S3 bucket or folder. Data transformation processes can be complex, requiring more coding and more testing, and are also error prone. This was a challenge because data lakes are based on files and have been optimized for appending data. Previously, you had to overwrite the complete S3 object or folder, which was not only inefficient but also interrupted users who were querying the same data. With the evolution of frameworks such as Apache Iceberg, you can perform SQL-based upserts in place in Amazon S3 using Athena, without blocking user queries and while still maintaining query performance.

In this post, we demonstrate how you can use Athena to apply CDC from a relational database to target tables in an S3 data lake.

Overview of solution

For this post, consider a mock sports ticketing application based on the following project. We use a single table in that database that contains sporting events information and ingest it into an S3 data lake on a continuous basis (initial load and ongoing changes). This data ingestion pipeline can be implemented using AWS Database Migration Service (AWS DMS) to extract both full and ongoing CDC extracts. With CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume. Most databases use a transaction log to record changes made to the database. AWS DMS reads the transaction log by using engine-specific API operations and captures the changes made to the database in a nonintrusive manner.

Specifically, to extract changed data including inserts, updates, and deletes from the database, you can configure AWS DMS with two replication tasks, as described in the following workshop. The first task performs an initial copy of the full data into an S3 folder. The second task is configured to replicate ongoing CDC into a separate folder in S3, which is further organized into date-based subfolders based on the source database's transaction commit date. With full and CDC data in separate S3 folders, it's easier to maintain and operate data replication and downstream processing jobs. To enable this, you can apply the following extra connection attributes to the S3 endpoint in AWS DMS (refer to S3Settings for other CSV and related settings):

  • TimestampColumnName – AWS DMS adds a column that you name with timestamp information for the commit of that row in the source database.
  • includeOpForFullLoad – AWS DMS adds a column named Op to every file to indicate if the record is an I (INSERT), U (UPDATE), or D (DELETE).
  • DatePartitionEnabled, DatePartitionSequence, DatePartitionDelimiter – These settings are used to configure AWS DMS to write changed data to date/time-based folders in the data lake. By partitioning folders, you can better manage S3 objects and optimize data lake queries for subsequent downstream processing.

We use the MERGE INTO support in Athena for Apache Iceberg tables, which can express row-level updates. Apache Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated. After the data is merged, we demonstrate how to use Athena to perform time travel on the sporting_event table, and use views to abstract and present different versions of the data to end-users. Finally, to simplify table maintenance, we demonstrate performing VACUUM on Apache Iceberg tables to delete older snapshots, which will optimize latency and cost of both read and write operations.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1 and 2 use AWS DMS, which connects to the source database to load initial data and ongoing changes (CDC) to Amazon S3 in CSV format. For this post, we have provided sample full and CDC datasets in CSV format that have been generated using AWS DMS.
    • Step 3 is comprised of the following actions:
      • Create an external table in Athena pointing to the source data ingested in Amazon S3.
      • Create an Apache Iceberg target table and load data from the source table.
      • Merge CDC data into the Apache Iceberg table using MERGE INTO.
  • Data access:
    • In Step 4, create a view on the Apache Iceberg table.
    • Use the view to query data using standard SQL.

Prerequisites

Before getting started, make sure you have the required permissions to perform the following in your AWS account:

Create tables on the raw data

First, create a database for this demo.

  1. Navigate to the Athena console and choose Query editor.
    If this is your first time using the Athena query editor, you need to configure and specify an S3 bucket to store the query results.
  2. Create a database with the following code:
    CREATE DATABASE raw_demo;

  3. Next, create a folder in an S3 bucket that you can use for this demo. Name this folder sporting_event_full.
  4. Upload LOAD00000001.csv into the folder.
  5. Switch to the raw_demo database and create a table to point to the raw input data:
    CREATE EXTERNAL TABLE raw_demo.sporting_event(
      op string,
      cdc_timestamp timestamp, 
      id bigint, 
      sport_type_name string, 
      home_team_id int, 
      away_team_id int, 
      location_id smallint, 
      start_date_time timestamp, 
      start_date date, 
      sold_out smallint)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your bucket>/sporting_event_full/'
      ;

  6. Run the following query to review the data:
    SELECT * FROM raw_demo.sporting_event LIMIT 5;

  7. Next, create another folder in the same S3 bucket called sporting_event_cdc.
  8. Within this folder, create three subfolders in a time hierarchy folder structure such that the final S3 folder URI looks like s3://<your-bucket>/sporting_event_cdc/2022/09/22/.
  9. Upload 20220922-184314489.csv into this folder. This folder structure is similar to how AWS DMS stores CDC data when you enable date-based folder partitioning.
  10. Create a table to point to the CDC data. This table also includes a partition column because the source data in Amazon S3 is organized into date-based folders.
    CREATE EXTERNAL TABLE raw_demo.sporting_event_cdc(
    op string,
    cdc_timestamp timestamp,
    id bigint,
    sport_type_name string,
    home_team_id int,
    away_team_id int,
    location_id smallint,
    start_date_time timestamp,
    start_date date,
    sold_out smallint)
    PARTITIONED BY (partition_date string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your-bucket>/sporting_event_cdc/'
    ;

  11. Next, alter the table to add new partitions. Because the data is stored in non-Hive style format by AWS DMS, to query this data, add this partition manually or use an AWS Glue crawler. As data accumulates, continue to add new partitions to query this data.
    ALTER TABLE raw_demo.sporting_event_cdc ADD PARTITION (partition_date='2022-09-22') location 's3://<your-bucket>/sporting_event_cdc/2022/09/22/'

  12. Run the following query to review the CDC data:
    SELECT * FROM raw_demo.sporting_event_cdc;

There are two records with IDs 1 and 11 that are updates with op code U. The record with ID 21 has a delete (D) op code, and the record with ID 5 is an insert (I).
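Because new date folders keep arriving, the ALTER TABLE statement from step 11 has to be repeated for each day. The following Python sketch only builds the statement text for a given date. The helper name and the bucket name are hypothetical, and IF NOT EXISTS is added here so the statement can be rerun safely.

```python
from datetime import date


def add_partition_sql(bucket: str, day: date) -> str:
    """Build the ALTER TABLE statement that registers one date folder."""
    partition_value = day.isoformat()      # for example, 2022-09-22
    folder = day.strftime("%Y/%m/%d")      # for example, 2022/09/22
    return (
        "ALTER TABLE raw_demo.sporting_event_cdc "
        f"ADD IF NOT EXISTS PARTITION (partition_date='{partition_value}') "
        f"LOCATION 's3://{bucket}/sporting_event_cdc/{folder}/'"
    )


# "my-bucket" is a placeholder; substitute your own bucket name.
statement = add_partition_sql("my-bucket", date(2022, 9, 22))
print(statement)
```

You could run the generated statement in the Athena query editor or submit it from a scheduled job.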

Use CTAS to create the target Iceberg table in Parquet format

CTAS statements create new tables using standard SELECT queries. The resultant table is added to the AWS Glue Data Catalog and made available for querying.

  1. First, create another database to store the target table:
    CREATE DATABASE curated_demo;

  2. Next, switch to this database and run the CTAS statement to select data from the raw input table to create the target Iceberg table (replace the location with an appropriate S3 bucket in your account):
    CREATE TABLE curated_demo.sporting_event
    WITH (table_type='ICEBERG',
    location='s3://<your-bucket>/curated/sporting_event',
    format='PARQUET',
    is_external=false)
    AS SELECT
    id,
    sport_type_name,
    home_team_id,
    away_team_id,
    cast(location_id as int) as location_id,
    cast(start_date_time as timestamp(6)) as start_date_time,
    start_date,
    cast(sold_out as int) as sold_out
    FROM raw_demo.sporting_event
    ;

  3. Run the following query to review data in the Iceberg table:
    SELECT * FROM curated_demo.sporting_event LIMIT 5;

Use MERGE INTO to insert, update, and delete data into the Iceberg table

The MERGE INTO command updates the target table with data from the CDC table. The following statement uses a combination of primary keys and the Op column in the source data, which indicates if the source row is an insert, update, or delete. We use the id column as the primary key to join the target table to the source table, and we use the Op column to determine if a record needs to be deleted.

MERGE INTO curated_demo.sporting_event t
USING (SELECT op,
    cdc_timestamp,
    id,
    sport_type_name,
    home_team_id,
    away_team_id,
    location_id,
    start_date_time,
    start_date,
    sold_out
  FROM raw_demo.sporting_event_cdc
  WHERE partition_date = '2022-09-22') s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN
  UPDATE SET
    sport_type_name = s.sport_type_name,
    home_team_id = s.home_team_id,
    away_team_id = s.away_team_id,
    location_id = s.location_id,
    start_date_time = s.start_date_time,
    start_date = s.start_date,
    sold_out = s.sold_out
WHEN NOT MATCHED THEN
  INSERT (id,
    sport_type_name,
    home_team_id,
    away_team_id,
    location_id,
    start_date_time,
    start_date,
    sold_out)
  VALUES
    (s.id,
    s.sport_type_name,
    s.home_team_id,
    s.away_team_id,
    s.location_id,
    s.start_date_time,
    s.start_date,
    s.sold_out)

Run the following query to verify data in the Iceberg table:

SELECT * FROM curated_demo.sporting_event WHERE id in (1, 5, 11, 21);

The record with ID 21 has been deleted, and the other records in the CDC dataset have been updated and inserted, as expected.
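To illustrate how the three MERGE clauses route each CDC row, the following Python sketch applies the same decision order to an in-memory table keyed by id. It is a simplified model, not how Athena runs the statement: it processes source rows one at a time, whereas a real MERGE evaluates matches against the target as it was before the statement started. The sample rows are made up.

```python
def merge_cdc(target: dict, changes: list) -> dict:
    """Apply CDC rows to a target keyed by id, checking the clauses in order."""
    result = dict(target)
    for row in changes:
        key = row["id"]
        values = {k: v for k, v in row.items() if k not in ("op", "id")}
        if key in result and row["op"] == "D":
            del result[key]                          # WHEN MATCHED AND s.op = 'D' THEN DELETE
        elif key in result:
            result[key] = {**result[key], **values}  # WHEN MATCHED THEN UPDATE
        else:
            result[key] = values                     # WHEN NOT MATCHED THEN INSERT
    return result


# Hypothetical target rows and CDC rows that mirror the op codes in this walkthrough.
target = {1: {"sold_out": 0}, 11: {"sold_out": 0}, 21: {"sold_out": 0}}
changes = [
    {"op": "U", "id": 1, "sold_out": 1},
    {"op": "U", "id": 11, "sold_out": 1},
    {"op": "D", "id": 21, "sold_out": 0},
    {"op": "I", "id": 5, "sold_out": 0},
]
merged = merge_cdc(target, changes)
```

In this model, IDs 1 and 11 are updated, ID 21 is removed, and ID 5 is added, which matches the outcome described above.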

Create a view that contains the previous state

When you write to an Iceberg table, a new snapshot or version of a table is created each time.

A snapshot represents the state of a table at a point in time and is used to access the complete set of data files in the table. Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time or a specified snapshot ID. However, this requires knowledge of a table’s current snapshots. To abstract this information from users, you can create views on top of Iceberg tables:

CREATE VIEW curated_demo.v_sporting_event_previous_snapshot AS
SELECT id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
cast(start_date_time as timestamp(3)) as start_date_time,
start_date,
sold_out
FROM curated_demo.sporting_event
FOR TIMESTAMP AS OF current_timestamp + interval '-5' minute;

Run the following query using this view to retrieve the snapshot of data before the CDC was applied:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

You can see the record with ID 21, which was deleted earlier.
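The view works because a time travel query resolves a timestamp to the latest snapshot committed at or before that time. The following Python sketch shows that resolution rule on a made-up snapshot history. The snapshot IDs and timestamps are hypothetical.

```python
from datetime import datetime


def snapshot_as_of(snapshots, as_of):
    """Return the id of the latest snapshot committed at or before as_of."""
    eligible = [s for s in snapshots if s["committed_at"] <= as_of]
    if not eligible:
        raise ValueError("no snapshot exists at or before the requested time")
    return max(eligible, key=lambda s: s["committed_at"])["snapshot_id"]


# Hypothetical history: snapshot 100 is the CTAS load, 200 is the MERGE.
history = [
    {"snapshot_id": 100, "committed_at": datetime(2022, 9, 22, 18, 0)},
    {"snapshot_id": 200, "committed_at": datetime(2022, 9, 22, 18, 43)},
]

before_merge = snapshot_as_of(history, datetime(2022, 9, 22, 18, 40))
after_merge = snapshot_as_of(history, datetime(2022, 9, 22, 18, 50))
```

A query as of a time between the two commits reads snapshot 100, so it still sees the record that the MERGE later deleted.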

Compliance with privacy regulations may require that you permanently delete records in all snapshots. To accomplish this, you can set properties for snapshot retention in Athena when creating the table, or you can alter the table:

ALTER TABLE curated_demo.sporting_event SET TBLPROPERTIES (
'vacuum_min_snapshots_to_keep'='1',
'vacuum_max_snapshot_age_seconds'='1'
)

This instructs Athena to store only one version of the data and not maintain any transaction history. After a table has been updated with these properties, run the VACUUM command to remove the older snapshots and clean up storage:

VACUUM curated_demo.sporting_event;

Run the following query again:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

The record with ID 21 has been permanently deleted.
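The following Python sketch models how the two retention properties interact, under the assumption that VACUUM always retains the most recent vacuum_min_snapshots_to_keep snapshots and expires the remaining ones that are older than vacuum_max_snapshot_age_seconds. It is a simplified illustration with made-up snapshot data, not Athena's implementation.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, now, min_snapshots_to_keep, max_snapshot_age_seconds):
    """Return ids of snapshots that are both unprotected and older than the cutoff."""
    ordered = sorted(snapshots, key=lambda s: s["committed_at"], reverse=True)
    cutoff = now - timedelta(seconds=max_snapshot_age_seconds)
    # The newest min_snapshots_to_keep snapshots are never expired.
    candidates = ordered[min_snapshots_to_keep:]
    return [s["snapshot_id"] for s in candidates if s["committed_at"] < cutoff]


# Hypothetical snapshot history.
now = datetime(2022, 9, 22, 19, 0)
history = [
    {"snapshot_id": 1, "committed_at": datetime(2022, 9, 22, 18, 0)},
    {"snapshot_id": 2, "committed_at": datetime(2022, 9, 22, 18, 30)},
    {"snapshot_id": 3, "committed_at": datetime(2022, 9, 22, 18, 59)},
]

aggressive = snapshots_to_expire(history, now, 1, 1)               # settings used in this post
relaxed = snapshots_to_expire(history, now, 1, 5 * 24 * 60 * 60)   # keep five days of history
```

With the settings from this post, only the latest snapshot survives in the model. With a five-day age limit, nothing is expired yet.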

Considerations

As data accumulates in the CDC folder of your raw zone, older files can be archived to Amazon S3 Glacier. Subsequently, the MERGE INTO statement can also be run on a single source file if needed by using $path in the WHERE condition of the USING clause:

MERGE INTO curated_demo.sporting_event t
USING (SELECT op, cdc_timestamp, id, sport_type_name, home_team_id, away_team_id, location_id, start_date_time, start_date, sold_out
FROM raw_demo.sporting_event_cdc
WHERE partition_date='2022-09-22' AND regexp_like("$path", '/sporting_event_cdc/2022/09/22/20220922-184314489.csv')
………..

Athena still scans all files in the partition's folder before the filter is applied, but you can minimize the amount of data scanned by choosing fine-grained hourly partitions. With this approach, you can trigger the MERGE INTO to run on Athena as files arrive in your S3 bucket using Amazon S3 event notifications. This could enable near-real-time use cases where users need to query a consistent view of data in the data lake as soon as it is created in source systems.
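If you trigger the merge from an S3 event notification, the handler needs to turn the object key of the new file into the partition and $path filter. The following Python sketch does only that string derivation. The helper name and the assumed key layout (sporting_event_cdc/YYYY/MM/DD/file.csv) are ours, and submitting the resulting statement to Athena is left out.

```python
import re

# Assumed key layout: sporting_event_cdc/<year>/<month>/<day>/<file>.csv
KEY_PATTERN = re.compile(r"^sporting_event_cdc/(\d{4})/(\d{2})/(\d{2})/[^/]+\.csv$")


def cdc_filter_for_key(object_key: str) -> str:
    """Build the WHERE condition of the USING subquery for one CDC file."""
    match = KEY_PATTERN.match(object_key)
    if match is None:
        raise ValueError(f"unexpected CDC object key: {object_key}")
    year, month, day = match.groups()
    # Escape dots so they match literally, and anchor the pattern at the end of the path.
    path_regex = "/" + object_key.replace(".", "\\.") + "$"
    return (
        f"partition_date = '{year}-{month}-{day}' "
        f"AND regexp_like(\"$path\", '{path_regex}')"
    )


condition = cdc_filter_for_key("sporting_event_cdc/2022/09/22/20220922-184314489.csv")
print(condition)
```

Keeping the partition_date predicate in the condition lets Athena limit the scan to one date folder before the path filter is evaluated.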

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following SQL to drop the tables and views:
    DROP TABLE raw_demo.sporting_event;
    DROP TABLE raw_demo.sporting_event_cdc;
    DROP TABLE curated_demo.sporting_event;
    DROP VIEW curated_demo.v_sporting_event_previous_snapshot;

    Because Iceberg tables are considered managed tables in Athena, dropping an Iceberg table also removes all the data in the corresponding S3 folder.

  2. Run the following SQL to drop the databases:
    DROP DATABASE raw_demo;
    DROP DATABASE curated_demo;

  3. Delete the S3 folders and CSV files that you had uploaded.

Conclusion

This post showed you how to apply CDC to a target Iceberg table using CTAS and MERGE INTO statements in Athena. You can perform bulk load using a CTAS statement. When new data or changed data arrives, use the MERGE INTO statement to merge the CDC changes. To optimize storage and improve performance of queries, use the VACUUM command regularly.

As next steps, you can orchestrate these SQL statements using AWS Step Functions to implement end-to-end data pipelines for your data lake. For more information, refer to Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions.


About the Authors

Ranjit Rajan is a Principal Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Kannan Iyer is a Senior Data Lab Solutions Architect with AWS. Kannan works with AWS customers to help them design and build data and analytics applications in the cloud.

Alexandre Rezende is a Data Lab Solutions Architect with AWS. Alexandre works with customers on their Business Intelligence, Data Warehouse, and Data Lake use cases, designs architectures to solve their business problems, and helps them build MVPs to accelerate their path to production.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database-like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever the Iceberg table is updated, a new snapshot of the table is created, and the metadata pointer is updated to point to the new current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.
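The following toy Python model illustrates why tracking files through a metadata pointer isolates readers from writers. A commit writes a new metadata version and then swaps a single pointer, so a reader that resolved the pointer earlier keeps seeing its own consistent file list. The class and file names are hypothetical, and the model ignores manifests and concurrent writers.

```python
class ToyIcebergCatalog:
    """Hypothetical stand-in for a catalog that points at the current metadata version."""

    def __init__(self):
        self._metadata = {"v1": ()}   # metadata version -> tuple of data file paths
        self.current = "v1"           # the metadata pointer

    def data_files(self, version=None):
        """List data files for a pinned version, or for the current one."""
        return self._metadata[version or self.current]

    def commit(self, new_files):
        """Write a new metadata version, then publish it with one pointer swap."""
        version = f"v{len(self._metadata) + 1}"
        self._metadata[version] = self.data_files() + tuple(new_files)
        self.current = version
        return version


catalog = ToyIcebergCatalog()
reader_version = catalog.current              # a reader resolves the pointer first
catalog.commit(["data/00000-a.parquet"])      # a writer commits afterward
```

After the commit, catalog.data_files(reader_version) is still empty while catalog.data_files() includes the new file, which is the isolation described above.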

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, it would be raw data stored in your S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

# Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 or later.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  1. Choose Create Studio in the new tab.
  2. Enter a name. We use iceberg-studio.
  3. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  4. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  5. Choose an S3 path for Workspaces backup.
  6. Choose Create Studio.
  7. After the Studio is created, choose the Studio access URL.
  8. On the EMR Studio dashboard, choose Create workspace.
  9. Enter a name for your Workspace. We use iceberg-workspace.
  10. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  11. Choose the EMR cluster you created earlier.
  12. Choose Create Workspace.
  13. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  1. Open the notebook.
  2. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
"conf": {
"spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.demo.io-impl":"org.apache.iceberg.aws.s3.S3FileIO"
}
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.
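If you register more than one catalog, it can help to generate these settings rather than typing them by hand. The following Python sketch builds the same five properties for any catalog name and prints them in the shape the %%configure cell expects. The helper name is hypothetical. The property keys and class names are the ones listed above.

```python
import json


def iceberg_glue_catalog_conf(catalog: str, warehouse: str) -> dict:
    """Return the Spark session properties for an Iceberg catalog backed by AWS Glue."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


conf = iceberg_glue_catalog_conf("demo", "s3://iceberg-curated-blog-data")
print(json.dumps({"conf": conf}, indent=2))
```

The printed JSON can be pasted after %%configure -f in the notebook.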

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if it does not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this loads all the Parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements. For example:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4;

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1;

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that writes to an Iceberg table produce too many small files, you can configure the target file size so that fewer, larger files are written, which helps improve query performance.

Property Default Description
write.target-file-size-bytes 536870912 (512 MB) Controls the size of files generated to target about this many bytes

Use the following code to alter the table property:

//Example code to alter the table property in an EMR Studio Workspace notebook
spark.sql("""ALTER TABLE demo.reviews.all_reviews
SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')""")
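When several properties need to change together, it can help to generate the statement instead of typing it by hand. The following is a minimal sketch in Python; the helper name `alter_table_properties_sql` is hypothetical, and the property key is the one listed in the table above.

```python
def alter_table_properties_sql(table: str, properties: dict) -> str:
    """Build an ALTER TABLE ... SET TBLPROPERTIES statement for Spark SQL."""
    if not properties:
        raise ValueError("at least one property is required")
    # Sort the keys so the generated statement is stable between runs.
    assignments = ", ".join(
        f"'{key}'='{value}'" for key, value in sorted(properties.items())
    )
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({assignments})"


statement = alter_table_properties_sql(
    "demo.reviews.all_reviews",
    {"write.target-file-size-bytes": 512 * 1024 * 1024},  # 512 MB in bytes
)
print(statement)
```

The resulting string can be passed to `spark.sql(...)` in a PySpark notebook session.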

Partitioning and sorting

To make a query run fast, the less data read the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don’t contain matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process to find the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed Athena scanned 50% or less data for a given query on an Iceberg table compared to original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.
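The data filtering step can be pictured with a small Python model. This is a simplified sketch, not Iceberg's implementation: the `DataFile` class, the file names, and the bounds are made up for illustration, and each file carries only one partition value and one column's lower and upper bounds.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: str    # partition value, for example product_category
    lower_bound: int  # smallest star_rating stored in the file
    upper_bound: int  # largest star_rating stored in the file


def plan_scan(files, category, rating):
    """Return paths of files that may hold rows matching
    product_category = category AND star_rating = rating."""
    selected = []
    for f in files:
        if f.partition != category:
            continue  # pruned by partition value
        if not (f.lower_bound <= rating <= f.upper_bound):
            continue  # pruned by column bounds
        selected.append(f.path)
    return selected


files = [
    DataFile("watches-1.parquet", "Watches", 1, 2),
    DataFile("watches-2.parquet", "Watches", 3, 5),
    DataFile("books-1.parquet", "Books", 1, 5),
]
print(plan_scan(files, "Watches", 4))  # ['watches-2.parquet']
```

Only one of the three files has to be opened; the other two are skipped using metadata alone.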

Partitioning and sorting

Partitioning is a way to group records with the same key column values together when writing. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in the data filtering step of query scan planning. Iceberg makes partitioning simple by supporting hidden partitioning: it produces partition values by taking a column value and optionally transforming it.

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which is used in the query WHERE condition to filter out records. With partitioning, the query can scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

-- Run this query on the all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

-- Run the same select query on the partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name Non-Partitioned Dataset Partitioned Dataset
Runtime (seconds) 8.20 4.25
Data Scanned (MB) 131.55 33.79

Note that the runtime is the average runtime with multiple runs in our test.
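The percentages quoted for the partitioning test can be reproduced from the table values. The short Python check below uses only the numbers reported above; the helper name `percent_reduction` is ours.

```python
def percent_reduction(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, in percent."""
    return (before - after) / before * 100


runtime = percent_reduction(8.20, 4.25)      # runtime in seconds
scanned = percent_reduction(131.55, 33.79)   # data scanned in MB
print(f"runtime: {runtime:.1f}% lower, data scanned: {scanned:.1f}% lower")
```

This gives roughly 48% and 74%, in line with the rounded figures of about 50% and 70%.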

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
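To see why sorting helps file skipping, consider the following toy Python model. It is a sketch under simplifying assumptions: each "file" holds three values, the only statistics kept are the minimum and maximum per file, and the sample values are invented.

```python
def split_into_files(values, rows_per_file):
    """Split values into consecutive 'files' and record (min, max) per file."""
    files = []
    for start in range(0, len(values), rows_per_file):
        chunk = values[start:start + rows_per_file]
        files.append((min(chunk), max(chunk)))
    return files


def files_to_read(files, low, high):
    """Count files whose [min, max] range overlaps the query range."""
    return sum(1 for lo, hi in files if lo <= high and hi >= low)


# Hypothetical day-of-year values in arrival order (deliberately interleaved).
arrival_order = [300, 10, 200, 95, 20, 310, 105, 210, 30, 320, 115, 220]

unsorted_files = split_into_files(arrival_order, rows_per_file=3)
sorted_files = split_into_files(sorted(arrival_order), rows_per_file=3)

# Query: values between 1 and 90.
print(files_to_read(unsorted_files, 1, 90))  # 3 of 4 files must be read
print(files_to_read(sorted_files, 1, 90))    # 1 of 4 files must be read
```

With unsorted data almost every file has a wide value range, so the bounds rarely exclude anything; after sorting, the ranges no longer overlap and most files can be skipped.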

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when records in the Iceberg table are updated or deleted, the data files that contain the affected records are copied, and the changes are applied to the copies. A new snapshot of the Iceberg table is then created that points to the new version of the data files. Rewriting files makes writes slower overall, and when concurrent writes conflict, the retries increase the write time even more. On the other hand, reads need no extra processing: the query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when records in the Iceberg table are updated or deleted, the existing data files are not rewritten; instead, new delete files are created to track the changes. For deletes, a new delete file is created that identifies the deleted records. When reading the Iceberg table, the delete file is applied to the retrieved data to filter out the deleted records. For updates, a new delete file is created to mark the updated records as deleted, and a new data file is created that holds those records with their updated values. When reading the Iceberg table, both the delete files and the new data files are applied to the retrieved data to reflect the latest changes and produce the correct results. Every subsequent query therefore performs an extra step to merge the data files with the delete files and new data files, which usually increases the query time. On the other hand, writes can be faster because the existing data files don't need to be rewritten.
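The difference between the two approaches can be sketched with in-memory lists standing in for data files. This is an illustrative Python model only: the function names are hypothetical, and the delete file is modeled as a set of (file index, row position) pairs, loosely mirroring the idea of position-based deletes.

```python
def copy_on_write_delete(data_files, predicate):
    """Rewrite every file that contains a matching row, dropping those rows."""
    new_files, rewritten = [], 0
    for rows in data_files:
        if any(predicate(r) for r in rows):
            new_files.append([r for r in rows if not predicate(r)])
            rewritten += 1
        else:
            new_files.append(rows)  # untouched files are reused as-is
    return new_files, rewritten


def merge_on_read_delete(data_files, predicate):
    """Leave data files untouched; record deleted positions in a delete file."""
    return {
        (file_idx, pos)
        for file_idx, rows in enumerate(data_files)
        for pos, row in enumerate(rows)
        if predicate(row)
    }


def read_with_deletes(data_files, delete_file):
    """Apply the delete file at read time to hide deleted rows."""
    return [
        row
        for file_idx, rows in enumerate(data_files)
        for pos, row in enumerate(rows)
        if (file_idx, pos) not in delete_file
    ]


data_files = [
    [{"id": 1, "star_rating": 1}, {"id": 2, "star_rating": 5}],
    [{"id": 3, "star_rating": 4}],
]
is_one_star = lambda row: row["star_rating"] == 1

cow_files, rewritten = copy_on_write_delete(data_files, is_one_star)
cow_rows = [row for rows in cow_files for row in rows]

deletes = merge_on_read_delete(data_files, is_one_star)
mor_rows = read_with_deletes(data_files, deletes)

print(rewritten, len(deletes), cow_rows == mor_rows)  # 1 1 True
```

Both paths return the same rows; copy-on-write pays the cost when writing (one file rewritten), whereas merge-on-read pays it on every read (the delete set is consulted for each row).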

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run code to alter Iceberg table property to set copy-on-write and merge-on-read in EMR Studio Workspace notebook
spark.sql("""ALTER TABLE demo.reviews.all_reviews
SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')""")

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

-- Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

Approach Query Runtime (seconds) Data scanned (MB)
Copy-on-Write UPDATE 66.251 494.06
Copy-on-Write DELETE 116.174 3.07
Copy-on-Write SELECT 97.75 137.16
Merge-on-Read UPDATE 10.788 494.06
Merge-on-Read DELETE 54.941 3.07
Merge-on-Read SELECT 113.44 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, each approach involves trade-offs, and which one to use depends on your use case. In summary, the choice comes down to read latency vs. write latency. Refer to the following table to decide which approach fits your workload.

Consideration | Copy-on-Write | Merge-on-Read
Pros | Faster reads | Faster writes
Cons | Expensive writes | Higher latency on reads
When to use | Good for frequent reads, infrequent updates and deletes, or large batch updates | Good for tables with frequent updates and deletes

Data compaction

If your data files are small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the number of I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset, so more data files lead to more metadata, which in turn increases the overhead and I/O of reading metadata files. To improve query performance, it’s recommended to compact small data files into larger data files.

When updating and deleting records in an Iceberg table, if the merge-on-read approach is used, you might end up with many small delete files or new data files. Running compaction combines all these files and creates a newer version of the data files, which eliminates the need to reconcile them during reads. It’s recommended to run compaction jobs regularly to keep the impact on reads as small as possible while still maintaining faster write speed.
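The idea behind bin-pack compaction can be illustrated with a greedy packing routine. The Python sketch below is a simplified first-fit approach with invented file sizes and a 256 MB target; it is not the algorithm that Athena or Iceberg actually runs.

```python
def bin_pack(file_sizes_mb, target_mb):
    """Greedy first-fit packing of file sizes into groups of at most target_mb."""
    groups = []
    for size in sorted(file_sizes_mb, reverse=True):
        for group in groups:
            if sum(group) + size <= target_mb:
                group.append(size)  # fits into an existing group
                break
        else:
            groups.append([size])   # no group had room; start a new one
    return groups


small_files = [40, 10, 120, 60, 30, 200, 15, 25]  # hypothetical sizes in MB
groups = bin_pack(small_files, target_mb=256)
print(len(small_files), "files ->", len(groups), "files:", groups)
```

In this example, eight small files are combined into two output files, each close to the target size, so a later query opens far fewer files.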

Run the following data compaction command, then run the select query from Athena:

-- Data compaction
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

-- Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. In our test, the query runtime dropped from 97.75 seconds to 32.676 seconds, a reduction of about 67%.

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676
Data scanned (MB) 137.16 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the EMR Studio console, choose Workspaces in the navigation pane.
  3. Select the Workspace you created and choose Delete.
  4. On the EMR console, navigate to the Studios page.
  5. Select the Studio you created and choose Delete.
  6. On the EMR console, choose Clusters in the navigation pane.
  7. Select the cluster and choose Terminate.
  8. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we took a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their business outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Build a real-time GDPR-aligned Apache Iceberg data lake

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-real-time-gdpr-aligned-apache-iceberg-data-lake/

Data lakes are a popular choice for today’s organizations to store their data around their business activities. As a best practice of a data lake design, data should be immutable once stored. But regulations such as the General Data Protection Regulation (GDPR) have created obligations for data operators who must be able to erase or update personal data from their data lake when requested.

A data lake built on AWS uses Amazon Simple Storage Service (Amazon S3) as its primary storage environment. When a customer asks to erase or update private data, the data lake operator needs to find the required objects in Amazon S3 that contain the required data and take steps to erase or update that data. This activity can be a complex process for the following reasons:

  • Data lakes may contain many S3 objects (each may contain multiple rows), and often it’s difficult to find the object containing the exact data that needs to be erased or personally identifiable information (PII) to be updated as per the request
  • By nature, S3 objects are immutable and therefore applying direct row-based transactions like DELETE or UPDATE isn’t possible

To handle these situations, a transactional feature on S3 objects is required, and frameworks such as Apache Hudi or Apache Iceberg provide you the transactional feature for upserts in Amazon S3.

AWS contributed the Apache Iceberg integration with the AWS Glue Data Catalog, which enables you to use open-source data computation engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg, enabling transaction queries on S3 objects.

In this post, we show you how to stream real-time data to an Iceberg table in Amazon S3 using AWS Glue streaming and perform transactions using Amazon Athena for deletes and updates. We use a serverless mechanism for this implementation, which requires minimum operational overhead to manage and fine-tune various configuration parameters, and enables you to extend your use case to ACID operations beyond the GDPR.

Solution overview

We used the Amazon Kinesis Data Generator (KDG) to produce synthetic streaming data in Amazon Kinesis Data Streams and then processed the streaming input data using AWS Glue streaming to store the data in Amazon S3 in Iceberg table format. As part of the customer’s request, we ran delete and update statements using Athena with Iceberg support.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  1. Streaming data is generated in JSON format using the KDG template and inserted into Kinesis Data Streams.
  2. An AWS Glue streaming job is connected to Kinesis Data Streams to process the data using the Iceberg connector.
  3. The streaming job output is stored in Amazon S3 in Iceberg table format.
  4. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  5. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries required for GDPR.

The codebase required for this post is available in the GitHub repository.

Prerequisites

Before starting the implementation, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy your solution resources:

  1. After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:
  2. For Stack name, enter a name.
  3. For Username, enter the user name for the KDG.
  4. For Password, enter the password for the KDG (this must be at least six alphanumeric characters, and contain at least one number).
  5. For IAMGlueStreamingJobRoleName, enter a name for the IAM role used for the AWS Glue streaming job.
  6. Choose Next and create your stack.

This CloudFormation template configures the following resources in your account:

  • An S3 bucket named streamingicebergdemo-XX (note that the XX part is a random unique number to make the S3 bucket name unique)
  • An IAM policy and role
  • The KDG URL used for creating synthetic data
  7. After you complete the setup, go to the Outputs tab of the CloudFormation stack to get the S3 bucket name, AWS Glue job execution role (as per your input), and KDG URL.
  8. Before proceeding with the demo, create a folder named custdata under the created S3 bucket.

Create a Kinesis data stream

We use Kinesis Data Streams to create a serverless streaming data service that is built to handle millions of events with low latency. The following steps guide you on how to create the data stream in the us-east-1 Region:

  1. Log in to the AWS Management Console.
  2. Navigate to Kinesis console (make sure the Region is us-east-1).
  3. Select Kinesis Data Streams and choose Create data stream.
  4. For Data stream name, enter demo-data-stream.
  5. For this post, we select On-demand as the Kinesis data stream capacity mode.

On-demand mode works to eliminate the need for provisioning and managing the capacity for streaming data. However, you can implement this solution with Kinesis Data Streams in provisioned mode as well.

  6. Choose Create data stream.
  7. Wait for demo-data-stream to be created successfully and reach Active status.

Set up the Kinesis Data Generator

To create a sample streaming dataset, we use the KDG URL generated on the CloudFormation stack Outputs tab and log in with the credentials used in the parameters for the CloudFormation template. For this post, we use the following template to generate sample data in the demo-data-stream Kinesis data stream.

  1. Log in to the KDG URL with the user name and password you supplied during stack creation.
  2. Change the Region to us-east-1.
  3. Select the Kinesis data stream demo-data-stream.
  4. For Records per second, choose Constant and enter 100 (it can be another number, depending on the rate of record creation).
  5. On the Template 1 tab, enter the KDG data generation template:
{
"year": "{{random.number({"min":2000,"max":2022})}}",
"month": "{{random.number({"min":1,"max":12})}}",
"day": "{{random.number({"min":1,"max":30})}}",
"hour": "{{random.number({"min":0,"max":23})}}",
"minute": "{{random.number({"min":0,"max":59})}}",
"customerid": {{random.number({"min":5023,"max":59874})}},
"firstname" : "{{name.firstName}}",
"lastname" : "{{name.lastName}}",
"dateofbirth" : "{{date.past(70)}}",
"city" : "{{address.city}}",
"buildingnumber" : {{random.number({"min":63,"max":947})}},
"streetaddress" : "{{address.streetAddress}}",
"state" : "{{address.state}}",
"zipcode" : "{{address.zipCode}}",
"country" : "{{address.country}}",
"countrycode" : "{{address.countryCode}}",
"phonenumber" : "{{phone.phoneNumber}}",
"productname" : "{{commerce.productName}}",
"transactionamount": {{random.number(
{
"min":10,
"max":150
}
)}}
}
  6. Choose Test template to test the sample records.
  7. When the test records look correct, choose Send data.

This will start sending 100 records per second in the Kinesis data stream. (To stop sending data, choose Stop Sending Data to Kinesis.)
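To inspect the shape of these records without the KDG, you can generate a similar payload locally. The Python sketch below is an approximation: it covers only a subset of the template fields, uses a fixed placeholder for the product name, and caps hour and minute at 23 and 59 so that the values are always valid clock times. The function name `sample_record` is hypothetical.

```python
import json
import random


def sample_record(rng: random.Random) -> dict:
    """Build one record shaped like the KDG template (subset of fields)."""
    return {
        "year": str(rng.randint(2000, 2022)),
        "month": str(rng.randint(1, 12)),
        "day": str(rng.randint(1, 30)),
        "hour": str(rng.randint(0, 23)),
        "minute": str(rng.randint(0, 59)),
        "customerid": rng.randint(5023, 59874),
        "productname": "Sample Product",  # placeholder for commerce.productName
        "transactionamount": rng.randint(10, 150),
    }


rng = random.Random(42)  # fixed seed so runs are repeatable
record = sample_record(rng)
payload = json.dumps(record)
print(payload)
```

The date parts are emitted as strings and the customer ID and amount as numbers, matching the quoting used in the template.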

Integrate Iceberg with AWS Glue

To add the Apache Iceberg Connector for AWS Glue, complete the following steps. The connector is free to use and supports AWS Glue 1.0, 2.0, and 3.0.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In the navigation pane, navigate to AWS Marketplace.
  3. Search for and choose Apache Iceberg Connector for AWS Glue.
  4. Choose Accept Terms and Continue to Subscribe.
  5. Choose Continue to Configuration.
  6. For Fulfillment option, choose your AWS Glue version.
  7. For Software version, choose the latest software version.
  8. Choose Continue to Launch.
  9. Under Usage Instructions, choose the link to activate the connector.
  10. Enter a name for the connection, then choose Create connection and activate the connector.
  11. Verify the new connector on the AWS Glue Studio Connectors.

Create the AWS Glue Data Catalog database

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location and schema of your data. You use the information in the Data Catalog to create and monitor your ETL jobs.

For this post, we create a Data Catalog database named icebergdemodb containing the metadata information of a table named customer, which will be queried through Athena.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. For Database name, enter icebergdemodb.

This creates an AWS Glue database for metadata storage.

Create a Data Catalog table in Iceberg format

In this step, we create a Data Catalog table in Iceberg table format.

  1. On the Athena console, create an Athena workgroup named demoworkgroup for SQL queries.
  2. Choose Athena engine version 3 for Query engine version.

For more information about Athena versions, refer to Changing Athena engine versions.

  3. Enter the S3 bucket location for Query result configuration under Additional configurations.
  4. Open the Athena query editor and choose demoworkgroup.
  5. Choose the database icebergdemodb.
  6. Enter and run the following DDL to create a table in the Data Catalog database icebergdemodb. Note that the TBLPROPERTIES section specifies ICEBERG as the table type and LOCATION points to the S3 folder (custdata) URI created in earlier steps. This DDL command is available on the GitHub repo.
CREATE TABLE icebergdemodb.customer(
year string,
month string,
day string,
hour string,
minute string,
customerid string,
firstname string,
lastname string,
dateofbirth string,
city string,
buildingnumber string,
streetaddress string,
state string,
zipcode string,
country string,
countrycode string,
phonenumber string,
productname string,
transactionamount int)
LOCATION '<S3 Location URI>'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912',
'optimize_rewrite_delete_file_threshold'='10'
);

After you run the command successfully, you can see the table customer in the Data Catalog.
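Note that the table declares every column except transactionamount as a string, whereas the KDG template emits customerid and buildingnumber as JSON numbers. A writer therefore has to cast incoming values to the declared types. The following Python sketch shows one way to do that for a single record; it is an illustration only, not the code from the GitHub repo, and the function name is hypothetical.

```python
STRING_COLUMNS = [
    "year", "month", "day", "hour", "minute", "customerid", "firstname",
    "lastname", "dateofbirth", "city", "buildingnumber", "streetaddress",
    "state", "zipcode", "country", "countrycode", "phonenumber", "productname",
]
INT_COLUMNS = ["transactionamount"]


def conform_to_customer_schema(record: dict) -> dict:
    """Cast a parsed JSON record to the column types of the customer table."""
    row = {}
    for col in STRING_COLUMNS:
        value = record.get(col)
        row[col] = None if value is None else str(value)
    for col in INT_COLUMNS:
        value = record.get(col)
        row[col] = None if value is None else int(value)
    return row


raw = {"customerid": 59289, "buildingnumber": 101, "firstname": "Ana",
       "transactionamount": "42"}
row = conform_to_customer_schema(raw)
print(repr(row["customerid"]), repr(row["transactionamount"]))
```

Missing fields become None, so every row has the same 19 columns as the table definition.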

Create an AWS Glue streaming job

In this section, we use the Spark script editor to create the AWS Glue streaming job, which fetches records from the Kinesis data stream.

  1. On the AWS Glue console, choose Jobs (new) in the navigation pane.
  2. For Create job, select Spark script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. Enter the code available in the GitHub repo in the editor.

The sample code keeps appending data in the target location by fetching records from the Kinesis data stream.

  6. Choose the Job details tab in the query editor.
  7. For Name, enter Demo_Job.
  8. For IAM role, choose demojobrole.
  9. For Type, choose Spark Streaming.
  10. For Glue Version, choose Glue 3.0.
  11. For Language, choose Python 3.
  12. For Worker type, choose G.025X.
  13. Select Automatically scale the number of workers.
  14. For Maximum number of workers, enter 5.
  15. Under Advanced properties, select Use Glue Data Catalog as the Hive metastore.
  16. For Connections, choose the connector you created.
  17. For Job parameters, enter the following key-value pairs (provide your S3 bucket and account ID):
Key Value
--iceberg_job_catalog_warehouse s3://streamingicebergdemo-XX/custdata/
--output_path s3://streamingicebergdemo-XX
--kinesis_arn arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream
--user-jars-first True

  18. Choose Run to start the AWS Glue streaming job.
  19. To monitor the job, choose Monitoring in the navigation pane.
  20. Select Demo_Job and choose View run details to check the job run details and Amazon CloudWatch logs.

Run GDPR use cases on Athena

In this section, we demonstrate a few use cases that are relevant to GDPR alignment with the user data that’s stored in Iceberg format in the Amazon S3-based data lake as implemented in the previous steps. For this, let’s consider that the following requests are being initiated in the workflow to comply with the regulations:

  • Delete the records for the input customerid (for example, 59289)
  • Update phonenumber for the customerid (for example, 51936)

The IDs used in this example are samples only because they were created through the KDG template used earlier, which creates sample data. You can search for IDs in your implementation by querying through the Athena query editor. The steps remain the same.

Delete data by customer ID

Complete the following steps to fulfill the first use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query using a customer ID and choose Run:
SELECT count(*)
FROM icebergdemodb.customer
WHERE customerid = '59289';

This query gives the count of records for the input customerid before delete.

  4. Enter the following query with the same customer ID and choose Run:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '59289') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN DELETE;

This query deletes the data for the input customerid as per the workflow generated.

  5. Run the count query again to check whether any data remains for the customer ID.

The count should be 0.

Update data by customer ID

Complete the following steps to test the second use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query with a customer ID and choose Run.
SELECT customerid, phonenumber
FROM icebergdemodb.customer
WHERE customerid = '51936';

This query gives the value for phonenumber before update.

  4. Run the following query to update the required columns:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '51936') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN UPDATE SET phonenumber = '000';

This query updates the data to a dummy value.

  5. Run the SELECT query to check the update.

You can see the data is updated correctly.
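If erase and update requests arrive through an automated workflow, the statements above would typically be generated from the requested customer ID. The Python sketch below builds the same two MERGE statements as strings and rejects IDs that are not purely numeric, so that a malformed request cannot change the SQL text. The function names are hypothetical, and submitting the statements to Athena is left out.

```python
MERGE_TEMPLATE = (
    "MERGE INTO icebergdemodb.customer trg\n"
    "USING (SELECT customerid\n"
    "FROM icebergdemodb.customer\n"
    "WHERE customerid = '{customer_id}') src\n"
    "ON (trg.customerid = src.customerid)\n"
    "WHEN MATCHED\n"
    "THEN {action};"
)


def _checked_id(customer_id: str) -> str:
    """Accept only ASCII digits so the ID cannot alter the SQL text."""
    if not (customer_id.isascii() and customer_id.isdigit()):
        raise ValueError(f"unexpected customer ID: {customer_id!r}")
    return customer_id


def erase_customer_sql(customer_id: str) -> str:
    """MERGE statement that deletes all rows for one customer."""
    return MERGE_TEMPLATE.format(
        customer_id=_checked_id(customer_id), action="DELETE"
    )


def mask_phone_sql(customer_id: str, replacement: str = "000") -> str:
    """MERGE statement that overwrites the phone number with a dummy value."""
    escaped = replacement.replace("'", "''")  # standard SQL quote escaping
    return MERGE_TEMPLATE.format(
        customer_id=_checked_id(customer_id),
        action=f"UPDATE SET phonenumber = '{escaped}'",
    )


print(erase_customer_sql("59289"))
print(mask_phone_sql("51936"))
```

Validating the ID before formatting is a deliberate choice here, because the value is placed directly inside a string literal in the query.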

Vacuum table

A good practice is to run the VACUUM command periodically on the table, because operations like INSERT, UPDATE, DELETE, and MERGE on an Iceberg table accumulate snapshots and files that are no longer needed. See the following code:

VACUUM icebergdemodb.customer;
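Snapshot expiration is usually governed by a retention policy: keep a minimum number of recent snapshots and expire the rest once they exceed a maximum age. The Python sketch below models that rule in a simplified form. The thresholds are illustrative, and the function is not Athena's implementation; check the Athena documentation for the table properties that control VACUUM (such as vacuum_max_snapshot_age_seconds and vacuum_min_snapshots_to_keep) and their defaults.

```python
def snapshots_to_expire(snapshot_ages_seconds, max_age_seconds, min_to_keep):
    """Return the ages of snapshots that a retention policy would expire.

    A snapshot is kept if it is one of the `min_to_keep` newest snapshots
    or if it is not older than `max_age_seconds`.
    """
    newest_first = sorted(snapshot_ages_seconds)
    expired = []
    for index, age in enumerate(newest_first):
        if index < min_to_keep:
            continue  # always retain the newest snapshots
        if age > max_age_seconds:
            expired.append(age)
    return expired


DAY = 24 * 60 * 60
ages = [1 * DAY, 3 * DAY, 6 * DAY, 10 * DAY]  # hypothetical snapshot ages
expired = snapshots_to_expire(ages, max_age_seconds=5 * DAY, min_to_keep=1)
print([age // DAY for age in expired])  # [6, 10]
```

Data files referenced only by expired snapshots are what a cleanup can then remove, which is why retention settings affect both storage cost and how far back time travel queries can reach.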

Considerations

The following are a few considerations to keep in mind for this implementation:

Clean up

Complete the following steps to clean up the resources you created for this post:

    1. Delete the custdata folder in the S3 bucket.
    2. Delete the CloudFormation stack.
    3. Delete the Kinesis data stream.
    4. Delete the S3 bucket storing the data.
    5. Delete the AWS Glue job and Iceberg connector.
    6. Delete the AWS Glue Data Catalog database and table.
    7. Delete the Athena workgroup.
    8. Delete the IAM roles and policies.

Conclusion

This post explained how you can use the Iceberg table format on Athena to implement GDPR use cases like data deletion and data upserts as required, when streaming data is being generated and ingested through AWS Glue streaming jobs in Amazon S3.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the data operations that Iceberg supports. Refer to the Apache Iceberg documentation for details on various operations.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Rajdip Chaudhuri is a Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer.

Automate replication of relational sources into a transactional data lake with Apache Iceberg and AWS Glue

Post Syndicated from Luis Gerardo Baeza original https://aws.amazon.com/blogs/big-data/automate-replication-of-relational-sources-into-a-transactional-data-lake-with-apache-iceberg-and-aws-glue/

Organizations have chosen to build data lakes on top of Amazon Simple Storage Service (Amazon S3) for many years. A data lake is the most popular choice for organizations to store all their organizational data generated by different teams, across business domains, in many different formats, and over long periods of time. According to a study, the average company is seeing the volume of their data growing at a rate that exceeds 50% per year, usually managing an average of 33 unique data sources for analysis.

Teams often try to replicate thousands of jobs from relational databases with the same extract, transform, and load (ETL) pattern, and a lot of effort goes into maintaining the job states and scheduling these individual jobs. The approach in this post helps teams add tables with few changes and maintain the job status with minimum effort. This can considerably shorten the development timeline and make the jobs easier to track.

In this post, we show you how to easily replicate all your relational data stores into a transactional data lake in an automated fashion with a single ETL job using Apache Iceberg and AWS Glue.

Solution architecture

Data lakes are usually organized using separate S3 buckets for three layers of data: the raw layer containing data in its original form, the stage layer containing intermediate processed data optimized for consumption, and the analytics layer containing aggregated data for specific use cases. In the raw layer, tables usually are organized based on their data sources, whereas tables in the stage layer are organized based on the business domains they belong to.

This post provides an AWS CloudFormation template that deploys an AWS Glue job that reads an Amazon S3 path for one data source of the data lake raw layer, and ingests the data into Apache Iceberg tables on the stage layer using AWS Glue support for data lake frameworks. The job expects tables in the raw layer to be structured in the way AWS Database Migration Service (AWS DMS) ingests them: schema, then table, then data files.

This solution uses AWS Systems Manager Parameter Store for table configuration. You should modify this parameter specifying the tables you want to process and how, including information such as primary key, partitions, and the business domain associated. The job uses this information to automatically create a database (if it doesn’t already exist) for every business domain, create the Iceberg tables, and perform the data loading.
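As an illustration of what such a table configuration might contain, the Python sketch below parses a JSON document and groups the tables by business domain. The JSON layout and field names (`table`, `primary_key`, `partition_cols`, `domain`) are assumptions made for this example; the actual parameter format is defined by the CloudFormation template and may differ.

```python
import json
from collections import defaultdict

# Hypothetical parameter value; the real layout may differ.
PARAMETER_VALUE = """
[
  {"table": "customer", "primary_key": ["customer_id"],
   "partition_cols": ["country"], "domain": "sales"},
  {"table": "order_line", "primary_key": ["order_id", "line_no"],
   "partition_cols": [], "domain": "sales"},
  {"table": "employee", "primary_key": ["employee_id"],
   "partition_cols": [], "domain": "hr"}
]
"""


def tables_by_domain(parameter_value: str) -> dict:
    """Group configured tables by domain (one database per domain)."""
    grouped = defaultdict(list)
    for entry in json.loads(parameter_value):
        if not entry["primary_key"]:
            raise ValueError(f"table {entry['table']} needs a primary key")
        grouped[entry["domain"]].append(entry["table"])
    return dict(grouped)


print(tables_by_domain(PARAMETER_VALUE))
```

Each key of the result corresponds to one database that the job would create if it doesn't already exist, and the primary key check mirrors the requirement that every replicated table has one.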

Finally, we can use Amazon Athena to query the data in the Iceberg tables.

The following diagram illustrates this architecture.

This implementation has the following considerations:

  • All tables from the data source must have a primary key to be replicated using this solution. The primary key can be a single column or a composite key with more than one column.
  • If the data lake contains tables that don’t need upserts or don’t have a primary key, you can exclude them from the parameter configuration and implement traditional ETL processes to ingest them into the data lake. That’s outside of the scope of this post.
  • If there are additional data sources that need to be ingested, you can deploy multiple CloudFormation stacks, one to handle each data source.
  • The AWS Glue job is designed to process data in two phases: the initial load, which runs after AWS DMS finishes the full load task, and the incremental load, which runs on a schedule and applies the change data capture (CDC) files captured by AWS DMS. Incremental processing is performed using an AWS Glue job bookmark.

There are nine steps to complete this tutorial:

  1. Set up a source endpoint for AWS DMS.
  2. Deploy the solution using AWS CloudFormation.
  3. Review the AWS DMS replication task.
  4. Optionally, add permissions for encryption and decryption or AWS Lake Formation.
  5. Review the table configuration on Parameter Store.
  6. Perform initial data loading.
  7. Perform incremental data loading.
  8. Monitor table ingestion.
  9. Schedule incremental batch data loading.

Prerequisites

Before starting this tutorial, you should already be familiar with Iceberg. If you’re not, you can get started by replicating a single table following the instructions in Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue. Additionally, set up the following:

Set up a source endpoint for AWS DMS

Before we create our AWS DMS task, we need to set up a source endpoint to connect to the source database:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. If your database is running on Amazon RDS, choose Select RDS DB instance, then choose the instance from the list. Otherwise, choose the source engine and provide the connection information either through AWS Secrets Manager or manually.
  4. For Endpoint identifier, enter a name for the endpoint; for example, source-postgresql.
  5. Choose Create endpoint.

Deploy the solution using AWS CloudFormation

Create a CloudFormation stack using the provided template. Complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Provide a stack name, such as transactionaldl-postgresql.
  4. Enter the required parameters:
    1. DMSS3EndpointIAMRoleARN – The IAM role ARN for AWS DMS to write data into Amazon S3.
    2. ReplicationInstanceArn – The AWS DMS replication instance ARN.
    3. S3BucketStage – The name of the existing bucket used for the stage layer of the data lake.
    4. S3BucketGlue – The name of the existing S3 bucket for storing AWS Glue scripts.
    5. S3BucketRaw – The name of the existing bucket used for the raw layer of the data lake.
    6. SourceEndpointArn – The AWS DMS endpoint ARN that you created earlier.
    7. SourceName – The arbitrary identifier of the data source to replicate (for example, postgres). This is used to define the S3 path of the data lake (raw layer) where data will be stored.
  5. Do not modify the following parameters:
    1. SourceS3BucketBlog – The bucket name where the provided AWS Glue script is stored.
    2. SourceS3BucketPrefix – The bucket prefix name where the provided AWS Glue script is stored.
  6. Choose Next twice.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

After approximately 5 minutes, the CloudFormation stack is deployed.

Review the AWS DMS replication task

The AWS CloudFormation deployment created an AWS DMS target endpoint for you. Because of two specific endpoint settings, the data will be ingested as we need it on Amazon S3.

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Search for and choose the endpoint that begins with dmsIcebergs3endpoint.
  3. Review the endpoint settings:
    1. DataFormat is specified as parquet.
    2. TimestampColumnName will add the column last_update_time with the date of creation of the records on Amazon S3.

AWS DMS endpoint settings

The deployment also creates an AWS DMS replication task that begins with dmsicebergtask.

  1. Choose Replication tasks in the navigation pane and search for the task.

You will see that the Task Type is marked as Full load, ongoing replication. AWS DMS will perform an initial full load of existing data, and then create incremental files with changes performed to the source database.

On the Mapping Rules tab, there are two types of rules:

  • A selection rule with the name of the source schema and the tables to ingest from the source database. By default, it uses the sample database provided in the prerequisites, dms_sample, and selects all of its tables with the wildcard %.
  • Two transformation rules that add the schema name and table name as columns to the target files on Amazon S3. Our AWS Glue job uses these columns to determine which table each file in the data lake belongs to.

To learn more about how to customize this for your own data sources, refer to Selection rules and actions.

AWS mapping rules
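To make the shape of these rules concrete, the following is a minimal sketch of a table-mapping document with one selection rule and two transformation rules. It is not the template's actual definition: the rule IDs, rule names, and column length are hypothetical, and the expressions $SCHEMA_NAME_VAR and $TABLE_NAME_VAR are assumed here to be the metadata variables that populate the added columns. The column names schema_name and table_name match the ones the AWS Glue job later drops.

```python
import json

def build_table_mappings(schema="dms_sample", table="%"):
    """Build one selection rule plus two add-column transformation rules."""
    locator = {"schema-name": schema, "table-name": table}

    def add_column(rule_id, column, expression):
        # Transformation rule that appends a string column to every target file
        return {
            "rule-type": "transformation",
            "rule-id": str(rule_id),
            "rule-name": f"add-{column}",  # hypothetical rule name
            "rule-target": "column",
            "object-locator": locator,
            "rule-action": "add-column",
            "value": column,
            "expression": expression,  # assumed metadata variable
            "data-type": {"type": "string", "length": 50},
        }

    return {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "select-tables",  # hypothetical rule name
                "object-locator": locator,
                "rule-action": "include",
            },
            add_column(2, "schema_name", "$SCHEMA_NAME_VAR"),
            add_column(3, "table_name", "$TABLE_NAME_VAR"),
        ]
    }

mappings = build_table_mappings()
print(json.dumps(mappings, indent=2))
```

To restrict ingestion to a single table, you could pass its name instead of the % wildcard.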

Let’s change some configurations to finish our task preparation.

  1. On the Actions menu, choose Modify.
  2. In the Task Settings section, under Stop task after full load completes, choose Stop after applying cached changes.

This way, we can control the initial load and incremental file generation as two different steps. We use this two-step approach to run the AWS Glue job once for each step.

  1. Under Task logs, choose Turn on CloudWatch logs.
  2. Choose Save.
  3. Wait about 1 minute for the database migration task status to show as Ready.

Add permissions for encryption and decryption or Lake Formation

Optionally, you can add permissions for encryption and decryption or Lake Formation.

Add encryption and decryption permissions

If your S3 buckets used for the raw and stage layers are encrypted using AWS Key Management Service (AWS KMS) customer managed keys, you need to add permissions to allow the AWS Glue job to access the data:

Add Lake Formation permissions

If you’re managing permissions using Lake Formation, you need to allow your AWS Glue job to create your domain’s databases and tables through the IAM role GlueJobRole.

  1. Grant permissions to create databases (for instructions, refer to Creating a Database).
  2. Grant SUPER permissions to the default database.
  3. Grant data location permissions.
  4. If you create databases manually, grant permissions on all databases to create tables. Refer to Granting table permissions using the Lake Formation console and the named resource method or Granting Data Catalog permissions using the LF-TBAC method according to your use case.

After you complete the later step of performing the initial data load, make sure to also add permissions for consumers to query the tables. The job role will become the owner of all the tables created, and the data lake admin can then perform grants to additional users.

Review table configuration in Parameter Store

The AWS Glue job that performs the data ingestion into Iceberg tables uses the table specification provided in Parameter Store. Complete the following steps to review the parameters that were configured automatically for you. If needed, modify them according to your own needs.

  1. On the Parameter Store console, choose My parameters in the navigation pane.

The CloudFormation stack created two parameters:

  • iceberg-config for job configurations
  • iceberg-tables for table configuration
  1. Choose the parameter iceberg-tables.

The JSON structure contains information that AWS Glue uses to read data and write the Iceberg tables on the target domain:

  • One object per table – The name of the object is created using the schema name, a period, and the table name; for example, schema.table.
  • primaryKey – This should be specified for every source table. You can provide a single column or a comma-separated list of columns (without spaces).
  • partitionCols – This optionally specifies the partition columns for target tables. If you don’t want to create partitioned tables, provide an empty string. Otherwise, provide a single column or a comma-separated list of columns to be used (without spaces).
  1. If you want to use your own data source, use the following JSON code and replace the text in CAPS from the template provided. If you’re using the sample data source provided, keep the default settings:
{
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": ""
    },
    "SCHEMA_NAME.TABLE_NAME_2": {
        "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO"
    }
}
  1. Choose Save changes.
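Before saving a modified parameter, it can help to check that the JSON parses the way you expect. The following is a minimal sketch of such a check, not the job's own parsing code: the function name parse_table_config and the sample table entries are hypothetical, and it only assumes the key format (schema.table) and the comma-separated fields described above.

```python
import json

def parse_table_config(raw_json):
    """Turn the iceberg-tables JSON into a list of per-table settings."""
    tables = []
    for key, spec in json.loads(raw_json).items():
        schema_name, table_name = key.split(".")
        if not spec.get("primaryKey"):
            raise ValueError(f"{key}: primaryKey is required")
        tables.append({
            "schema": schema_name,
            "table": table_name,
            "domain": spec["domain"],
            "primary_keys": spec["primaryKey"].split(","),
            # An empty string means the target table is not partitioned
            "partition_cols": [c for c in spec.get("partitionCols", "").split(",") if c],
        })
    return tables

# Hypothetical configuration with one unpartitioned and one partitioned table
sample = """
{
    "dms_sample.sporting_event": {"primaryKey": "id", "domain": "sports", "partitionCols": ""},
    "dms_sample.ticket": {"primaryKey": "event_id,seat", "domain": "sports", "partitionCols": "event_id"}
}
"""
tables = parse_table_config(sample)
for t in tables:
    print(t)
```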

Perform initial data loading

Now that the required configuration is finished, we ingest the initial data. This step includes three parts: ingesting the data from the source relational database into the raw layer of the data lake, creating the Iceberg tables on the stage layer of the data lake, and verifying results using Athena.

Ingest data into the raw layer of the data lake

To ingest data from the relational data source (PostgreSQL if you are using the sample provided) to our transactional data lake using Iceberg, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the replication task you created and on the Actions menu, choose Restart/Resume.
  3. Wait about 5 minutes for the replication task to complete. You can monitor the tables ingested on the Statistics tab of the replication task.

AWS DMS full load statistics

After some minutes, the task finishes with the message Full load complete.

  1. On the Amazon S3 console, choose the bucket you defined as the raw layer.

Under the S3 prefix defined on AWS DMS (for example, postgres), you should see a hierarchy of folders with the following structure:

  • Schema
    • Table name
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS full load objects created on S3

If your S3 bucket is empty, review Troubleshooting migration tasks in AWS Database Migration Service before running the AWS Glue job.
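The folder hierarchy above is what lets a job map each file back to its source table. As an illustration only, the following sketch splits an object key into its parts; the function name parse_raw_key is hypothetical, and it assumes a single-level source prefix such as postgres followed by schema, table, and file name.

```python
from pathlib import PurePosixPath

def parse_raw_key(key, source_prefix):
    """Split a raw-layer object key into schema, table, and file name.

    Assumes the layout <source_prefix>/<schema>/<table>/<file>.
    """
    parts = PurePosixPath(key).parts
    if len(parts) != 4 or parts[0] != source_prefix:
        raise ValueError(f"Unexpected raw-layer key: {key}")
    _, schema, table, file_name = parts
    return {
        "schema": schema,
        "table": table,
        "file": file_name,
        # Full-load files are named LOAD<number>.parquet, as listed above
        "is_full_load": file_name.startswith("LOAD"),
    }

info = parse_raw_key("postgres/dms_sample/sporting_event/LOAD00000001.parquet", "postgres")
print(info)
```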

Create and ingest data into Iceberg tables

Before running the job, let’s navigate the script of the AWS Glue job provided as part of the CloudFormation stack to understand its behavior.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Search for the job that starts with IcebergJob- and a suffix of your CloudFormation stack name (for example, IcebergJob-transactionaldl-postgresql).
  3. Choose the job.

AWS Glue ETL job review

The job script gets the configuration it needs from Parameter Store. The function getConfigFromSSM() returns job-related configurations such as the source and target buckets where the data is read from and written to. The variable ssm_param_table_values contains table-related information such as the data domain, table name, partition columns, and primary key of the tables that need to be ingested. See the following Python code:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables"

# Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

The script uses an arbitrary catalog name for Iceberg that is defined as my_catalog. This is implemented on the AWS Glue Data Catalog using Spark configurations, so a SQL operation pointing to my_catalog will be applied on the Data Catalog. See the following code:

catalog_name = 'my_catalog'
errored_table_list = []

# Iceberg configuration
spark = SparkSession.builder \
    .config('spark.sql.warehouse.dir', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()

The script iterates over the tables defined in Parameter Store and performs the logic for detecting if the table exists and if the incoming data is an initial load or an upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values:
    # Get table data
    isTableExists = False
    schemaName, tableName = key.split('.')
    logger.info(f'Processing table : {tableName}')

The initialLoadRecordsSparkSQL() function loads initial data when no operation column is present in the S3 files. AWS DMS adds this column only to Parquet data files produced by the continuous replication (CDC). The data loading is performed using the INSERT INTO command with SparkSQL. See the following code:

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName  ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name, 
    dbName = dbName, 
    tableName = tableName,
    insertTableColumnList = insertTableColumnList[ : -1],
    partitionStrSQL = partitionStrSQL)

logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)
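To see what the template produces, the following standalone sketch renders the statement with hypothetical values. It assumes that insertTableColumnList is a comma-terminated string, which would explain the [ : -1] slice that drops the last character; the column names and the empty partition clause are made up for the example.

```python
from string import Template

sqltemp = Template(
    "INSERT INTO $catalog_name.$dbName.$tableName ($insertTableColumnList) "
    "SELECT $insertTableColumnList FROM insertTable $partitionStrSQL"
)

# Hypothetical inputs; the real job derives these from the data and Parameter Store
columns = ["id", "sport_type_name", "start_date"]
insertTableColumnList = "".join(f"{c}," for c in columns)  # ends with a trailing comma

query = sqltemp.substitute(
    catalog_name="my_catalog",
    dbName="sports",
    tableName="sporting_event",
    insertTableColumnList=insertTableColumnList[:-1],  # drop the trailing comma
    partitionStrSQL="",  # no extra clause for an unpartitioned table
).strip()
print(query)
```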

Now we run the AWS Glue job to ingest the initial data into the Iceberg tables. The CloudFormation stack adds the --datalake-formats parameter, adding the required Iceberg libraries to the job.

  1. Choose Run job.
  2. Choose Job Runs to monitor the status. Wait until the status is Run Succeeded.

Verify the data loaded

To confirm that the job processed the data as expected, complete the following steps:

  1. On the Athena console, choose Query Editor in the navigation pane.
  2. Verify AwsDataCatalog is selected as the data source.
  3. Under Database, choose the data domain that you want to explore, based on the configuration you defined in the parameter store. If using the sample database provided, use sports.

Under Tables and views, we can see the list of tables that were created by the AWS Glue job.

  1. Choose the options menu (three dots) next to the first table name, then choose Preview Data.

You can see the data loaded into Iceberg tables.

Amazon Athena review initial data loaded

Perform incremental data loading

Now we start capturing changes from our relational database and applying them to the transactional data lake. This step is also divided into three parts: capturing the changes, applying them to the Iceberg tables, and verifying the results.

Capture changes from the relational database

Due to the configuration we specified, the replication task stopped after running the full load phase. Now we restart the task to add incremental files with changes into the raw layer of the data lake.

  1. On the AWS DMS console, select the task we created and ran before.
  2. On the Actions menu, choose Resume.
  3. Choose Start task to start capturing changes.
  4. To trigger new file creation on the data lake, perform inserts, updates, or deletes on the tables of your source database using your preferred database administration tool. If using the sample database provided, you could run the following SQL commands:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31';

update  dms_sample.mlb_data 
set bats = 'R'
where mlb_id=506560 and bats='L';

update dms_sample.sporting_event 
set start_date  = current_date 
where id=11 and sold_out=0;
  1. On the AWS DMS task details page, choose the Table statistics tab to see the changes captured.
    AWS DMS CDC statistics
  2. Open the raw layer of the data lake to find a new file holding the incremental changes inside every table’s prefix, for example under the sporting_event prefix.

The record with changes for the sporting_event table looks like the following screenshot.

AWS DMS objects migrated into S3 with CDC

Notice that the Op column at the beginning of the record marks the change as an update (U). Also, the second date/time value is the control column added by AWS DMS with the time the change was captured.

CDC file schema on Amazon S3

Apply changes on the Iceberg tables using AWS Glue

Now we run the AWS Glue job again. Because the job bookmark is enabled, it automatically processes only the new incremental files. Let’s review how it works.

The dedupCDCRecords() function performs deduplication of data because multiple changes to a single record ID could be captured within the same data file on Amazon S3. Deduplication is performed based on the last_update_time column added by AWS DMS that indicates the timestamp of when the change was captured. See the following Python code:

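from pyspark.sql import Window
from pyspark.sql.functions import max as spark_max

def dedupCDCRecords(inputDf, keylist):
    # Window spanning every change captured for the same primary key
    IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    # Tag each row with the latest change timestamp seen for its key
    inputDFWithTS = inputDf.withColumn('max_op_date', spark_max(inputDf.last_update_time).over(IDWindowDF))

    # Keep only the most recent change per key, whether insert, update, or delete
    NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    return finalInputDF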
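The same idea can be illustrated without Spark. The following plain-Python sketch keeps only the most recent change for each key; the function name and sample records are hypothetical. Unlike the window-based function above, which keeps every row tied at the latest timestamp, this version keeps the first row it sees when timestamps tie.

```python
def latest_change_per_key(records, key_columns):
    """Keep the record with the greatest last_update_time for each key."""
    latest = {}
    for rec in records:
        key = tuple(rec[c] for c in key_columns)
        current = latest.get(key)
        if current is None or rec["last_update_time"] > current["last_update_time"]:
            latest[key] = rec
    return list(latest.values())

# Hypothetical CDC rows: id 11 was updated twice within the same file
changes = [
    {"id": 11, "Op": "U", "sold_out": 0, "last_update_time": "2023-01-01 10:00:00"},
    {"id": 11, "Op": "U", "sold_out": 1, "last_update_time": "2023-01-01 10:05:00"},
    {"id": 12, "Op": "I", "sold_out": 0, "last_update_time": "2023-01-01 10:01:00"},
]
deduped = latest_change_per_key(changes, ["id"])
for row in deduped:
    print(row)
```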

On line 99 of the job script, the upsertRecordsSparkSQL() function performs the upsert in a similar fashion to the initial load, but this time with a SQL MERGE command.
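The body of upsertRecordsSparkSQL() is not reproduced in this post, so the following is only a sketch of what a MERGE statement for this kind of upsert could look like, not the job's actual query. The function name build_merge_sql, the source view name updateTable, and the sample columns are hypothetical; the statement deletes rows flagged with Op = 'D', updates other matches, and inserts new keys.

```python
def build_merge_sql(catalog, db, table, key_columns, columns, source_view="updateTable"):
    """Render a MERGE INTO statement that applies CDC rows to a target table."""
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in key_columns)
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in columns)
    col_list = ", ".join(columns)
    val_list = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {catalog}.{db}.{table} t "
        f"USING {source_view} s ON {on_clause} "
        "WHEN MATCHED AND s.Op = 'D' THEN DELETE "
        f"WHEN MATCHED THEN UPDATE SET {set_clause} "
        f"WHEN NOT MATCHED AND s.Op <> 'D' THEN INSERT ({col_list}) VALUES ({val_list})"
    )

merge_sql = build_merge_sql(
    "my_catalog", "sports", "sporting_event",
    key_columns=["id"],
    columns=["id", "start_date", "sold_out"],
)
print(merge_sql)
```

The deduplicated CDC rows would need to be registered as a temporary view under the source view name before running a statement like this with spark.sql().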

Review the applied changes

Open the Athena console and run a query that selects the changed records on the source database. If using the provided sample database, use the following SQL query:

SELECT * FROM "sports"."nfl_stadium_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena review cdc data loaded

Monitor table ingestion

The AWS Glue job script is coded with simple Python exception handling to catch errors while processing a specific table. The job bookmark is saved after each table finishes processing successfully, so that tables that succeeded aren’t reprocessed when the job run is retried for the tables with errors.

The AWS Command Line Interface (AWS CLI) provides a get-job-bookmark command for AWS Glue that provides insight into the status of the bookmark for each table processed.

  1. On the AWS Glue Studio console, choose the ETL job.
  2. Choose the Job Runs tab and copy the job run ID.
  3. Run the following command on a terminal authenticated for the AWS CLI, replacing <GLUE_JOB_RUN_ID> on line 1 with the value you copied. If your CloudFormation stack is not named transactionaldl-postgresql, provide the name of your job on line 2 of the script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name $jobname --run-id $jobrun
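If you prefer Python over the AWS CLI, the same lookup can be sketched with the get_job_bookmark operation of the boto3 AWS Glue client. The helper name get_bookmark_entry and the run ID jr_example are hypothetical, and the stub class exists only so that the example runs without AWS credentials; in practice you would pass boto3.client('glue') instead.

```python
def get_bookmark_entry(glue_client, job_name, run_id):
    """Return the bookmark entry that AWS Glue stores for one job run."""
    response = glue_client.get_job_bookmark(JobName=job_name, RunId=run_id)
    return response["JobBookmarkEntry"]

class StubGlueClient:
    """Offline stand-in that mimics the shape of the real response."""
    def get_job_bookmark(self, JobName, RunId):
        return {"JobBookmarkEntry": {"JobName": JobName, "RunId": RunId, "JobBookmark": "{}"}}

# With boto3 installed and credentials configured, replace the stub with:
#   import boto3
#   glue_client = boto3.client("glue")
glue_client = StubGlueClient()
entry = get_bookmark_entry(glue_client, "IcebergJob-transactionaldl-postgresql", "jr_example")
print(entry)
```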

In this solution, when processing a table raises an exception, the AWS Glue job doesn’t fail immediately. Instead, the table is added to an array that is printed after the job finishes, and the job is marked as failed only after it has tried to process the rest of the tables detected on the raw data source. This way, tables without errors don’t have to wait until the user identifies and solves the problem on the conflicting tables. The user can quickly detect job runs that had issues using the AWS Glue job run status, and identify which specific tables are causing the problem using the CloudWatch logs for the job run.

The job script implements this behavior with the following Python code:
# Performed for every table
        try:
            ...  # Table processing logic
        except Exception as e:
            logger.info(f'There is an issue with table: {tableName}')
            logger.info(f'The exception is : {e}')
            errored_table_list.append(tableName)
            continue
        # Save the bookmark only after the table was processed successfully
        job.commit()
if (len(errored_table_list)):
    logger.info(f'Total number of errored tables are {len(errored_table_list)}')
    logger.info(f"Tables that failed during processing are {', '.join(errored_table_list)}")
    raise Exception('***** Some tables failed to process.')
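The pattern above can be tried outside AWS Glue. The following is a self-contained sketch of the same collect-then-fail approach, with a hypothetical fake_process function standing in for the per-table logic; it leaves out the job.commit() call, which only exists inside a Glue job.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("iceberg_ingest")

def process_tables(table_names, process_table):
    """Process every table, collect failures, and fail once at the end."""
    errored_tables = []
    for table_name in table_names:
        try:
            process_table(table_name)
        except Exception as exc:
            logger.error("There is an issue with table %s: %s", table_name, exc)
            errored_tables.append(table_name)
    if errored_tables:
        raise RuntimeError("Tables that failed during processing: " + ", ".join(errored_tables))

processed = []

def fake_process(table_name):
    # Hypothetical stand-in for the real per-table logic
    if table_name == "mlb_data":
        raise ValueError("simulated failure")
    processed.append(table_name)

try:
    process_tables(["sporting_event", "mlb_data", "nfl_stadium_data_upd"], fake_process)
    failure_message = None
except RuntimeError as err:
    failure_message = str(err)

print(processed)
print(failure_message)
```

The table after the failing one is still processed, and the run as a whole is reported as failed, which is the behavior described above.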

The following screenshot shows how the CloudWatch logs look for tables that cause errors on processing.

AWS Glue job monitoring with logs

Aligned with the AWS Well-Architected Framework Data Analytics Lens practices, you can adopt more sophisticated control mechanisms to identify and notify stakeholders when errors appear on the data pipelines. For example, you can use an Amazon DynamoDB control table to store all tables and job runs with errors, or use Amazon Simple Notification Service (Amazon SNS) to send alerts to operators when certain criteria are met.

Schedule incremental batch data loading

The CloudFormation stack deploys an Amazon EventBridge rule (disabled by default) that can trigger the AWS Glue job to run on a schedule. To provide your own schedule and enable the rule, complete the following steps:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Search for the rule prefixed with the name of your CloudFormation stack followed by JobTrigger (for example, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Choose the rule.
  4. Under Event Schedule, choose Edit.

The default schedule is configured to trigger every hour.

  1. Provide the schedule on which you want the job to run.
  2. Additionally, you can use an EventBridge cron expression by selecting A fine-grained schedule.
    Amazon EventBridge schedule ETL job
  3. When you finish setting up the cron expression, choose Next three times, and finally choose Update Rule to save changes.

The rule is created disabled by default to allow you to run the initial data load first.

  1. Activate the rule by choosing Enable.

You can view rule invocations on the Monitoring tab, or check the runs directly on the AWS Glue Job Run details.
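If you choose a fine-grained schedule, the cron expression has six fields: minutes, hours, day of month, month, day of week, and year, with a question mark in one of the two day fields. As a small illustration, the following sketch builds an expression that fires every n hours; the helper name every_n_hours is hypothetical.

```python
def every_n_hours(n, minute=0):
    """Build an EventBridge cron expression that fires every n hours."""
    if not 1 <= n <= 23:
        raise ValueError("n must be between 1 and 23")
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    # Fields: minutes hours day-of-month month day-of-week year
    return f"cron({minute} 0/{n} * * ? *)"

schedule = every_n_hours(2)
print(schedule)
```

Here 0/2 in the hours field means every second hour starting at hour 0, so the job would run at minute 0 of hours 0, 2, 4, and so on (UTC).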

Conclusion

After deploying this solution, you have automated the ingestion of your tables from a single relational data source. Organizations using a data lake as their central data platform usually need to handle multiple data sources, sometimes even tens of them. Also, more and more use cases require organizations to implement transactional capabilities in the data lake. You can use this solution to accelerate the adoption of such capabilities across all your relational data sources to enable new business use cases, automating the implementation process to derive more value from your data.


About the Authors

Luis Gerardo Baeza is a Big Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience helping organizations in the healthcare, financial and education sectors to adopt enterprise architecture programs, cloud computing, and data analytics capabilities. Luis currently helps organizations across Latin America to accelerate strategic data initiatives.

SaiKiran Reddy Aenugu is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 10 years of experience implementing data loading, transformation, and visualization processes. SaiKiran currently helps organizations in North America to adopt modern data architectures such as data lakes and data mesh. He has experience in the retail, airline, and finance sectors.

Narendra Merla is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience in designing and productionalizing both real-time and batch-oriented data pipelines and building data lakes on both cloud and on-premises environments. Narendra currently helps organizations in North America to build and design robust data architectures, and has experience in the telecom and finance sectors.

Build a data lake with Apache Flink on Amazon EMR

Post Syndicated from Jianwei Li original https://aws.amazon.com/blogs/big-data/build-a-unified-data-lake-with-apache-flink-on-amazon-emr/

To build a data-driven business, it is important to democratize enterprise data assets in a data catalog. With a unified data catalog, you can quickly search datasets and figure out data schema, data format, and location. The AWS Glue Data Catalog provides a uniform repository where disparate systems can store and find metadata to keep track of data in data silos.

Apache Flink is a widely used data processing engine for scalable streaming ETL, analytics, and event-driven applications. It provides precise time and state management with fault tolerance. Flink can process bounded streams (batch) and unbounded streams (streaming) with a unified API or application. After data is processed with Apache Flink, downstream applications can access the curated data with a unified data catalog. With unified metadata, both data processing and data consuming applications can access the tables using the same metadata.

This post shows you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog so that you can ingest streaming data in real time and access the data in near-real time for business analysis.

Apache Flink connector and catalog architecture

Apache Flink uses a connector and catalog to interact with data and metadata. The following diagram shows the architecture of the Apache Flink connector for data read/write, and catalog for metadata read/write.

Flink Glue Architecture

For data read/write, Flink has the interface DynamicTableSourceFactory for read and DynamicTableSinkFactory for write. Different Flink connectors implement these two interfaces to access data in different storage systems. For example, the Flink FileSystem connector has FileSystemTableFactory to read/write data in Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (Amazon S3), the Flink HBase connector has HBase2DynamicTableFactory to read/write data in HBase, and the Flink Kafka connector has KafkaDynamicTableFactory to read/write data in Kafka. You can refer to Table & SQL Connectors for more information.

For metadata read/write, Flink has the catalog interface. Flink has three built-in implementations for the catalog. GenericInMemoryCatalog stores the catalog data in memory. JdbcCatalog stores the catalog data in a JDBC-supported relational database. As of this writing, MySQL and PostgreSQL databases are supported in the JDBC catalog. HiveCatalog stores the catalog data in Hive Metastore. HiveCatalog uses HiveShim to provide different Hive version compatibility. We can configure different metastore clients to use Hive Metastore or the AWS Glue Data Catalog. In this post, we configure the Amazon EMR property hive.metastore.client.factory.class to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory (see Using the AWS Glue Data Catalog as the metastore for Hive) so that we can use the AWS Glue Data Catalog to store Flink catalog data. Refer to Catalogs for more information.
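The hive.metastore.client.factory.class property is typically supplied to Amazon EMR through a configuration classification when the cluster is created. The following sketch builds that configuration as JSON; the function name is hypothetical, while the hive-site classification and the property value are the ones named above.

```python
import json

GLUE_FACTORY = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

def glue_metastore_configuration():
    """Return the EMR configuration that points Hive at the Glue Data Catalog."""
    return [
        {
            "Classification": "hive-site",
            "Properties": {"hive.metastore.client.factory.class": GLUE_FACTORY},
        }
    ]

configuration = glue_metastore_configuration()
print(json.dumps(configuration, indent=2))
```

The printed JSON can be pasted into the software settings of the cluster or passed as the configurations argument when creating the cluster programmatically.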

Most Flink built-in connectors, such as for Kafka, Amazon Kinesis, Amazon DynamoDB, Elasticsearch, or FileSystem, can use Flink HiveCatalog to store metadata in the AWS Glue Data Catalog. However, some connector implementations such as Apache Iceberg have their own catalog management mechanism. FlinkCatalog in Iceberg implements the catalog interface in Flink. FlinkCatalog in Iceberg has a wrapper to its own catalog implementation. The following diagram shows the relationship between Apache Flink, the Iceberg connector, and the catalog. For more information, refer to Creating catalogs and using catalogs and Catalogs.

Flink Iceberg Glue Architecture
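To give a feel for how an Iceberg catalog backed by the AWS Glue Data Catalog is declared in Flink SQL, the following sketch renders a CREATE CATALOG statement from a dictionary of options. The helper name, the catalog name glue_catalog_for_iceberg, and the warehouse path are placeholders chosen for this example; the statement is only printed here and would be run in the Flink SQL client.

```python
def render_create_catalog(name, options):
    """Render a Flink SQL CREATE CATALOG statement from a dict of options."""
    body = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE CATALOG {name} WITH (\n{body}\n);"

iceberg_ddl = render_create_catalog(
    "glue_catalog_for_iceberg",  # placeholder catalog name
    {
        "type": "iceberg",
        "warehouse": "s3://DOC-EXAMPLE-BUCKET/iceberg/warehouse/",  # placeholder path
        "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    },
)
print(iceberg_ddl)
```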

Apache Hudi also has its own catalog management. Both HoodieCatalog and HoodieHiveCatalog implement the catalog interface in Flink. HoodieCatalog stores metadata in a file system such as HDFS. HoodieHiveCatalog stores metadata in Hive Metastore or the AWS Glue Data Catalog, depending on whether you configure hive.metastore.client.factory.class to use com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory. The following diagram shows the relationship between Apache Flink, the Hudi connector, and the catalog. For more information, refer to Create Catalog.

Flink Hudi Glue Architecture

Because Iceberg and Hudi have different catalog management mechanisms, we show three scenarios of Flink integration with the AWS Glue Data Catalog in this post:

  • Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog
  • Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog
  • Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

Solution overview

The following diagram shows the overall architecture of the solution described in this post.

Flink Glue Integration

In this solution, we enable an Amazon RDS for MySQL binlog to extract transaction changes in real time. The Amazon EMR Flink CDC connector reads the binlog data and processes the data. Transformed data can be stored in Amazon S3. We use the AWS Glue Data Catalog to store the metadata such as table schema and table location. Downstream data consumer applications such as Amazon Athena or Amazon EMR Trino access the data for business analysis.

The following are the high-level steps to set up this solution:

  1. Enable binlog for Amazon RDS for MySQL and initialize the database.
  2. Create an EMR cluster with the AWS Glue Data Catalog.
  3. Ingest change data capture (CDC) data with Apache Flink CDC in Amazon EMR.
  4. Store the processed data in Amazon S3 with metadata in the AWS Glue Data Catalog.
  5. Verify all table metadata is stored in the AWS Glue Data Catalog.
  6. Consume data with Athena or Amazon EMR Trino for business analysis.
  7. Update and delete source records in Amazon RDS for MySQL and validate that the changes are reflected in the data lake tables.

Prerequisites

This post uses an AWS Identity and Access Management (IAM) role with permissions for the following services:

  • Amazon RDS for MySQL (5.7.40)
  • Amazon EMR (6.9.0)
  • Amazon Athena
  • AWS Glue Data Catalog
  • Amazon S3

Enable binlog for Amazon RDS for MySQL and initialize the database

To enable CDC in Amazon RDS for MySQL, we need to configure binary logging for Amazon RDS for MySQL. Refer to Configuring MySQL binary logging for more information. We also create the database salesdb in MySQL and create the tables customer, order, and others to set up the data source.

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Create a new parameter group for MySQL.
  3. Edit the parameter group you just created to set binlog_format=ROW.

RDS-Binlog-Format

  1. Edit the parameter group you just created to set binlog_row_image=full.

RDS-Binlog-Row-Image

  1. Create an RDS for MySQL DB instance with the parameter group.
  2. Note down the values for hostname, username, and password, which we use later.
  3. Download the MySQL database initialization script from Amazon S3 by running the following command:
aws s3 cp s3://emr-workshops-us-west-2/glue_immersion_day/scripts/salesdb.sql ./salesdb.sql
  1. Connect to the RDS for MySQL database and run the salesdb.sql script to initialize the database, providing the host name and user name according to your RDS for MySQL database configuration:
mysql -h <hostname> -u <username> -p
mysql> source salesdb.sql
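The two parameter group settings from the steps above can also be applied programmatically. The following sketch builds the parameter list in the shape expected by the modify_db_parameter_group operation of the boto3 Amazon RDS client. The helper name and the choice of pending-reboot as the apply method are assumptions for this example, and the API call itself is left as a comment so the snippet runs without credentials.

```python
def binlog_parameters(apply_method="pending-reboot"):
    """Return the two binary logging settings used for CDC in this post."""
    return [
        {"ParameterName": "binlog_format", "ParameterValue": "ROW", "ApplyMethod": apply_method},
        {"ParameterName": "binlog_row_image", "ParameterValue": "full", "ApplyMethod": apply_method},
    ]

parameters = binlog_parameters()
for parameter in parameters:
    print(parameter)

# With boto3 installed and credentials configured, the list could be applied with:
#   import boto3
#   boto3.client("rds").modify_db_parameter_group(
#       DBParameterGroupName="<your-parameter-group>", Parameters=parameters)
```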

Create an EMR cluster with the AWS Glue Data Catalog

Starting with Amazon EMR 6.9.0, the Flink Table API/SQL can integrate with the AWS Glue Data Catalog. To use the Flink and AWS Glue integration, you must create a cluster that runs Amazon EMR 6.9.0 or later.

  1. Create the file iceberg.properties for the Amazon EMR Trino integration with the Data Catalog. When the table format is Iceberg, your file should have the following content:
iceberg.catalog.type=glue
connector.name=iceberg
  1. Upload iceberg.properties to an S3 bucket, for example DOC-EXAMPLE-BUCKET.

For more information on how to integrate Amazon EMR Trino with Iceberg, refer to Use an Iceberg cluster with Trino.

  1. Create the file trino-glue-catalog-setup.sh to configure the Trino integration with the Data Catalog. Use trino-glue-catalog-setup.sh as the bootstrap script. Your file should have the following content (replace DOC-EXAMPLE-BUCKET with your S3 bucket name):
set -ex 
sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties

  1. Upload trino-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Create bootstrap actions to install additional software to run a bootstrap script.

  1. Create the file flink-glue-catalog-setup.sh to configure the Flink integration with the Data Catalog.
  2. Use a script runner to run the flink-glue-catalog-setup.sh script as an Amazon EMR step.

Your file should have the following content. The JAR file names shown here are for Amazon EMR 6.9.0; they may differ in later releases, so update them to match your Amazon EMR version.

Note that here we use an Amazon EMR step, not a bootstrap, to run this script. An Amazon EMR step script is run after Amazon EMR Flink is provisioned.

set -ex

# Copy the AWS Glue Data Catalog client and the Hive dependencies into the Flink library directory
sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/hive-exec.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /usr/lib/flink/lib
sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /usr/lib/flink/lib
sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar
sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar
sudo chmod 755 /usr/lib/flink/lib/hive-exec.jar
sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

# Download the Flink CDC connector for MySQL
sudo wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar -O /usr/lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
sudo chmod 755 /usr/lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar

# Link the Iceberg and Hudi Flink runtime JARs into the Flink library directory
sudo ln -s /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar /usr/lib/flink/lib/
sudo ln -s /usr/lib/hudi/hudi-flink-bundle.jar /usr/lib/flink/lib/

# Swap the planner loader for the full table planner, which the Hive dialect used later in this post requires
sudo mv /usr/lib/flink/opt/flink-table-planner_2.12-1.15.2.jar /usr/lib/flink/lib/
sudo mv /usr/lib/flink/lib/flink-table-planner-loader-1.15.2.jar /usr/lib/flink/opt/
  1. Upload flink-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Configuring Flink to Hive Metastore in Amazon EMR for more information on how to configure Flink and Hive Metastore. Refer to Run commands and scripts on an Amazon EMR cluster for more details on running the Amazon EMR step script.
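
Because the setup script runs as a step after provisioning, a quick check on the primary node can confirm that the JARs landed where Flink expects them. The following is a minimal sketch under stated assumptions: check_flink_lib is a hypothetical helper, the JAR names are the Amazon EMR 6.9.0 names used in the script above, and the default directory is /usr/lib/flink/lib.

```shell
# Hypothetical helper: reports JARs that flink-glue-catalog-setup.sh should
# have placed in the Flink library directory but that are not there, and
# flags the planner loader if it was not moved out. Prints nothing when the
# directory looks as expected.
check_flink_lib() (
  lib_dir="${1:-/usr/lib/flink/lib}"
  for jar in \
    aws-glue-datacatalog-hive3-client.jar \
    antlr-runtime-3.5.2.jar \
    hive-exec.jar \
    libfb303-0.9.3.jar \
    flink-connector-hive_2.12-1.15.2.jar \
    flink-sql-connector-mysql-cdc-2.2.1.jar \
    iceberg-flink-runtime.jar \
    hudi-flink-bundle.jar \
    flink-table-planner_2.12-1.15.2.jar
  do
    # -e follows symlinks, so a dangling link is reported as missing too.
    if [ ! -e "$lib_dir/$jar" ]; then
      echo "missing: $jar"
    fi
  done
  if [ -e "$lib_dir/flink-table-planner-loader-1.15.2.jar" ]; then
    echo "unexpected: flink-table-planner-loader-1.15.2.jar"
  fi
)

# Usage on the EMR primary node:
#   check_flink_lib
```

If you run a later Amazon EMR release, adjust the version numbers in the list to match the JAR names on your cluster.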

  1. Create an EMR 6.9.0 cluster with the applications Hive, Flink, and Trino.

You can create an EMR cluster with the AWS Command Line Interface (AWS CLI) or the AWS Management Console. Refer to the appropriate subsection for instructions.

Create an EMR cluster with the AWS CLI

To use the AWS CLI, complete the following steps:

  1. Create the file emr-flink-trino-glue.json to configure Amazon EMR to use the Data Catalog. Your file should have the following content:
[
{
"Classification": "hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
},
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]
  1. Run the following command to create the EMR cluster. Provide your local emr-flink-trino-glue.json parent folder path, S3 bucket, EMR cluster Region, EC2 key name, and S3 bucket for EMR logs.
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive Name=Flink Name=Spark Name=Trino \
--region us-west-2 \
--name flink-trino-glue-emr69 \
--configurations "file:///<your configuration path>/emr-flink-trino-glue.json" \
--bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET/trino-glue-catalog-setup.sh","Name":"Add iceberg.properties for Trino"}]' \
--steps '[{"Args":["s3://DOC-EXAMPLE-BUCKET/flink-glue-catalog-setup.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Flink-glue-integration"}]' \
--instance-groups \
InstanceGroupType=MASTER,InstanceType=m6g.2xlarge,InstanceCount=1 \
InstanceGroupType=CORE,InstanceType=m6g.2xlarge,InstanceCount=2 \
--use-default-roles \
--ebs-root-volume-size 30 \
--ec2-attributes KeyName=<keyname> \
--log-uri s3://<s3-bucket-for-emr>/elasticmapreduce/
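
The step in the preceding command uses ActionOnFailure CONTINUE, so the cluster keeps running even if the Flink setup script fails. The following is a minimal sketch for checking the outcome, assuming the AWS CLI is configured on your machine; show_emr_step_states is a hypothetical helper, and the cluster ID is the value that create-cluster prints.

```shell
# Hypothetical helper: waits until the cluster is running, then prints the
# name and state of each step (for example, Flink-glue-integration).
#   $1: cluster ID printed by `aws emr create-cluster`
#   $2: Region (defaults to us-west-2, the Region used in this post)
show_emr_step_states() {
  cluster_id="$1"
  region="${2:-us-west-2}"
  aws emr wait cluster-running --cluster-id "$cluster_id" --region "$region" &&
    aws emr list-steps --cluster-id "$cluster_id" --region "$region" \
      --query 'Steps[].[Name,Status.State]' --output text
}

# Usage:
#   show_emr_step_states <cluster-id>
```

A step state of COMPLETED indicates that the script ran to the end; a FAILED state means the Flink library directory should be checked before continuing.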

Create an EMR cluster on the console

To use the console, complete the following steps:

  1. On the Amazon EMR console, create an EMR cluster and select Use for Hive table metadata for AWS Glue Data Catalog settings.
  2. Add configuration settings with the following code:
[
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]

EMR-6.9-Flink-Hive-Glue-1

  1. In the Steps section, add a step called Custom JAR.
  2. Set JAR location to s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar, where <region> is the region in which your EMR cluster resides.
  3. Set Arguments to the S3 path of the flink-glue-catalog-setup.sh script you uploaded earlier.

EMR-6.9-Flink-Hive-Glue-2

  1. In the Bootstrap Actions section, choose Custom Action.
  2. Set Script location to the S3 path of the trino-glue-catalog-setup.sh script you uploaded earlier.

EMR-6.9-Flink-Hive-Glue-3

  1. Continue the subsequent steps to complete your EMR cluster creation.

Ingest CDC data with Apache Flink CDC in Amazon EMR

The Flink CDC connector supports reading a database snapshot and then capturing ongoing changes to the configured tables. We deployed the Flink CDC connector for MySQL by downloading flink-sql-connector-mysql-cdc-2.2.1.jar and placing it in the Flink library directory when we created our EMR cluster. The Flink CDC connector can use the Flink Hive catalog to store Flink CDC table schemas in Hive Metastore or the AWS Glue Data Catalog. In this post, we use the Data Catalog to store our Flink CDC tables.

Complete the following steps to ingest RDS for MySQL databases and tables with Flink CDC and store metadata in the Data Catalog:

  1. SSH to the EMR primary node.
  2. Start Flink on a YARN session by running the following command, providing your S3 bucket name:
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 \
-D state.backend=rocksdb \
-D state.backend.incremental=true \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=s3://<flink-glue-integration-bucket>/flink-checkpoints/ \
-D state.checkpoints.num-retained=10 \
-D execution.checkpointing.interval=10s \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D execution.checkpointing.max-concurrent-checkpoints=1
  1. Start the Flink SQL client CLI by running the following command:
/usr/lib/flink/bin/sql-client.sh embedded
  1. Create the Flink Hive catalog by specifying the catalog type as hive and providing your S3 bucket name:
CREATE CATALOG glue_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri'= 's3://<flink-glue-integration-bucket>/flink-glue-for-hive/warehouse/');
use flink_cdc_db;

Because we configured the EMR Hive catalog to use the AWS Glue Data Catalog, all the databases and tables created in the Flink Hive catalog are stored in the Data Catalog.

  1. Create the Flink CDC table, providing the host name, user name, and password for the RDS for MySQL instance you created earlier.

Note that because the RDS for MySQL user name and password are stored in the Data Catalog as table properties, you should enable AWS Glue database and table authorization with AWS Lake Formation to protect your sensitive data.

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_cdc` (
`CUST_ID` double NOT NULL,
`NAME` STRING NOT NULL,
`MKTSEGMENT` STRING NOT NULL,
PRIMARY KEY (`CUST_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` (
`SITE_ID` double NOT NULL,
`CUST_ID` double NOT NULL,
`ADDRESS` STRING NOT NULL,
`CITY` STRING NOT NULL,
`STATE` STRING NOT NULL,
`COUNTRY` STRING NOT NULL,
`PHONE` STRING NOT NULL,
PRIMARY KEY (`SITE_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER_SITE'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` (
`ORDER_ID` int NOT NULL,
`SITE_ID` double NOT NULL,
`ORDER_DATE` TIMESTAMP NOT NULL,
`SHIP_MODE` STRING NOT NULL
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'SALES_ORDER_ALL',
'scan.incremental.snapshot.enabled' = 'FALSE'
);
  1. Query the tables you just created:
SELECT count(O.ORDER_ID) AS ORDER_COUNT,
C.CUST_ID,
C.NAME,
C.MKTSEGMENT
FROM   customer_cdc C
JOIN customer_site_cdc CS
ON C.CUST_ID = CS.CUST_ID
JOIN sales_order_all_cdc O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT;

You will get a query result like the following screenshot.

Flink-SQL-CDC-Test
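
Because the connector options of a Flink CDC table, including the user name and password, are stored as table properties, it can be useful to see exactly what ended up in the Data Catalog. The following is a minimal sketch rather than part of the original walkthrough: show_glue_table_parameters is a hypothetical helper, and it assumes the AWS CLI is configured with permission to read the table from AWS Glue. The exact property names depend on the Flink version, so the sketch prints them all without assuming any key.

```shell
# Hypothetical helper: prints the table properties that the Data Catalog
# stores for a table, as JSON.
#   $1: database name, $2: table name
show_glue_table_parameters() {
  aws glue get-table --database-name "$1" --name "$2" \
    --query 'Table.Parameters' --output json
}

# Usage:
#   show_glue_table_parameters flink_cdc_db customer_cdc
```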

Store processed data in Amazon S3 with metadata in the Data Catalog

Because we’re ingesting relational data from Amazon RDS for MySQL, source rows may be updated or deleted. To support updates and deletes, we can store the processed data in a data lake table format such as Apache Iceberg or Apache Hudi. As we mentioned earlier, Iceberg and Hudi manage their catalogs differently, so we show both scenarios: using Flink to read and write Iceberg and Hudi tables with metadata in the AWS Glue Data Catalog.

For formats other than Iceberg and Hudi, we use a Parquet table on the file system to show how the Flink built-in connector uses the Data Catalog.

Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Iceberg

  1. Create a Flink Iceberg catalog using the Data Catalog by specifying catalog-impl as org.apache.iceberg.aws.glue.GlueCatalog.

For more information about Flink and Data Catalog integration for Iceberg, refer to Glue Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_iceberg WITH (
'type'='iceberg',
'warehouse'='s3://<flink-glue-integration-bucket>/flink-glue-for-iceberg/warehouse/',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager',
'lock.table'='FlinkGlue4IcebergLockTable' );
  1. Create an Iceberg table to store processed data:
USE CATALOG glue_catalog_for_iceberg;
CREATE DATABASE IF NOT EXISTS flink_glue_iceberg_db;
USE flink_glue_iceberg_db;
CREATE TABLE `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'format-version'='2',
'write.upsert.enabled'='true');
  1. Insert the processed data into Iceberg:
INSERT INTO `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Hudi

Complete the following steps:

  1. Create a catalog for Hudi to use the Hive catalog by specifying mode as hms.

Because we already configured Amazon EMR to use the Data Catalog when we created the EMR cluster, this Hudi Hive catalog uses the Data Catalog under the hood. For more information about Flink and Data Catalog integration for Hudi, refer to Create Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_hudi WITH (
'type' = 'hudi',
'mode' = 'hms',
'table.external' = 'true',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf.dist',
'catalog.path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/'
);
  1. Create a Hudi table using the Data Catalog, and provide your S3 bucket name:
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
use flink_glue_hudi_db;
CREATE TABLE `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'connector' = 'hudi',
'write.tasks' = '4',
'path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/customer_summary',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);
  1. Insert the processed data into Hudi:
INSERT INTO `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Parquet

We already created the Flink Hive catalog in the previous step, so we’ll reuse that catalog.

  1. In the Flink SQL client CLI, run the following command:
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_hive_parquet_db;
use flink_hive_parquet_db;

We change the SQL dialect to Hive to create a table with Hive syntax.

  1. Create a table with the following SQL, and provide your S3 bucket name:
SET table.sql-dialect=hive;

CREATE TABLE `customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT
)
STORED AS parquet
LOCATION 's3://<flink-glue-integration-bucket>/flink-glue-for-hive-parquet/warehouse/customer_summary';

Because the Parquet table is append-only and doesn’t support row-level updates, it can’t consume the CDC changelog directly. However, it can consume data from the Iceberg or Hudi table.

  1. Use the following code to query the Iceberg table and insert data into the Parquet table:
SET table.sql-dialect=default;
SET execution.runtime-mode = batch;
INSERT INTO `glue_catalog`.`flink_hive_parquet_db`.`customer_summary`
SELECT * from `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`;

Verify all table metadata is stored in the Data Catalog

You can navigate to the AWS Glue console to verify all the tables are stored in the Data Catalog.

  1. On the AWS Glue console, choose Databases in the navigation pane to list all the databases we created.

Glue-Databases

  1. Open a database and verify that all the tables are in that database.

Glue-Tables
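
The same verification can be done from the command line. The following is a minimal sketch, assuming the AWS CLI is configured with read access to AWS Glue; list_glue_tables is a hypothetical helper, and the database names in the usage line are the ones created in this post.

```shell
# Hypothetical helper: for each database name passed as an argument, prints a
# header line followed by the table names registered in the Data Catalog.
list_glue_tables() {
  for db in "$@"; do
    echo "== $db"
    aws glue get-tables --database-name "$db" \
      --query 'TableList[].Name' --output text
  done
}

# Usage:
#   list_glue_tables flink_cdc_db flink_glue_iceberg_db flink_glue_hudi_db flink_hive_parquet_db
```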

Consume data with Athena or Amazon EMR Trino for business analysis

You can use Athena or Amazon EMR Trino to access the result data.

Query the data with Athena

To access the data with Athena, complete the following steps:

  1. Open the Athena query editor.
  2. Choose flink_glue_iceberg_db for Database.

You should see the customer_summary table listed.

  1. Run the following SQL script to query the Iceberg result table:
select * from customer_summary order by order_count desc limit 10

The query result will look like the following screenshot.

Athena-Iceberg-Query

  1. For the Hudi table, change Database to flink_glue_hudi_db and run the same SQL query.

Athena-Hudi-Query

  1. For the Parquet table, change Database to flink_hive_parquet_db and run the same SQL query.

Athena-Parquet-Query

Query the data with Amazon EMR Trino

To access Iceberg with Amazon EMR Trino, SSH to the EMR primary node.

  1. Run the following command to start the Trino CLI:
trino-cli --catalog iceberg

Amazon EMR Trino can now query the tables in the AWS Glue Data Catalog.

  1. Run the following command to query the result table:
show schemas;
use flink_glue_iceberg_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

The query result looks like the following screenshot.

EMR-Trino-Iceberg-Query

  1. Exit the Trino CLI.
  2. Start the Trino CLI with the hive catalog to query the Hudi table:
trino-cli --catalog hive
  1. Run the following command to query the Hudi table:
show schemas;
use flink_glue_hudi_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables

We can update and delete some records in the RDS for MySQL database and then validate that the changes are reflected in the Iceberg and Hudi tables.

  1. Connect to the RDS for MySQL database and run the following SQL:
update CUSTOMER set NAME = 'updated_name' where CUST_ID=7;

delete from CUSTOMER where CUST_ID=11;
  1. Query the customer_summary table with Athena or Amazon EMR Trino.

The updated and deleted records are reflected in the Iceberg and Hudi tables.

Athena-Iceberg-Query-Updated
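
The two changes can also be checked without reading the result by eye. The following is a minimal sketch under stated assumptions: check_cdc_changes is a hypothetical helper that reads customer_id,name rows in CSV form on stdin, such as the output of the Trino CLI with --output-format CSV. Because customer_summary is built from a join with orders, a missing row for CUST_ID 7 can also mean that the customer has no orders.

```shell
# Hypothetical helper: reads "customer_id,name" CSV rows on stdin and checks
# the update (CUST_ID 7) and the delete (CUST_ID 11) made in the previous step.
check_cdc_changes() {
  # Strip double quotes so that quoted and unquoted CSV are both accepted.
  tr -d '"' | (
    name_of_7=""
    found_11="no"
    while IFS=, read -r id name; do
      case "$id" in
        7)  name_of_7="$name" ;;
        11) found_11="yes" ;;
      esac
    done
    status=0
    if [ "$name_of_7" != "updated_name" ]; then
      echo "CUST_ID 7: expected updated_name, got '${name_of_7:-no row}'"
      status=1
    fi
    if [ "$found_11" = "yes" ]; then
      echo "CUST_ID 11: row still present"
      status=1
    fi
    if [ "$status" -eq 0 ]; then
      echo "OK"
    fi
    exit "$status"
  )
}

# Usage on the EMR primary node:
#   trino-cli --catalog iceberg --schema flink_glue_iceberg_db --output-format CSV \
#     --execute "select customer_id, name from customer_summary where customer_id in (7, 11)" \
#     | check_cdc_changes
```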

Clean up

When you’re done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. Delete the RDS for MySQL database.
  2. Delete the EMR cluster.
  3. Drop the databases and tables created in the Data Catalog.
  4. Remove files in Amazon S3.
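
The clean-up steps above can be sketched with the AWS CLI as follows. This is a minimal sketch, not a definitive script: cleanup_resources is a hypothetical helper, the DB instance identifier, cluster ID, and bucket name are values you supply, and the database names and S3 prefixes are the ones used in this post. By default it only prints the commands; it deletes nothing unless the fourth argument is run. The Flink checkpoint directory is not included, so remove it separately using the prefix you chose when starting the YARN session.

```shell
# Hypothetical helper: prints (or, with "run", executes) the AWS CLI commands
# for the clean-up steps.
#   $1: RDS DB instance identifier   $2: EMR cluster ID
#   $3: S3 bucket used for the warehouses   $4: "run" to execute (default: print only)
cleanup_resources() (
  db_instance="$1"
  cluster_id="$2"
  bucket="$3"
  mode="${4:-dry-run}"

  # Either execute the command or print it, depending on the mode.
  run() {
    if [ "$mode" = "run" ]; then "$@"; else echo "$*"; fi
  }

  run aws rds delete-db-instance --db-instance-identifier "$db_instance" --skip-final-snapshot
  run aws emr terminate-clusters --cluster-ids "$cluster_id"
  for db in flink_cdc_db flink_glue_iceberg_db flink_glue_hudi_db flink_hive_parquet_db; do
    run aws glue delete-database --name "$db"
  done
  for prefix in flink-glue-for-hive flink-glue-for-iceberg flink-glue-for-hudi flink-glue-for-hive-parquet; do
    run aws s3 rm "s3://$bucket/$prefix/" --recursive
  done
)

# Usage (prints the commands for review):
#   cleanup_resources <db-instance-identifier> <cluster-id> <flink-glue-integration-bucket>
```

Printing first is a deliberate choice here, because every command in the list is destructive.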

Conclusion

This post showed you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog. You can use Flink SQL connectors to read and write data in different stores, such as Kafka, CDC, HBase, Amazon S3, Iceberg, or Hudi, and store the metadata in the Data Catalog. The Flink Table API has the same connector and catalog implementation mechanism. In a single session, you can use multiple catalog instances pointing to different types, like IcebergCatalog and HiveCatalog, and use them interchangeably in your queries. You can also write code with the Flink Table API to develop the same solution to integrate Flink and the Data Catalog.

In our solution, we consumed the RDS for MySQL binary log directly with Flink CDC. You can also use Amazon MSK Connect to consume the binary log with the Debezium connector for MySQL and store the data in Amazon Managed Streaming for Apache Kafka (Amazon MSK). Refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi for more information.

With the unified batch and streaming data processing of Flink on Amazon EMR, you can ingest and process data with one computing engine. With Apache Iceberg and Hudi integrated in Amazon EMR, you can build an evolvable and scalable data lake. With the AWS Glue Data Catalog, you can manage all enterprise data catalogs in a unified manner and consume data easily.

Follow the steps in this post to build your unified batch and streaming solution with Amazon EMR Flink and the AWS Glue Data Catalog. Please leave a comment if you have any questions.


About the Authors

Jianwei Li is a Senior Analytics Specialist TAM. He provides consulting services to AWS enterprise support customers to help them design and build modern data platforms.


Samrat Deb is a Software Development Engineer at Amazon EMR. In his spare time, he loves exploring new places, different cultures, and food.


Prabhu Josephraj is a Senior Software Development Engineer working for Amazon EMR. He is focused on leading the team that builds solutions in Apache Hadoop and Apache Flink. In his spare time, Prabhu enjoys spending time with his family.