Tag Archives: Apache Iceberg

Incremental Processing using Netflix Maestro and Apache Iceberg

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/incremental-processing-using-netflix-maestro-and-apache-iceberg-b8ba072ddeeb

by Jun He, Yingyi Zhang, and Pawan Dixit

Incremental processing is an approach to processing new or changed data in workflows. Its key advantage is that it processes only the data that is newly added or updated in a dataset, instead of re-processing the complete dataset. This not only reduces compute costs but also significantly shortens execution time. When workflow execution has a shorter duration, the chances of failure and manual intervention decrease. Incremental processing also improves engineering productivity by simplifying existing pipelines and unlocking new patterns.

In this blog post, we talk about the landscape of and the challenges in workflows at Netflix. We show how we are building a clean and efficient incremental processing solution (IPS) using Netflix Maestro and Apache Iceberg. IPS provides incremental processing support with data accuracy, data freshness, and backfill for users, and addresses many of these workflow challenges. It enables users to continue using their existing data processing patterns with minimal changes.

Introduction

Netflix relies on data to power its business in all phases. Whether in analyzing A/B tests, optimizing studio production, training algorithms, investing in content acquisition, detecting security breaches, or optimizing payments, well-structured and accurate data is foundational. As our business scales globally, the demand for data is growing and the need for scalable, low-latency incremental processing has begun to emerge. There are three common issues that dataset owners usually face.

  • Data Freshness: Large datasets from Iceberg tables need to be processed quickly and accurately to generate insights that enable faster product decisions. The hourly processing semantics along with the valid-through-timestamp watermark or data signals provided by the Data Platform toolset today satisfy many use cases, but are not the best fit for low-latency batch processing. Before IPS, the Data Platform did not have a solution for tracking the state and progression of data sets as a single, easy-to-use offering. This has led to a few internal solutions such as Psyberg. These internal libraries process data by capturing the changed partitions, which works only for specific use cases. Additionally, the libraries are tightly coupled to the user business logic, which often incurs higher migration and maintenance costs and requires heavy coordination with the Data Platform team.
  • Data Accuracy: Late arriving data causes datasets processed in the past to become incomplete and, as a result, inaccurate. To compensate for that, ETL workflows often use a lookback window and reprocess the data within that time window. For example, a job would reprocess aggregates for the past 3 days because it assumes that there will be late arriving data, but data older than 3 days isn't worth the cost of reprocessing.
  • Backfill: Backfilling datasets is a common operation in big data processing. It requires repopulating data for a historical time period that precedes the scheduled processing. The need for backfilling could be due to a variety of factors, e.g. (1) upstream data sets got repopulated due to changes in the business logic of their data pipelines, (2) business logic was changed in a data pipeline, (3) a new metric was created that needs to be populated for historical time ranges, (4) historical data was found missing, etc.

These challenges are currently addressed by individual teams in suboptimal and less cost-efficient ways, such as:

  • Lookback: This is a generic and simple approach that data engineers use to solve the data accuracy problem. Users configure the workflow to read the data in a window (e.g. past 3 hours or 10 days). The window is set based on users’ domain knowledge so that users have high confidence that the late arriving data will be included or will not matter (i.e. data arrives too late to be useful). It ensures correctness, but at a high cost in terms of time and compute resources.
  • Foreach pattern: Users build backfill workflows using Maestro foreach support. It works well to backfill data produced by a single workflow. If the pipeline has multiple stages or many downstream workflows, users have to manually create backfill workflows for each of them and that requires significant manual work.

The incremental processing solution (IPS) described here has been designed to address the above problems. The design goal is to provide a clean and easy-to-adopt solution for incremental processing that ensures data freshness and data accuracy and provides easy backfill support.

  • Data Freshness: provide the support for scheduling workflows in a micro batch fashion (e.g. 15 min interval) with state tracking functionality
  • Data Accuracy: provide the support to process all late arriving data to achieve the data accuracy needed by the business, with multifold improvements in time and cost efficiency
  • Backfill: provide managed backfill support to build, monitor, and validate the backfill, including automatically propagating changes from upstream to downstream workflows, to greatly improve engineering productivity (i.e. a few days or weeks of engineering work to build backfill workflows vs one click for managed backfill)

Approach Overview

General Concept

Incremental processing is an approach to process data in batch, but only on new or changed data. To support incremental processing, we need an approach for not only capturing incremental data changes but also tracking their states (i.e. whether a change is processed by a workflow or not). The solution must be able to capture the changes from the source table(s) and then keep tracking them. Here, changes mean more than just the new data itself. For example, a row in an aggregation target table needs all the rows from the source table associated with the aggregation row. Also, if there are multiple source tables, usually the union of the changed data ranges from all input tables gives the full change data set. Thus, the captured change information must include all related data, including unchanged rows in the source table. Due to the previously mentioned complexities, change tracking cannot be achieved simply by using a single watermark. IPS has to track those captured changes at a finer granularity.

The changes from the source tables might affect the transformed result in the target table in various ways.

  • If one row in the target table is derived from one row in the source table, the newly captured change data is the complete input dataset for the workflow pipeline.
  • If one row in the target table is derived from multiple rows in the source table, capturing new data only tells us which rows have to be re-processed. But the dataset needed for the ETL is beyond the change data itself. For example, an aggregation based on account id requires all rows from the source table for that account id. The change dataset will tell us which account ids changed, and then the user business logic needs to load all data associated with those account ids found in the change data.
  • If one row in the target table is derived from data beyond the changed data set, e.g. joining the source table with other tables, the newly captured data is still useful and can indicate a range of data to be affected. The workflow will then re-process the data based on that range. For example, assume we have a table that keeps the accumulated view time for a given account, partitioned by day. If the view time from 3 days ago is updated now due to late arriving data, then the view time for the following two days has to be re-calculated for this account. In this case, the captured late arriving data tells us the start of the re-calculation, which is much more accurate than recomputing everything for the past X days by guesstimate, where X is a cutoff lookback window decided by business domain knowledge.

Once the change information (data or range) is captured, a workflow has to write the data to the target table in a slightly more complicated way because the simple INSERT OVERWRITE mechanism won’t work well. There are two alternatives:

  • Merge pattern: Some compute frameworks, e.g. Spark 3, support MERGE INTO, which allows new data to be merged into the existing data set. That solves the write problem for incremental processing. Note that when using MERGE INTO, the workflow/step can be safely restarted without worrying about duplicate data being inserted.
  • Append pattern: Users can also use append-only writes (e.g. INSERT INTO) to add the new data to the existing data set. Once the processing is completed, the appended data is committed to the table. If users want to re-run or re-build the data set, they run a backfill workflow to completely overwrite the target data set (e.g. INSERT OVERWRITE). See the sketch after this list.
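
As a rough illustration, here is what the two write patterns might look like in Spark SQL against an Iceberg table. The table and column names (db.target_table, db.icdc_change_table, account_id, day_key) are hypothetical, not the actual IPS tables.

-- Merge pattern: idempotent upsert of the captured change data
MERGE INTO db.target_table t
USING db.icdc_change_table c
ON t.account_id = c.account_id AND t.day_key = c.day_key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- Append pattern: add only the new change data; a re-build would later use a
-- backfill workflow with INSERT OVERWRITE
INSERT INTO db.target_table
SELECT * FROM db.icdc_change_table;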

Additionally, IPS naturally supports backfill in many cases. Downstream workflows (if there is no business logic change) are triggered by the data change caused by the backfill, which enables auto propagation of backfilled data in multi-stage pipelines. Note that backfill support is not covered in this blog; we will talk about IPS backfill support in a follow-up blog post.

Netflix Maestro

Maestro is the Netflix data workflow orchestration platform built to meet the current and future needs of Netflix. It is a general-purpose workflow orchestrator that provides a fully managed workflow-as-a-service (WAAS) to the data platform users at Netflix. It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, in various use cases. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users.

Since the last blog on Maestro, we have migrated all the workflows to it on behalf of users with minimal interruption. Maestro has been fully deployed in production, with 100% of the workload running on it.

IPS is built upon Maestro as an extension by adding two building blocks, i.e. a new trigger mechanism and step job type, to enable incremental processing for all workflows. It is seamlessly integrated into the whole Maestro ecosystem with minimal onboarding cost.

Apache Iceberg

Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time. It supports expressive SQL, full schema evolution, hidden partitioning, data compaction, and time travel & rollback. In the IPS, we leverage the rich features provided by Apache Iceberg to develop a lightweight approach to capture the table changes.

Incremental Change Capture Design

Using Netflix Maestro and Apache Iceberg, we created a novel solution for incremental processing, which provides incremental change (data and range) capture in a super lightweight way without copying any data. During our exploration, we saw a huge opportunity to improve cost efficiency and engineering productivity using incremental processing.

Here is our solution to achieve incremental change capture built upon Apache Iceberg features. As we know, an Iceberg table contains a list of snapshots with a set of metadata. Snapshots include references to the actual immutable data files, and a snapshot can contain data files from different partitions.

Design to achieve incremental change capture built upon Apache Iceberg features

The diagram above shows that snapshot s0 contains data for partitions P0 and P1 at T1. Then at T2, a new snapshot s1 is committed to the table with a list of new data files, which include late arriving data for partitions P0 and P1 and data for P2.

We implemented a lightweight approach to create an Iceberg table (called the ICDC table), which has its own snapshot but only includes the new data file references from the original table without copying the data files. It is highly efficient and low cost. Workflow pipelines can then just load the ICDC table to process only the change data from partitions P0, P1, and P2 without reprocessing the unchanged data in P0 and P1. Meanwhile, the change range is also captured for the specified data field, because the Iceberg table metadata contains the upper and lower bounds of each data field for each data file. Moreover, IPS tracks the changes at data file granularity for each workflow.
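
The post does not show the internal ICDC implementation, but the Iceberg metadata it builds on can be inspected directly through Spark SQL metadata tables. A hedged sketch follows, with a hypothetical table name:

-- Snapshots committed to the table, e.g. s0 at T1 and s1 at T2
SELECT snapshot_id, committed_at, operation
FROM db.playback_source_table.snapshots;

-- Per-file metadata, including partition membership and per-column lower/upper bounds,
-- which is what makes range capture possible without copying any data
SELECT file_path, `partition`, record_count, lower_bounds, upper_bounds
FROM db.playback_source_table.files;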

This lightweight approach is seamlessly integrated with Maestro to allow all (thousands of) scheduler users to use this new building block (i.e. incremental processing) in their tens of thousands of workflows. Each workflow using IPS is injected with a table parameter, which is the table name of the lightweight ICDC table. The ICDC table contains only the change data. Additionally, if the workflow needs the change range, a list of parameters is injected into the user workflow to include the change range information. Incremental processing can be enabled by a new step job type (ICDC) and/or a new incremental trigger mechanism. Users can use them together with all existing Maestro features, e.g. foreach patterns, step dependencies based on valid-through-timestamp watermarks, the write-audit-publish templatized pattern, etc.

Main Advantages

With this design, user workflows can adopt incremental processing with very little effort. The user business logic is also decoupled from the IPS implementation. Multi-stage pipelines can mix incremental processing workflows with existing normal workflows. We also found that user workflows can be simplified after adopting IPS by removing the additional steps that handle the complexity of the lookback window or that call internal libraries.

Adding incremental processing to Netflix Maestro as new building blocks enables users to build their workflows in a much more efficient way and bridges the gaps to solve many challenging problems (e.g. dealing with late arriving data) in a much simpler way.

Emerging Incremental Processing Patterns

While onboarding user pipelines to IPS, we have discovered a few incremental processing patterns:

Incrementally process the captured incremental change data and directly append them to the target table

This is the straightforward incremental processing use case, where the change data carries all the information needed for the data processing. Upstream changes (usually from a single source table) are propagated to the downstream (usually another target table), and the workflow pipeline only needs to process the change data (possibly joined with other dimension tables) and then merge it into (usually append to) the target table. This pattern replaces lookback window patterns for taking care of late arriving data. Instead of completely overwriting the past X days of data using a lookback window pattern, user workflows just need to MERGE the change data (including late arriving data) into the target table by processing the ICDC table.

Use captured incremental change data as the row level filter list to remove unnecessary transformation

ETL jobs usually need to aggregate data based on certain group-by keys. The change data discloses all the group-by keys that require re-aggregation due to the newly landed data from the source table(s). ETL jobs can then join the original source table with the ICDC table on those group-by keys, using the ICDC table as a filter to speed up the processing and run calculations on a much smaller set of data. There is no change to the business transformation logic and no re-design of the ETL workflow. ETL pipelines keep all the benefits of batch workflows.
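
A minimal sketch of this pattern in Spark SQL, with hypothetical table and column names (db.source_table, db.source_icdc_table, account_id, view_time_sec):

-- Re-aggregate only the group-by keys that appear in the captured change data
WITH changed_keys AS (
  SELECT DISTINCT account_id FROM db.source_icdc_table
)
SELECT s.account_id, SUM(s.view_time_sec) AS total_view_time_sec
FROM db.source_table s
JOIN changed_keys c
  ON s.account_id = c.account_id
GROUP BY s.account_id;
-- The re-aggregated rows are then merged into the target table, as in the merge pattern above.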

Use the captured range parameters in the business logic

This pattern is usually used in complicated use cases, such as joining multiple tables and doing complex processing. In this case, the change data does not give the full picture of the input needed by the ETL workflow. Instead, the change data indicates a range of changed data sets for a specific set of fields (possibly partition keys) in a given input table, or usually multiple input tables. Then, the union of the change ranges from all input tables gives the full change data set needed by the workflow. Additionally, the whole range of data usually has to be overwritten because the transformation is not stateless and depends on the outcome of the previous ranges. Another example is that an aggregated record in the target table or a window function in the query has to be updated based on the whole data set in the partition (e.g. calculating a median across the whole partition). Basically, the range derived from the change data indicates the dataset to be re-processed.
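
A hedged sketch of this pattern in Spark SQL. The table and column names are hypothetical, and the parameter names and substitution syntax are illustrative only; the post does not specify the exact names of the injected range parameters.

-- The injected change range bounds the partitions that must be re-processed
INSERT OVERWRITE TABLE db.account_view_time_agg
SELECT account_id, day_key, SUM(view_time_sec) AS accumulated_view_time_sec
FROM db.playback_source_table
WHERE day_key BETWEEN '${icdc_range_start}' AND '${icdc_range_end}'
GROUP BY account_id, day_key;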

Use cases

Data workflows at Netflix usually have to deal with late arriving data, which is commonly solved by using the lookback window pattern due to its simplicity and ease of implementation. In the lookback pattern, the ETL pipeline always consumes the past X partitions of data from the source table and then overwrites the target table in every run. Here, X is a number decided by the pipeline owners based on their domain expertise. The drawback is the cost of computation and execution time: it usually costs almost X times more than a pipeline that does not consider late arriving data. Given that late arriving data is sparse, the majority of the processing is done on data that has already been processed, which is unnecessary. Also note that this approach is based on domain knowledge and is sometimes subject to changes in the business environment or the domain expertise of data engineers. In certain cases, it is challenging to come up with a good constant number.

Below, we will use a two-stage data pipeline to illustrate how to rebuild it using IPS to improve the cost efficiency. We will observe a significant cost reduction (> 80%) with little changes in the business logic. In this use case, we will set the lookback window size X to be 14 days, which varies in different real pipelines.

Original Data Pipeline with Lookback Window

Original data pipeline with lookback window
  • playback_table: an Iceberg table holding playback events from user devices ingested by streaming pipelines, with late arriving data that is sparse (only a few percent of the data arrives late).
  • playback_daily_workflow: a daily scheduled workflow to process the past X days of playback_table data and write the transformed data to the target table for the past X days
  • playback_daily_table: the target table of the playback_daily_workflow; it gets overwritten every day for the past X days
  • playback_daily_agg_workflow: a daily scheduled workflow to process the past X days’ playback_daily_table data and write the aggregated data to the target table for the past X days
  • playback_daily_agg_table: the target table of the playback_daily_agg_workflow; it gets overwritten every day for the past 14 days.

We ran this pipeline on a sample dataset using the real business logic; here are the average execution results of the sample runs:

  • The first stage workflow takes about 7 hours to process playback_table data
  • The second stage workflow takes about 3.5 hours to process playback_daily_table data

New Data Pipeline with Incremental Processing

Using IPS, we rewrite the pipeline to avoid re-processing data as much as possible. The new pipeline is shown below.

New data pipeline with incremental processing

Stage 1:

  • ips_playback_daily_workflow: it is the updated version of playback_daily_workflow.
  • The workflow’s Spark SQL job reads an incremental change data capture (ICDC) Iceberg table (i.e. playback_icdc_table), which only includes the new data added to the playback_table. It includes the late arriving data but does not include any unchanged data from playback_table.
  • The business logic replaces INSERT OVERWRITE with a MERGE INTO SQL query, and the new data is then merged into the playback_daily_table.

Stage 2:

  • IPS captures the changed data of playback_daily_table and keeps the change data in an ICDC source table (playback_daily_icdc_table), so we don’t need to hard-code the lookback window in the business logic. If only Y days have changed data in playback_daily_table, then it only needs to load data for those Y days.
  • In ips_playback_daily_agg_workflow, the business logic is the same for the current day’s partition. We then need to update the business logic to take care of late arriving data by
  • JOINing the playback_daily_table with playback_daily_icdc_table on the aggregation group-by keys for the past 2 to X days, excluding the current day (i.e. day 1)
  • Because late arriving data is sparse, the JOIN narrows down the playback_daily_table data set so as to only process a very small portion of it.
  • The business logic uses a MERGE INTO SQL query, and the change is then propagated to the downstream target table
  • For the current day, the business logic stays the same: it consumes the data from playback_daily_table and then writes the outcome to the target table playback_daily_agg_table using INSERT OVERWRITE, because there is no need to join with the ICDC table. See the sketch after this list.
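
A hedged sketch of the two parts of the updated stage-2 logic in Spark SQL. The table names follow this post, while the aggregation keys and columns (account_id, day_key, view_time_sec) are illustrative.

-- Part 1, days 2..X: re-aggregate only the keys present in the ICDC table and merge the result
MERGE INTO playback_daily_agg_table t
USING (
  SELECT d.account_id, d.day_key, SUM(d.view_time_sec) AS view_time_sec
  FROM playback_daily_table d
  JOIN (SELECT DISTINCT account_id, day_key
        FROM playback_daily_icdc_table
        WHERE day_key < CURRENT_DATE) c
    ON d.account_id = c.account_id AND d.day_key = c.day_key
  GROUP BY d.account_id, d.day_key
) a
ON t.account_id = a.account_id AND t.day_key = a.day_key
WHEN MATCHED THEN UPDATE SET t.view_time_sec = a.view_time_sec
WHEN NOT MATCHED THEN INSERT *;

-- Part 2, current day: unchanged business logic, no ICDC join needed
-- (assumes dynamic partition overwrite, so only the current day's partition is replaced)
INSERT OVERWRITE TABLE playback_daily_agg_table
SELECT account_id, day_key, SUM(view_time_sec) AS view_time_sec
FROM playback_daily_table
WHERE day_key = CURRENT_DATE
GROUP BY account_id, day_key;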

With these small changes, the data pipeline efficiency is greatly improved. In our sample run,

  • The first stage workflow takes just about 30 minutes to process X days of change data from playback_table.
  • The second stage workflow takes about 15 minutes to process the change data between day 2 and day X from playback_daily_table by joining with the playback_daily_icdc_table data, and takes another 15 minutes to process the current day’s (i.e. day 1) playback_daily_table change data.

Here, the Spark job settings are the same in the original and new pipelines. So in total, the new IPS-based pipeline needs around 10% of the resources (measured by execution time) to finish.

Looking Forward

We will improve IPS to support more complicated cases beyond append-only cases. IPS will be able to keep track of the progress of the table changes and support multiple Iceberg table change types (e.g. append, overwrite, etc.). We will also add managed backfill support into IPS to help users to build, monitor, and validate the backfill.

We are taking Big Data Orchestration to the next level and constantly solving new problems and challenges, so please stay tuned. If you are motivated to solve large scale orchestration problems, please join us.

Acknowledgements

Thanks to our Product Manager Ashim Pokharel for driving the strategy and requirements. We’d also like to thank Andy Chu, Kyoko Shimada, Abhinaya Shetty, Bharath Mummadisetty, John Zhuge, Rakesh Veeramacheneni, and other stunning colleagues at Netflix for their suggestions and feedback while developing IPS. We’d also like to thank Prashanth Ramdas, Eva Tse, Charles Smith, and other leaders of Netflix engineering organizations for their constructive feedback and suggestions on the IPS architecture and design.


Incremental Processing using Netflix Maestro and Apache Iceberg was originally published in Netflix TechBlog on Medium.

Migrate an existing data lake to a transactional data lake using Apache Iceberg

Post Syndicated from Rajdip Chaudhuri original https://aws.amazon.com/blogs/big-data/migrate-an-existing-data-lake-to-a-transactional-data-lake-using-apache-iceberg/

A data lake is a centralized repository that you can use to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure it, and then run different types of analytics for better business insights. Over the years, data lakes on Amazon Simple Storage Service (Amazon S3) have become the default repository for enterprise data and are a common choice for a large set of users who query data for a variety of analytics and machine learning use cases. Amazon S3 allows you to access diverse data sets, build business intelligence dashboards, and accelerate the consumption of data by adopting a modern data architecture or data mesh pattern on Amazon Web Services (AWS).

Analytics use cases on data lakes are always evolving. Oftentimes, you want to continuously ingest data from various sources into a data lake and query the data concurrently through multiple analytics tools with transactional capabilities. But traditionally, data lakes built on Amazon S3 are immutable and don’t provide the transactional capabilities needed to support changing use cases. With changing use cases, customers are looking for ways to not only move new or incremental data to data lakes as transactions, but also to convert existing data based on Apache Parquet to a transactional format. Open table formats, such as Apache Iceberg, provide a solution to this issue. Apache Iceberg enables transactions on data lakes and can simplify data storage, management, ingestion, and processing.

In this post, we show you how you can convert existing data in an Amazon S3 data lake in Apache Parquet format to Apache Iceberg format to support transactions on the data using Jupyter Notebook based interactive sessions over AWS Glue 4.0.

Existing Parquet to Iceberg migration

There are two broad methods to migrate the existing data in a data lake in Apache Parquet format to Apache Iceberg format to convert the data lake to a transactional table format.

In-place data upgrade

In an in-place data migration strategy, existing datasets are upgraded to Apache Iceberg format without first reprocessing or restating the existing data. This means the data files in the data lake aren’t modified during the migration, and all Apache Iceberg metadata files (manifest lists, manifest files, and table metadata files) are generated outside the purview of the data. In this method, the metadata is recreated in an isolated environment and colocated with the existing data files. This can be a much less expensive operation compared to rewriting all the data files. The existing data file format must be Apache Parquet, Apache ORC, or Apache Avro. An in-place migration can be performed in either of two ways:

  1. Using add_files: This procedure adds existing data files to an existing Iceberg table with a new snapshot that includes the files. Unlike migrate or snapshot, add_files can import files from a specific partition or partitions and doesn’t create a new Iceberg table. This procedure doesn’t analyze the schema of the files to determine if they match the schema of the Iceberg table. Upon completion, the Iceberg table treats these files as if they are part of the set of files owned by Apache Iceberg.
  2. Using migrate: This procedure replaces a table with an Apache Iceberg table loaded with the source’s data files. The table’s schema, partitioning, properties, and location are copied from the source table. Supported formats are Avro, Parquet, and ORC. By default, the original table is retained with the name table_BACKUP_. However, to leave the original table intact during the process, you must use snapshot to create a new temporary table that has the same source data files and schema.

In this post, we show you how you can use the Iceberg add_files procedure for an in-place data upgrade. Note that the migrate procedure isn’t supported in AWS Glue Data Catalog.
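
As a hedged sketch of that call in Spark SQL: the catalog name glue_catalog is an assumption, the database and table names follow this post, and the (empty) Iceberg target table must already exist before the procedure is called.

CALL glue_catalog.system.add_files(
  table => 'ghcn_db.target_iceberg_add_files',
  source_table => 'ghcn_db.source_parquet'
);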

The following diagram shows a high-level representation.

CTAS migration of data

The create table as select (CTAS) migration approach is a technique where all the metadata information for Iceberg is generated along with restating all the data files. This method shadows the source dataset in batches. When the shadow is caught up, you can swap the shadowed dataset with the source.
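
A hedged sketch of the CTAS approach in Spark SQL. The catalog name glue_catalog is an assumption and the partition columns are illustrative; the database and table names follow this post.

-- Restates the data into the Iceberg table location; note the ORDER BY, because Iceberg
-- expects incoming data to be sorted by the table partition columns
CREATE TABLE glue_catalog.ghcn_db.target_iceberg_ctas
USING iceberg
PARTITIONED BY (year, element)
AS
SELECT * FROM ghcn_db.source_parquet
ORDER BY year, element;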

The following diagram showcases the high-level flow.

Prerequisites

To follow along with the walkthrough, you must have the following:

You can check the data size using the following code in the AWS CLI or AWS CloudShell:

# Run this command to check the data size

aws s3 ls --summarize --human-readable --recursive s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023

As of writing this post, there are 107 objects with total size of 70 MB for year 2023 in the Amazon S3 path.

Note that to implement the solution, you must complete a few preparatory steps.

Deploy resources using AWS CloudFormation

Complete the following steps to create the S3 bucket and the AWS IAM role and policy for the solution:

  1. Sign in to your AWS account and then choose Launch Stack to launch the CloudFormation template.

  1. For Stack name, enter a name.
  2. Leave the parameters at the default values. Note that if the default values are changed, then you must make corresponding changes throughout the following steps.
  3. Choose Next to create your stack.

This AWS CloudFormation template deploys the following resources:

  • An S3 bucket named demo-blog-post-XXXXXXXX (XXXXXXXX represents the AWS account ID used).
  • Two folders named parquet and iceberg under the bucket.
  • An IAM role and a policy named demoblogpostrole and demoblogpostscoped respectively.
  • An AWS Glue database named ghcn_db.
  • An AWS Glue Crawler named demopostcrawlerparquet.

After the AWS CloudFormation template is successfully deployed:

  1. Copy the data in the created S3 bucket using following command in AWS CLI or AWS CloudShell. Replace XXXXXXXX appropriately in the target S3 bucket name.
    Note: In the example, we copy data only for the year 2023. However, you can work with the entire dataset, following the same instructions.

    aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023/ s3://demo-blog-post-XXXXXXXX/parquet/year=2023

  2. Open the AWS Management Console and go to the AWS Glue console.
  3. On the navigation pane, select Crawlers.
  4. Run the crawler named demopostcrawlerparquet.
  5. After the AWS Glue crawler demopostcrawlerparquet is successfully run, the metadata information of the Apache Parquet data will be cataloged under the ghcn_db AWS Glue database with the table name source_parquet. Notice that the table is partitioned over year and element columns (as in the S3 bucket).

  1. Use the following query to verify the data from the Amazon Athena console. If you’re using Amazon Athena for the first time in your AWS Account, set up a query result location in Amazon S3.
    SELECT * FROM ghcn_db.source_parquet limit 10;

Launch an AWS Glue Studio notebook for processing

For this post, we use an AWS Glue Studio notebook. Follow the steps in Getting started with notebooks in AWS Glue Studio to set up the notebook environment. Launch the notebooks hosted under this link and unzip them on a local workstation.

  1. Open AWS Glue Studio.
  2. Choose ETL Jobs.
  3. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select required ipynb file and choose Open, then choose Create.
  4. On the Notebook setup page, for Job name, enter a logical name.
  5. For IAM role, select demoblogpostrole. Choose Create job. After a minute, the Jupyter notebook editor appears. Clear all the default cells.

The preceding steps launch an AWS Glue Studio notebook environment. Make sure you Save the notebook every time it’s used.

In-place data upgrade

In this section, we show you how you can use the add_files procedure to achieve an in-place data upgrade. This section uses the ipynb file named demo-in-place-upgrade-addfiles.ipynb. To use the add_files procedure, complete the following steps:

  1. On the Notebook setup page, for Job name, enter demo-in-place-upgrade for the notebook session as explained in Launch Glue notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under Execute add_files procedure section performs the metadata creation in the mentioned path.

Review the data file paths for the new Iceberg table. To show an Iceberg table’s current data files, .files can be used to get details such as file_path, partition, and others. The recreated file references point to the source path under Amazon S3.

Note the metadata file location after the transformation. It points to the new folder named iceberg under Amazon S3. This can be checked using .snapshots to view the Iceberg table’s snapshot file location. Also, check the same in the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_add_files/metadata/. Notice as well that there are two versions of the manifest list created after the add_files procedure has been run: the first corresponds to an empty table with the data schema, and the second adds the files.

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, you can use the add_files procedure to convert existing data files in Apache Parquet format in a data lake to Apache Iceberg format by adding the metadata files and without recreating the table from scratch. Following are some pros and cons of this method.

Pros

  • Avoids full table scans to read the data as there is no restatement. This can save time.
  • If there are any errors while writing the metadata, only a metadata re-write is required, not a rewrite of the entire data.
  • Lineage of the existing jobs is maintained because the existing catalog still exists.

Cons

  • If data is processed (inserts, updates, and deletes) in the dataset during the metadata writing process, the process must be run again to include the new data.
  • There must be write downtime to avoid having to run the process a second time.
  • If a data restatement is required, this workflow will not work as source data files aren’t modified.

CTAS migration of data

This section uses the ipynb file named demo-ctas-upgrade.ipynb. Complete the following:

  1. On the Notebook setup page, for Job name, enter demo-ctas-upgrade for the notebook session as explained under Launch Glue notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under Create Iceberg table from Parquet section performs the shadow upgrade to Iceberg format. Note that Iceberg requires sorting the data according to table partitions before writing to the Iceberg table. Further details can be found in Writing Distribution Modes.

Notice the data and metadata file paths for the new Iceberg table. It’s pointing to the new path under Amazon S3. Also, check under the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_ctas/ used for this post.

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, the CTAS method creates a new table by generating all the metadata files along with restating the actual data. Following are some pros and cons of this method:

Pros

  • It allows you to audit and validate the data during the process because data is restated.
  • If there are any runtime issues during the migration process, rollback and recovery can be easily performed by deleting the Apache Iceberg table.
  • You can test different configurations when migrating a source. You can create a new table for each configuration and evaluate the impact.
  • Shadow data is renamed to a different directory in the source (so it doesn’t collide with old Apache Parquet data).

Cons

  • Storage of the dataset is doubled during the process as both the original Apache Parquet and new Apache Iceberg tables are present during the migration and testing phase. This needs to be considered during cost estimation.
  • The migration can take much longer (depending on the volume of the data) because both data and metadata are written.
  • It’s difficult to keep tables in sync if there are changes to the source table during the process.

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the datasets, CloudFormation stack, S3 bucket, AWS Glue job, AWS Glue database, and AWS Glue table.

Conclusion

In this post, you learned strategies for migrating existing Apache Parquet formatted data in Amazon S3 to Apache Iceberg format to convert a data lake to a transactional data lake, using interactive sessions in AWS Glue 4.0 to complete the process. If you have an evolving use case where an existing data lake needs to be converted to a transactional data lake based on the Apache Iceberg table format, follow the guidance in this post.

The path you choose for this upgrade, an in-place upgrade or CTAS migration, or a combination of both, will depend on careful analysis of the data architecture and data integration pipeline. Both pathways have pros and cons, as discussed. At a high level, this upgrade process should go through multiple well-defined phases to identify the patterns of data integration and use cases. Choosing the correct strategy will depend on your requirements—such as performance, cost, data freshness, acceptable downtime during migration, and so on.


About the author

Rajdip Chaudhuri is a Senior Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer and movies.

Apache Iceberg optimization: Solving the small files problem in Amazon EMR

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/apache-iceberg-optimization-solving-the-small-files-problem-in-amazon-emr/

In our previous post Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes, we discussed how you can implement solutions to improve operational efficiencies of your Amazon Simple Storage Service (Amazon S3) data lake that is using the Apache Iceberg open table format and running on the Amazon EMR big data platform. Iceberg tables store metadata in manifest files. As the number of data files increases, the amount of metadata stored in these manifest files also increases, leading to longer query planning time. The query runtime also increases because it’s proportional to the number of data or metadata file read operations. Compaction is the process of combining these small data and metadata files to improve performance and reduce cost. Compaction also gets rid of delete files by applying the deletes and rewriting new files without the deleted records. Currently, Iceberg provides a compaction utility that compacts small files at a table or partition level, but this approach requires you to implement the compaction job using your preferred job scheduler or trigger the compaction job manually.
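
For reference, the existing utility is exposed as a Spark SQL procedure. A hedged sketch follows; the catalog, database, table name, and target file size are hypothetical.

CALL my_catalog.system.rewrite_data_files(
  table => 'db.sensor_table',
  options => map('target-file-size-bytes', '134217728')  -- compact toward ~128 MB files
);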

In this post, we discuss the new Iceberg feature that you can use to automatically compact small files while writing data into Iceberg tables using Spark on Amazon EMR or Amazon Athena.

Use cases for processing small files

Streaming applications are prone to creating a large number of small files, which can negatively impact the performance of subsequent processing. For example, consider a critical Internet of Things (IoT) sensor from a cold storage facility that is continuously sending temperature and health data into an S3 data lake for downstream data processing and triggering actions like emergency maintenance. Systems of this nature generate a huge number of small objects that need to be compacted to a more optimal size for faster reading, such as 128 MB, 256 MB, or 512 MB. In this post, we show you a streaming sensor data use case with a large number of small files and the mitigation steps using the Iceberg open table format. For more information on streaming applications on AWS, refer to Real-time Data Streaming and Analytics.

Streaming Architecture

Solution overview

To compact the small files for improved performance, in this example, Amazon EMR triggers a compaction job after the write commit as a post-commit hook when defined thresholds (for example, number of commits) are met. By default, Amazon EMR waits for 10 commits to trigger the post-commit hook compaction utility.

This Iceberg event-based table management feature lets you monitor table activities during writes to make better decisions about how to manage each table differently based on events. As of this writing, only the optimize-data optimization is supported. To learn more about the available optimize data executors and catalog properties, refer to the README file in the GitHub repo.

To use the feature, you can use the iceberg-aws-event-based-table-management source code and provide the built JAR in the engine’s class-path. The following bootstrap action can place the JAR in the engine’s class-path:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Note that the Iceberg AWS event-based table management feature works with Iceberg v1.2.0 and above (available from Amazon EMR 6.11.0).

In some use cases, you may want to run the event-based compaction jobs in a different EMR cluster in order to avoid any impact to the ETL jobs running in their current EMR cluster. You can get the metadata, including the cluster ID of your current ETL workflows, from the /mnt/var/lib/info/job-flow.json file and then use a different cluster to process the event-based compactions.

The notebook examples shown in the following sections are also available in the aws-samples GitHub repo.

Prerequisite

For this performance comparison exercise between a Spark external table, an Iceberg table, and an Iceberg table with compaction, we generate a significant number of small files in Parquet format and store them in an S3 bucket. We used the Amazon Kinesis Data Generator (KDG) tool to generate sample sensor data using the following template:

{"sensorId": {{random.number(5000)}},
 "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
  )}},
 "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}",
 "date_ts": "{{date.now("YYYY-MM-DD HH:mm:ss")}}"
}

We configured an Amazon Kinesis Data Firehose delivery stream and sent the generated data into a staging S3 bucket. Then we ran an AWS Glue extract, transform, and load (ETL) job to convert the JSON files into Parquet format. For our testing, we generated about 58,176 small objects with total size of 2 GB.

For running the Amazon EMR tests, we used Amazon EMR version emr-6.11.0 with Spark 3.3.2, and JupyterEnterpriseGateway 2.6.0. The cluster used had one primary node (r5.2xlarge) and two core nodes (r5.xlarge). We used a bootstrap action during cluster creation to enable event-based table management:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Also, refer to our guidance on how to use an Iceberg cluster with Spark, which is a prerequisite for this exercise.

As part of the exercise, we see new steps are being added to the EMR cluster to trigger the compaction jobs. To enable adding new steps to the running cluster, we add the elasticmapreduce:AddJobFlowSteps action to the cluster’s default role, EMR_EC2_DefaultRole, as a prerequisite.

Performance of Iceberg reads with the compaction utility on Amazon EMR

In the following steps, we demonstrate how to use the compaction utility and what performance benefits you can achieve. We use an EMR notebook to demonstrate the benefits of the compaction utility. For instructions to set up an EMR notebook, refer to Amazon EMR Studio overview.

First, you configure your Spark session using the %%configure magic command. We use the Hive catalog for Iceberg tables.

  1. Before you run the following step, create an Amazon S3 bucket in your AWS account called <your-iceberg-storage-blog>. To check how to create an Amazon S3 bucket, follow the instructions given here. Update the your-iceberg-storage-blog bucket name in the following configuration with the actual bucket name you created to test this example:
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/"
        }
    }

  2. Create a new database named db for the Iceberg table in the AWS Glue Data Catalog and provide the S3 URI specified in the Spark config as s3://<your-iceberg-storage-blog>/iceberg/db. Also, create another database named iceberg_db in AWS Glue for the Parquet tables. Follow the instructions given in Working with databases on the AWS Glue console to create your Glue databases. Then create a new Spark table in Parquet format pointing to the bucket containing small objects in your AWS account. See the following code:
    spark.sql(""" CREATE TABLE iceberg_db.sensor_data_parquet_table (
        sensorid int,
        currenttemperature int,
        status string,
        date_ts timestamp)
    USING parquet
    location 's3://<your-bucket-with-parquet-files>/'
    """)

  3. Run an aggregate SQL to measure the performance of Spark SQL on the Parquet table with 58,176 small objects:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from iceberg_db.sensor_data_parquet_table
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()

In the following steps, we create a new Iceberg table from the Spark/Parquet table using CTAS (Create Table As Select). Then we show how the automated compaction job can help improve query performance.

  1. Create a new Iceberg table using CTAS from the earlier AWS Glue table with the small files:
    spark.sql(""" CREATE TABLE dev.db.sensor_data_iceberg_format USING iceberg AS (SELECT * FROM iceberg_db.sensor_data_parquet_table)""")

  2. Validate that a new Iceberg snapshot was created for the new table:
    spark.sql(""" Select * from dev.db.sensor_data_iceberg_format.snapshots limit 5""").show()

We have confirmed that our S3 folder corresponds to the newly created Iceberg table. It shows that the CTAS statement added 1,879 objects to the new folder with a total size of 1.3 GB. We can conclude that Iceberg did some optimization while loading data from the Parquet table.

  1. Now that you have data in the Iceberg table, run the previous aggregation SQL to check the runtime:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from dev.db.sensor_data_iceberg_format
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()

The preceding query ran on the Iceberg table with 1,879 objects in 1 minute, 39 seconds. There is already a significant performance improvement from converting the external Parquet table to an Iceberg table.

  1. Now let’s add the configurations needed to apply the automatic compaction of small files in the Iceberg tables. Note the last four newly added configurations in the following statement. The parameter optimize-data.commit-threshold suggests that the compaction will take place after the first successful commit. The default is 10 successful commits to trigger the compaction.
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/",
        "spark.sql.catalog.dev.metrics-reporter-impl":"org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator",
        "spark.sql.catalog.dev.optimize-data.impl":"org.apache.iceberg.aws.manage.EmrOnEc2OptimizeDataExecutor",
        "spark.sql.catalog.dev.optimize-data.emr.cluster-id":"j-1N8J5NZI0KEU3",
        "spark.sql.catalog.dev.optimize-data.commit-threshold":"1"
        }
    }

  2. Run a quick sanity check to confirm that the configurations are working fine with Spark SQL.

  1. To activate the automatic compaction process, add a new record to the existing Iceberg table using a Spark insert:
    spark.sql(""" Insert into dev.db.sensor_data_iceberg_format values(999123, 86, 'PASS', timestamp'2023-07-26 12:50:25') """)
    spark.sql(""" Insert into dev.db.sensor_data_iceberg_format values(999123, 86, 'PASS', timestamp'2023-07-26 12:50:25') """)

  2. Navigate to the Amazon EMR console to check the cluster steps.

You should see a new step added that goes from Pending to Running and finally the Completed state. Every time the data in the Iceberg table is updated or inserted, based on configuration optimize-data.commit-threshold, the optimize job will automatically trigger to compact the underlying data.

  1. Validate that the record insert was successful.

  1. Check the snapshot table to see that a new snapshot is created for the table with the operation replace.

For every successful run of the background optimize job, a new entry will be added to the snapshot table.

  1. On the Amazon S3 console, navigate to the folder corresponding to the Iceberg table and see that the data files are compacted.

In our case, it was compacted from the previous smaller sizes to approximately 437 MB. The folder will still contain the previous smaller files for time travel unless you issue an expire snapshot command to remove them.
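
A hedged sketch of that snapshot expiration step in Spark SQL, using the catalog and table names from this post; the cutoff timestamp is illustrative.

CALL dev.system.expire_snapshots(
  table => 'db.sensor_data_iceberg_format',
  older_than => TIMESTAMP '2023-07-27 00:00:00',
  retain_last => 1
);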

  1. Now you can run the same aggregate query and record the performance after the compaction.

Summary of Amazon EMR testing

The runtime for the preceding aggregation query on the compacted Iceberg table reduced to approximately 59 seconds from the previous runtime of 1 minute, 39 seconds. That is about a 40% improvement. The more small files you have in your source bucket, the bigger performance boost you can achieve with this post-hook compaction implementation. The examples shown in this blog were executed in a small Amazon EMR cluster with only two core nodes (r5.xlarge). To improve the performance of your Spark applications, Amazon EMR provides multiple optimization features that you can implement for your production workloads.

Performance of Iceberg reads with the compaction utility on Athena

To manage the Iceberg table based on events, you can start the Spark 3.3 SQL shell as shown in the following code. Make sure that the athena:StartQueryExecution and athena:GetQueryExecution permission policies are enabled.

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
          --conf spark.sql.catalog.my_catalog.warehouse=<s3-bucket> \
          --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator \
          --conf spark.sql.catalog.my_catalog.optimize-data.impl=org.apache.iceberg.aws.manage.AthenaOptimizeDataExecutor \
          --conf spark.sql.catalog.my_catalog.optimize-data.athena.output-bucket=<s3-bucket>

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

In this post, we showed how Iceberg event-based table management lets you manage each table differently based on events and compact small files to boost application performance. This event-based process significantly reduces the operational overhead of using the Iceberg rewrite_data_files procedure, which needs manual or scheduled operation.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Perform upserts in a data lake using Amazon Athena and Apache Iceberg

Post Syndicated from Ranjit Rajan original https://aws.amazon.com/blogs/big-data/perform-upserts-in-a-data-lake-using-amazon-athena-and-apache-iceberg/

Amazon Athena supports the MERGE command on Apache Iceberg tables, which allows you to perform inserts, updates, and deletes in your data lake at scale using familiar SQL statements that are compliant with ACID (Atomic, Consistent, Isolated, Durable). Apache Iceberg is an open table format for data lakes that manages large collections of files as tables. It supports modern analytical data lake operations such as create table as select (CTAS), upsert and merge, and time travel queries. Athena also supports the ability to create views and perform VACUUM (snapshot expiration) on Apache Iceberg tables to optimize storage and performance. With these features, you can now build data pipelines completely in standard SQL that are serverless, simpler to build, and able to operate at scale. This enables developers to:

  • Focus on writing business logic and not worry about setting up and managing the underlying infrastructure
  • Perform data transformations with Athena
  • Help comply with certain data deletion requirements
  • Apply change data capture (CDC) from source databases

With data lakes, data pipelines are typically configured to write data into a raw zone, which is an Amazon Simple Storage Service (Amazon S3) bucket or folder that contains data as-is from source systems. Data is accumulated in this zone, such that inserts, updates, or deletes on the source database appear as records in new files as transactions occur on the source. Although the raw zone can be queried, any downstream processing or analytical queries typically need to deduplicate data to derive a current view of the source table. For example, if a single record is updated multiple times in the source database, these records need to be deduplicated and the most recent record selected.

Typically, data transformation processes are used to perform this operation, and a final consistent view is stored in an S3 bucket or folder. Data transformation processes can be complex, requiring more coding and more testing, and are also error prone. This was a challenge because data lakes are based on files and have been optimized for appending data. Previously, you had to overwrite the complete S3 object or folder, which was not only inefficient but also interrupted users who were querying the same data. With the evolution of frameworks such as Apache Iceberg, you can perform SQL-based upserts in place in Amazon S3 using Athena, without blocking user queries and while still maintaining query performance.

In this post, we demonstrate how you can use Athena to apply CDC from a relational database to target tables in an S3 data lake.

Overview of solution

For this post, consider a mock sports ticketing application based on the following project. We use a single table in that database that contains sporting events information and ingest it into an S3 data lake on a continuous basis (initial load and ongoing changes). This data ingestion pipeline can be implemented using AWS Database Migration Service (AWS DMS) to extract both full and ongoing CDC extracts. With CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume. Most databases use a transaction log to record changes made to the database. AWS DMS reads the transaction log by using engine-specific API operations and captures the changes made to the database in a nonintrusive manner.

Specifically, to extract changed data including inserts, updates, and deletes from the database, you can configure AWS DMS with two replication tasks, as described in the following workshop. The first task performs an initial copy of the full data into an S3 folder. The second task is configured to replicate ongoing CDC into a separate folder in S3, which is further organized into date-based subfolders based on the source database's transaction commit date. With full and CDC data in separate S3 folders, it's easier to maintain and operate data replication and downstream processing jobs. To enable this, you can apply the following extra connection attributes to the S3 endpoint in AWS DMS (refer to S3Settings for other CSV and related settings):

  • TimestampColumnName – AWS DMS adds a column that you name with timestamp information for the commit of that row in the source database.
  • includeOpForFullLoad – AWS DMS adds a column named Op to every file to indicate if the record is an I (INSERT), U (UPDATE), or D (DELETE).
  • DatePartitionEnabled, DatePartitionSequence, DatePartitionDelimiter – These settings are used to configure AWS DMS to write changed data to date/time-based folders in the data lake. By partitioning folders, you can better manage S3 objects and optimize data lake queries for subsequent downstream processing.

We use the support in Athena for Apache Iceberg tables called MERGE INTO, which can express row-level updates. Apache Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated. After the data is merged, we demonstrate how to use Athena to perform time travel on the sporting_event table, and use views to abstract and present different versions of the data to end-users. Finally, to simplify table maintenance, we demonstrate performing VACUUM on Apache Iceberg tables to delete older snapshots, which will optimize latency and cost of both read and write operations.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1 and 2 use AWS DMS, which connects to the source database to load initial data and ongoing changes (CDC) to Amazon S3 in CSV format. For this post, we have provided sample full and CDC datasets in CSV format that have been generated using AWS DMS.
    • Step 3 is comprised of the following actions:
      • Create an external table in Athena pointing to the source data ingested in Amazon S3.
      • Create an Apache Iceberg target table and load data from the source table.
      • Merge CDC data into the Apache Iceberg table using MERGE INTO.
  • Data access:
    • In Step 4, create a view on the Apache Iceberg table.
    • Use the view to query data using standard SQL.

Prerequisites

Before getting started, make sure you have the required permissions to perform the following in your AWS account:

Create tables on the raw data

First, create a database for this demo.

  1. Navigate to the Athena console and choose Query editor.
    If this is your first time using the Athena query editor, you need to configure and specify an S3 bucket to store the query results.
  2. Create a database with the following code:
    CREATE DATABASE raw_demo;

  3. Next, create a folder in an S3 bucket that you can use for this demo. Name this folder sporting_event_full.
  4. Upload LOAD00000001.csv into the folder.
  5. Switch to the raw_demo database and create a table to point to the raw input data:
    CREATE EXTERNAL TABLE raw_demo.sporting_event(
      op string,
      cdc_timestamp timestamp, 
      id bigint, 
      sport_type_name string, 
      home_team_id int, 
      away_team_id int, 
      location_id smallint, 
      start_date_time timestamp, 
      start_date date, 
      sold_out smallint)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your-bucket>/sporting_event_full/'
      ;

  6. Run the following query to review the data:
    SELECT * FROM raw_demo.sporting_event LIMIT 5;

  7. Next, create another folder in the same S3 bucket called sporting_event_cdc.
  8. Within this folder, create three subfolders in a time hierarchy folder structure such that the final S3 folder URI looks like s3://<your-bucket>/sporting_event_cdc/2022/09/22/.
  9. Upload 20220922-184314489.csv into this folder. This folder structure is similar to how AWS DMS stores CDC data when you enable date-based folder partitioning.
  10. Create a table to point to the CDC data. This table also includes a partition column because the source data in Amazon S3 is organized into date-based folders.
    CREATE EXTERNAL TABLE raw_demo.sporting_event_cdc(
    op string,
    cdc_timestamp timestamp,
    id bigint,
    sport_type_name string,
    home_team_id int,
    away_team_id int,
    location_id smallint,
    start_date_time timestamp,
    start_date date,
    sold_out smallint)
    PARTITIONED BY (partition_date string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your-bucket>/sporting_event_cdc/'
    ;

  11. Next, alter the table to add new partitions. Because AWS DMS stores the data in a non-Hive-style folder structure, you need to add partitions manually (or use an AWS Glue crawler) to be able to query this data. As data accumulates, continue to add new partitions.
    ALTER TABLE raw_demo.sporting_event_cdc ADD PARTITION (partition_date='2022-09-22') location 's3://<your-bucket>/sporting_event_cdc/2022/09/22/'

  12. Run the following query to review the CDC data:
    SELECT * FROM raw_demo.sporting_event_cdc;

There are two records with IDs 1 and 11 that are updates with op code U. The record with ID 21 has a delete (D) op code, and the record with ID 5 is an insert (I).
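If you want to verify this distribution yourself, an optional query such as the following groups the CDC records by operation code:

SELECT op, count(*) AS record_count
FROM raw_demo.sporting_event_cdc
GROUP BY op
ORDER BY op;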


Use CTAS to create the target Iceberg table in Parquet format

CTAS statements create new tables using standard SELECT queries. The resultant table is added to the AWS Glue Data Catalog and made available for querying.

  1. First, create another database to store the target table:
    CREATE DATABASE curated_demo;

  2. Next, switch to this database and run the CTAS statement to select data from the raw input table to create the target Iceberg table (replace the location with an appropriate S3 bucket in your account):
    CREATE TABLE curated_demo.sporting_event
    WITH (table_type='ICEBERG',
    location='s3://<your-bucket>/curated/sporting_event',
    format='PARQUET',
    is_external=false)
    AS SELECT
    id,
    sport_type_name,
    home_team_id,
    away_team_id,
    cast(location_id as int) as location_id,
    cast(start_date_time as timestamp(6)) as start_date_time,
    start_date,
    cast(sold_out as int) as sold_out
    FROM raw_demo.sporting_event
    ;

  3. Run the following query to review data in the Iceberg table:
    SELECT * FROM curated_demo.sporting_event LIMIT 5;


Use MERGE INTO to insert, update, and delete data into the Iceberg table

The MERGE INTO command updates the target table with data from the CDC table. The following statement uses a combination of primary keys and the Op column in the source data, which indicates if the source row is an insert, update, or delete. We use the id column as the primary key to join the target table to the source table, and we use the Op column to determine if a record needs to be deleted.

MERGE INTO curated_demo.sporting_event t
USING (SELECT op,
cdc_timestamp,
id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date,
sold_out
FROM raw_demo.sporting_event_cdc
WHERE partition_date ='2022-09-22') s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN
UPDATE SET
sport_type_name = s.sport_type_name,
home_team_id = s.home_team_id,
away_team_id = s.away_team_id,
location_id = s.location_id,
start_date_time = s.start_date_time,
start_date = s.start_date,
sold_out = s.sold_out
WHEN NOT MATCHED THEN
INSERT (id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date,
sold_out)
VALUES
(s.id,
s.sport_type_name,
s.home_team_id,
s.away_team_id,
s.location_id,
s.start_date_time,
s.start_date,
s.sold_out)

Run the following query to verify data in the Iceberg table:

SELECT * FROM curated_demo.sporting_event WHERE id in (1, 5, 11, 21);

The record with ID 21 has been deleted, and the other records in the CDC dataset have been updated and inserted, as expected.


Create a view that contains the previous state

When you write to an Iceberg table, a new snapshot or version of a table is created each time.

A snapshot represents the state of a table at a point in time and is used to access the complete set of data files in the table. Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time or a specified snapshot ID. However, this requires knowledge of a table’s current snapshots. To abstract this information from users, you can create views on top of Iceberg tables:

CREATE VIEW curated_demo.v_sporting_event_previous_snapshot AS
SELECT id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
cast(start_date_time as timestamp(3)) as start_date_time,
start_date,
sold_out
FROM curated_demo.sporting_event
FOR TIMESTAMP AS OF current_timestamp + interval '-5' minute;

Run the following query using this view to retrieve the snapshot of data before the CDC was applied:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

You can see the record with ID 21, which was deleted earlier.
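Time travel also works with an explicit snapshot ID instead of a relative timestamp. The following optional sketch assumes the Athena Iceberg metadata table suffix $history and the FOR VERSION AS OF clause; replace <snapshot_id> with a snapshot ID returned by the first query:

-- List the snapshot history of the Iceberg table
SELECT * FROM "curated_demo"."sporting_event$history";

-- Query the table as of a specific snapshot ID
SELECT * FROM curated_demo.sporting_event FOR VERSION AS OF <snapshot_id>;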


Compliance with privacy regulations may require that you permanently delete records in all snapshots. To accomplish this, you can set properties for snapshot retention in Athena when creating the table, or you can alter the table:

ALTER TABLE curated_demo.sporting_event SET TBLPROPERTIES (
'vacuum_min_snapshots_to_keep'='1',
'vacuum_max_snapshot_age_seconds'='1'
)

This instructs Athena to store only one version of the data and not maintain any transaction history. After a table has been updated with these properties, run the VACUUM command to remove the older snapshots and clean up storage:

VACUUM curated_demo.sporting_event;

Run the following query again:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

The record with ID 21 has been permanently deleted.


Considerations

As data accumulates in the CDC folder of your raw zone, older files can be archived to Amazon S3 Glacier. Subsequently, the MERGE INTO statement can also be run on a single source file if needed by using $path in the WHERE condition of the USING clause:

MERGE INTO curated_demo.sporting_event t
USING (SELECT op, cdc_timestamp, id, sport_type_name, home_team_id, away_team_id, location_id, start_date_time, start_date, sold_out FROM raw_demo.sporting_event_cdc WHERE partition_date='2022-09-22' AND regexp_like("$path", '/sporting_event_cdc/2022/09/22/20220922-184314489.csv')
………..

This results in Athena scanning all files in the partition's folder before the filter is applied, but the amount of data scanned can be minimized by choosing fine-grained hourly partitions. With this approach, you can trigger the MERGE INTO statement to run on Athena as files arrive in your S3 bucket using Amazon S3 event notifications. This could enable near-real-time use cases where users need to query a consistent view of data in the data lake as soon as it is created in source systems.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following SQL to drop the tables and views:
    DROP TABLE raw_demo.sporting_event;
    DROP TABLE raw_demo.sporting_event_cdc;
    DROP TABLE curated_demo.sporting_event;
    DROP VIEW curated_demo.v_sporting_event_previous_snapshot;

    Because Iceberg tables are considered managed tables in Athena, dropping an Iceberg table also removes all the data in the corresponding S3 folder.

  2. Run the following SQL to drop the databases:
    DROP DATABASE raw_demo;
    DROP DATABASE curated_demo;

  3. Delete the S3 folders and CSV files that you had uploaded.

Conclusion

This post showed you how to apply CDC to a target Iceberg table using CTAS and MERGE INTO statements in Athena. You can perform bulk load using a CTAS statement. When new data or changed data arrives, use the MERGE INTO statement to merge the CDC changes. To optimize storage and improve performance of queries, use the VACUUM command regularly.

As next steps, you can orchestrate these SQL statements using AWS Step Functions to implement end-to-end data pipelines for your data lake. For more information, refer to Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions.


About the Authors

Ranjit Rajan is a Principal Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Kannan Iyer is a Senior Data Lab Solutions Architect with AWS. Kannan works with AWS customers to help them design and build data and analytics applications in the cloud.

Alexandre Rezende is a Data Lab Solutions Architect with AWS. Alexandre works with customers on their Business Intelligence, Data Warehouse, and Data Lake use cases, designs architectures to solve their business problems, and helps them build MVPs to accelerate their path to production.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database-like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer points to the current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.
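If you want to see this metadata for yourself later in the walkthrough, Athena can expose Iceberg metadata through tables with a $ suffix. The following sketch assumes the reviews.all_reviews table created later in this post and the $files and $history metadata table names; among other columns, $files also surfaces per-file statistics used for data skipping:

-- Data files tracked by the current snapshot
SELECT file_path, record_count, file_size_in_bytes
FROM "reviews"."all_reviews$files";

-- Snapshot history of the table
SELECT * FROM "reviews"."all_reviews$history";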

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, this would be raw data stored in your own S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

//Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  1. Choose Create Studio in the new tab.
  2. Enter a name. We use iceberg-studio.
  3. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  4. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  5. Choose an S3 path for Workspaces backup.
  6. Choose Create Studio.
  7. After the Studio is created, choose the Studio access URL.
  8. On the EMR Studio dashboard, choose Create workspace.
  9. Enter a name for your Workspace. We use iceberg-workspace.
  10. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  11. Choose the EMR cluster you created earlier.
  12. Choose Create Workspace.
  13. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  1. Open the notebook.
  2. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
"conf": {
"spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.demo.io-impl":"org.apache.iceberg.aws.s3.S3FileIO"
}
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this loads all the Parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1
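The section title also mentions inserts; as a minimal sketch of the INSERT syntax, the following statement simply copies a few existing rows back into the table so that it doesn't depend on exact column types (in practice, you would insert new records from a staging table or explicit values):

//Example insert statement
insert into reviews.all_reviews
select * from reviews.all_reviews where product_category = 'Watches' limit 10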

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can configure the target file size to write fewer but larger files, which helps improve query performance.

Property Default Description
write.target-file-size-bytes 536870912 (512 MB) Controls the size of files generated to target about this many bytes

Use the following code to alter the table property (note that the property name differs between engines: Spark uses the Iceberg name write.target-file-size-bytes, whereas Athena DDL uses write_target_data_file_size_bytes):

//Example code to alter the table property in EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')")

Partitioning and sorting

To make a query run fast, the less data read, the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partition pruning, and column-level stats such as min/max values to skip data files that don't contain matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process to find the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed Athena scanned 50% or less data for a given query on an Iceberg table compared to original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.

Partitioning and sorting

Partitioning is a way to group records with the same key column values together at write time. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in query scan planning: data filtering. Iceberg makes partitioning simple by supporting hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it.

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which will be used in the query WHERE condition to filter out records. With partitioning, the query could scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

//Run this query on all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

//Run the same select query on partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name Non-Partitioned Dataset Partitioned Dataset
Runtime (seconds) 8.20 4.25
Data Scanned (MB) 131.55 33.79

Note that the runtime is the average runtime with multiple runs in our test.

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
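For example, you can declare a table-level sort order so that future writes cluster rows on a frequently filtered column. The following is a sketch only: it assumes the Iceberg Spark SQL extensions configured earlier in this post (run it through spark.sql in the notebook), and the choice of review_date as the sort column is an assumption based on the query pattern shown above. The sort order applies to newly written files; existing files are only re-sorted when they are rewritten, for example by a compaction job:

ALTER TABLE demo.reviews.all_reviews_partitioned WRITE ORDERED BY review_date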

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when there are changes to the Iceberg table, either updates or deletes, the data files associated with the impacted records are duplicated and updated. The records are either updated in or deleted from the duplicated data files. A new snapshot of the Iceberg table is created, pointing to the newer version of data files. This makes the overall writes slower. There might be situations where concurrent writes conflict and retries have to happen, which increases the write time even more. On the other hand, when reading the data, there is no extra processing needed. The query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files are not rewritten; instead, new delete files are created to track the changes. For deletes, a new delete file is created with the deleted records. When reading the Iceberg table, the delete file is applied to the retrieved data to filter out the deleted records. For updates, a new delete file is created to mark the updated records as deleted, and a new data file is created for those records with the updated values. When reading the Iceberg table, both the delete and new files are applied to the retrieved data to reflect the latest changes and produce the correct results. So, for any subsequent queries, an extra step to merge the data files with the delete and new files happens, which usually increases the query time. On the other hand, the writes might be faster because there is no need to rewrite the existing data files.

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run this code in the EMR Studio Workspace notebook to alter the Iceberg table properties and set copy-on-write
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')")
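To test merge-on-read, run the same ALTER statement through spark.sql in the notebook with the other mode before repeating the test queries (a sketch mirroring the statement above):

ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read')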

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

//Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

Query Copy-on-Write Merge-on-Read
UPDATE DELETE SELECT UPDATE DELETE SELECT
Runtime (seconds) 66.251 116.174 97.75 10.788 54.941 113.44
Data scanned (MB) 494.06 3.07 137.16 494.06 3.07 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs in the two approaches. Which approach to use depends on your use cases. In summary, the considerations come down to latency on the read vs. write. You can reference the following table and make the right choice.

. Copy-on-Write Merge-on-Read
Pros Faster reads Faster writes
Cons Expensive writes Higher latency on reads
When to use Good for frequent reads, infrequent updates and deletes or large batch updates Good for tables with frequent updates and deletes

Data compaction

If your data file size is small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the number of I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset. More data files lead to more metadata, which in turn increases the overhead and I/O operations for reading metadata files. To improve query performance, it's recommended to compact small data files into larger data files.

When updating and deleting records in an Iceberg table, if the merge-on-read approach is used, you might end up with many small delete files or new data files. Running compaction combines all these files and creates a newer version of the data files, which eliminates the need to reconcile them during reads. It's recommended to run regular compaction jobs to impact reads as little as possible while still maintaining faster write speed.

Run the following data compaction command, then run the select query from Athena:

//Data compaction 
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

//Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before and after data compaction, showing a substantial improvement (query runtime dropped from 97.75 seconds to 32.676 seconds).

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676
Data scanned (MB) 137.16 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the EMR Studio console, choose Workspaces in the navigation pane.
  3. Select the Workspace you created and choose Delete.
  4. On the EMR console, navigate to the Studios page.
  5. Select the Studio you created and choose Delete.
  6. On the EMR console, choose Clusters in the navigation pane.
  7. Select the cluster and choose Terminate.
  8. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we did a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their businesses outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Build a real-time GDPR-aligned Apache Iceberg data lake

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-real-time-gdpr-aligned-apache-iceberg-data-lake/

Data lakes are a popular choice for today's organizations to store the data generated by their business activities. As a best practice of data lake design, data should be immutable once stored. But regulations such as the General Data Protection Regulation (GDPR) have created obligations for data operators, who must be able to erase or update personal data in their data lake when requested.

A data lake built on AWS uses Amazon Simple Storage Service (Amazon S3) as its primary storage environment. When a customer asks to erase or update private data, the data lake operator needs to find the required objects in Amazon S3 that contain the required data and take steps to erase or update that data. This activity can be a complex process for the following reasons:

  • Data lakes may contain many S3 objects (each may contain multiple rows), and it's often difficult to find the objects containing the exact data that needs to be erased or the personally identifiable information (PII) that needs to be updated as per the request
  • By nature, S3 objects are immutable and therefore applying direct row-based transactions like DELETE or UPDATE isn’t possible

To handle these situations, a transactional feature on S3 objects is required, and frameworks such as Apache Hudi or Apache Iceberg provide the transactional capability needed for upserts in Amazon S3.

AWS contributed the Apache Iceberg integration with the AWS Glue Data Catalog, which enables you to use open-source data computation engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support for Iceberg, enabling transactional queries on S3 objects.

In this post, we show you how to stream real-time data to an Iceberg table in Amazon S3 using AWS Glue streaming and perform transactions using Amazon Athena for deletes and updates. We use a serverless mechanism for this implementation, which requires minimum operational overhead to manage and fine-tune various configuration parameters, and enables you to extend your use case to ACID operations beyond the GDPR.

Solution overview

We used the Amazon Kinesis Data Generator (KDG) to produce synthetic streaming data in Amazon Kinesis Data Streams and then processed the streaming input data using AWS Glue streaming to store the data in Amazon S3 in Iceberg table format. As part of the customer’s request, we ran delete and update statements using Athena with Iceberg support.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  1. Streaming data is generated in JSON format using the KDG template and inserted into Kinesis Data Streams.
  2. An AWS Glue streaming job is connected to Kinesis Data Streams to process the data using the Iceberg connector.
  3. The streaming job output is stored in Amazon S3 in Iceberg table format.
  4. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  5. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries required for GDPR.

The codebase required for this post is available in the GitHub repository.

Prerequisites

Before starting the implementation, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy your solution resources:

  1. After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:
  2. For Stack name, enter a name.
  3. For Username, enter the user name for the KDG.
  4. For Password, enter the password for the KDG (this must be at least six alphanumeric characters, and contain at least one number).
  5. For IAMGlueStreamingJobRoleName, enter a name for the IAM role used for the AWS Glue streaming job.
  6. Choose Next and create your stack.

This CloudFormation template configures the following resources in your account:

  • An S3 bucket named streamingicebergdemo-XX (note that the XX part is a random unique number to make the S3 bucket name unique)
  • An IAM policy and role
  • The KDG URL used for creating synthetic data
  1. After you complete the setup, go to the Outputs tab of the CloudFormation stack to get the S3 bucket name, AWS Glue job execution role (as per your input), and KDG URL.
  2. Before proceeding with the demo, create a folder named custdata under the created S3 bucket.

Create a Kinesis data stream

We use Kinesis Data Streams to create a serverless streaming data service that is built to handle millions of events with low latency. The following steps guide you on how to create the data stream in the us-east-1 Region:

  1. Log in to the AWS Management Console.
  2. Navigate to Kinesis console (make sure the Region is us-east-1).
  3. Select Kinesis Data Streams and choose Create data stream.
  4. For Data stream name, enter demo-data-stream.
  5. For this post, we select On-demand as the Kinesis data stream capacity mode.

On-demand mode works to eliminate the need for provisioning and managing the capacity for streaming data. However, you can implement this solution with Kinesis Data Streams in provisioned mode as well.

  6. Choose Create data stream.
  7. Wait for successful creation of demo-data-stream and for it to be in Active status.

Set up the Kinesis Data Generator

To create a sample streaming dataset, we use the KDG URL generated on the CloudFormation stack Outputs tab and log in with the credentials used in the parameters for the CloudFormation template. For this post, we use the following template to generate sample data in the demo-data-stream Kinesis data stream.

  1. Log in to the KDG URL with the user name and password you supplied during stack creation.
  2. Change the Region to us-east-1.
  3. Select the Kinesis data stream demo-data-stream.
  4. For Records per second, choose Constant and enter 100 (it can be another number, depending on the rate of record creation).
  5. On the Template 1 tab, enter the KDG data generation template:
{
"year": "{{random.number({"min":2000,"max":2022})}}",
"month": "{{random.number({"min":1,"max":12})}}",
"day": "{{random.number({"min":1,"max":30})}}",
"hour": "{{random.number({"min":0,"max":24})}}",
"minute": "{{random.number({"min":0,"max":60})}}",
"customerid": {{random.number({"min":5023,"max":59874})}},
"firstname" : "{{name.firstName}}",
"lastname" : "{{name.lastName}}",
"dateofbirth" : "{{date.past(70)}}",
"city" : "{{address.city}}",
"buildingnumber" : {{random.number({"min":63,"max":947})}},
"streetaddress" : "{{address.streetAddress}}",
"state" : "{{address.state}}",
"zipcode" : "{{address.zipCode}}",
"country" : "{{address.country}}",
"countrycode" : "{{address.countryCode}}",
"phonenumber" : "{{phone.phoneNumber}}",
"productname" : "{{commerce.productName}}",
"transactionamount": {{random.number(
{
"min":10,
"max":150
}
)}}
}
  6. Choose Test template to test the sample records.
  7. When the testing is correct, choose Send data.

This will start sending 100 records per second in the Kinesis data stream. (To stop sending data, choose Stop Sending Data to Kinesis.)

Integrate Iceberg with AWS Glue

To add the Apache Iceberg Connector for AWS Glue, complete the following steps. The connector is free to use and supports AWS Glue 1.0, 2.0, and 3.0.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In the navigation pane, navigate to AWS Marketplace.
  3. Search for and choose Apache Iceberg Connector for AWS Glue.
  4. Choose Accept Terms and Continue to Subscribe.
  5. Choose Continue to Configuration.
  6. For Fulfillment option, choose your AWS Glue version.
  7. For Software version, choose the latest software version.
  8. Choose Continue to Launch.
  9. Under Usage Instructions, choose the link to activate the connector.
  10. Enter a name for the connection, then choose Create connection and activate the connector.
  11. Verify the new connector on the AWS Glue Studio Connectors page.

Create the AWS Glue Data Catalog database

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location and schema of your data. You use the information in the Data Catalog to create and monitor your ETL jobs.

For this post, we create a Data Catalog database named icebergdemodb containing the metadata information of a table named customer, which will be queried through Athena.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. For Database name, enter icebergdemodb.

This creates an AWS Glue database for metadata storage.

Create a Data Catalog table in Iceberg format

In this step, we create a Data Catalog table in Iceberg table format.

  1. On the Athena console, create an Athena workgroup named demoworkgroup for SQL queries.
  2. Choose Athena engine version 3 for Query engine version.

For more information about Athena versions, refer to Changing Athena engine versions.

  3. Enter the S3 bucket location for Query result configuration under Additional configurations.
  4. Open the Athena query editor and choose demoworkgroup.
  5. Choose the database icebergdemodb.
  6. Enter and run the following DDL to create a table in the Data Catalog database icebergdemodb. Note that the TBLPROPERTIES section mentions ICEBERG as the table type and LOCATION points to the S3 folder (custdata) URI created in earlier steps. This DDL command is available on the GitHub repo.
CREATE TABLE icebergdemodb.customer(
year string,
month string,
day string,
hour string,
minute string,
customerid string,
firstname string,
lastname string,
dateofbirth string,
city string,
buildingnumber string,
streetaddress string,
state string,
zipcode string,
country string,
countrycode string,
phonenumber string,
productname string,
transactionamount int)
LOCATION '<S3 Location URI>'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912',
'optimize_rewrite_delete_file_threshold'='10'
);

After you run the command successfully, you can see the table customer in the Data Catalog.

Create an AWS Glue streaming job

In this section, we create the AWS Glue streaming job, which fetches records from the Kinesis data stream, using the Spark script editor.

  1. On the AWS Glue console, choose Jobs (new) in the navigation pane.
  2. For Create job, select Spark script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. Enter the code available in the GitHub repo in the editor.

The sample code keeps appending data in the target location by fetching records from the Kinesis data stream.

  1. Choose the Job details tab in the query editor.
  2. For Name, enter Demo_Job.
  3. For IAM role, choose demojobrole.
  4. For Type, choose Spark Streaming.
  5. For Glue Version, choose Glue 3.0.
  6. For Language, choose Python 3.
  7. For Worker type, choose G 0.25X.
  8. Select Automatically scale the number of workers.
  9. For Maximum number of workers, enter 5.
  10. Under Advanced properties, select Use Glue Data Catalog as the Hive metastore.
  11. For Connections, choose the connector you created.
  12. For Job parameters, enter the following key pairs (provide your S3 bucket and account ID):
Key Value
--iceberg_job_catalog_warehouse s3://streamingicebergdemo-XX/custdata/
--output_path s3://streamingicebergdemo-XX
--kinesis_arn arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream
--user-jars-first True

  13. Choose Run to start the AWS Glue streaming job.
  14. To monitor the job, choose Monitoring in the navigation pane.
  15. Select Demo_Job and choose View run details to check the job run details and Amazon CloudWatch logs.
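Once the job is running, you can optionally confirm that records are landing in the Iceberg table with a simple count query in Athena; the count keeps growing while the KDG is sending data:

SELECT count(*) FROM icebergdemodb.customer;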

Run GDPR use cases on Athena

In this section, we demonstrate a few use cases that are relevant to GDPR alignment with the user data that’s stored in Iceberg format in the Amazon S3-based data lake as implemented in the previous steps. For this, let’s consider that the following requests are being initiated in the workflow to comply with the regulations:

  • Delete the records for the input customerid (for example, 59289)
  • Update phonenumber for the customerid (for example, 51936)

The IDs used in this example are samples only; they were created through the KDG template used earlier, which generates random data. You can search for IDs in your implementation by querying through the Athena query editor. The steps remain the same.

Delete data by customer ID

Complete the following steps to fulfill the first use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query using a customer ID and choose Run:
SELECT count(*)
FROM icebergdemodb.customer
WHERE customerid = '59289';

This query gives the count of records for the input customerid before delete.

  4. Enter the following query with the same customer ID and choose Run:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '59289') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN DELETE;

This query deletes the data for the input customerid as per the workflow generated.

  5. Test if there is data with the customer ID using a count query.

The count should be 0.

Update data by customer ID

Complete the following steps to test the second use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query with a customer ID and choose Run.
SELECT customerid, phonenumber
FROM icebergdemodb.customer
WHERE customerid = '51936';

This query gives the value for phonenumber before update.

  4. Run the following query to update the required columns:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '51936') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN UPDATE SET phonenumber = '000';

This query updates the data to a dummy value.

  5. Run the SELECT query to check the update.

You can see the data is updated correctly.

Vacuum table

A good practice is to run the VACUUM command on the table periodically, because operations like INSERT, UPDATE, DELETE, and MERGE keep creating new snapshots and data files; VACUUM expires older snapshots and removes files that are no longer needed. See the following code:

VACUUM icebergdemodb.customer;
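How much history VACUUM removes is governed by the table's retention properties. If your erasure requirements mean that older snapshots (which still contain the deleted records) must not be kept, you can shorten the retention before running VACUUM. The following sketch uses the Athena Iceberg table properties shown earlier in this archive; the values are assumptions that you should align with your own retention policy:

ALTER TABLE icebergdemodb.customer SET TBLPROPERTIES (
'vacuum_min_snapshots_to_keep'='1',
'vacuum_max_snapshot_age_seconds'='60'
);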

Considerations

The following are a few considerations to keep in mind for this implementation:

Clean up

Complete the following steps to clean up the resources you created for this post:

    1. Delete the custdata folder in the S3 bucket.
    2. Delete the CloudFormation stack.
    3. Delete the Kinesis data stream.
    4. Delete the S3 bucket storing the data.
    5. Delete the AWS Glue job and Iceberg connector.
    6. Delete the AWS Glue Data Catalog database and table.
    7. Delete the Athena workgroup.
    8. Delete the IAM roles and policies.

Conclusion

This post explained how you can use the Iceberg table format with Athena to implement GDPR use cases like data deletion and data updates on request, for streaming data that is generated and ingested into Amazon S3 through AWS Glue streaming jobs.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the data operations that Iceberg supports. Refer to the Apache Iceberg documentation for details on various operations.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Rajdip Chaudhuri is a Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer.

Automate replication of relational sources into a transactional data lake with Apache Iceberg and AWS Glue

Post Syndicated from Luis Gerardo Baeza original https://aws.amazon.com/blogs/big-data/automate-replication-of-relational-sources-into-a-transactional-data-lake-with-apache-iceberg-and-aws-glue/

Organizations have chosen to build data lakes on top of Amazon Simple Storage Service (Amazon S3) for many years. A data lake is the most popular choice for organizations to store all the data generated by their different teams, across business domains, in all different formats, and even over its full history. According to a study, the average company is seeing the volume of its data grow at a rate that exceeds 50% per year, and usually manages an average of 33 unique data sources for analysis.

Teams often try to replicate thousands of jobs from relational databases with the same extract, transform, and load (ETL) pattern, and there is a lot of effort involved in maintaining the job states and scheduling these individual jobs. The approach described in this post helps teams add tables with few changes and maintain the job status with minimum effort. This can lead to a huge improvement in the development timeline and makes it easy to track the jobs.

In this post, we show you how to easily replicate all your relational data stores into a transactional data lake in an automated fashion with a single ETL job using Apache Iceberg and AWS Glue.

Solution architecture

Data lakes are usually organized using separate S3 buckets for three layers of data: the raw layer containing data in its original form, the stage layer containing intermediate processed data optimized for consumption, and the analytics layer containing aggregated data for specific use cases. In the raw layer, tables usually are organized based on their data sources, whereas tables in the stage layer are organized based on the business domains they belong to.

This post provides an AWS CloudFormation template that deploys an AWS Glue job that reads an Amazon S3 path for one data source of the data lake raw layer, and ingests the data into Apache Iceberg tables on the stage layer using AWS Glue support for data lake frameworks. The job expects tables in the raw layer to be structured in the way AWS Database Migration Service (AWS DMS) ingests them: schema, then table, then data files.

This solution uses AWS Systems Manager Parameter Store for table configuration. You should modify this parameter specifying the tables you want to process and how, including information such as primary key, partitions, and the business domain associated. The job uses this information to automatically create a database (if it doesn’t already exist) for every business domain, create the Iceberg tables, and perform the data loading.

Finally, we can use Amazon Athena to query the data in the Iceberg tables.

The following diagram illustrates this architecture.

Solution architecture

This implementation has the following considerations:

  • All tables from the data source must have a primary key to be replicated using this solution. The primary key can be a single column or a composite key with more than one column.
  • If the data lake contains tables that don’t need upserts or don’t have a primary key, you can exclude them from the parameter configuration and implement traditional ETL processes to ingest them into the data lake. That’s outside of the scope of this post.
  • If there are additional data sources that need to be ingested, you can deploy multiple CloudFormation stacks, one to handle each data source.
  • The AWS Glue job is designed to process data in two phases: the initial load that runs after AWS DMS finishes the full load task, and the incremental load that runs on a schedule that applies change data capture (CDC) files captured by AWS DMS. Incremental processing is performed using an AWS Glue job bookmark.

There are nine steps to complete this tutorial:

  1. Set up a source endpoint for AWS DMS.
  2. Deploy the solution using AWS CloudFormation.
  3. Review the AWS DMS replication task.
  4. Optionally, add permissions for encryption and decryption or AWS Lake Formation.
  5. Review the table configuration on Parameter Store.
  6. Perform initial data loading.
  7. Perform incremental data loading.
  8. Monitor table ingestion.
  9. Schedule incremental batch data loading.

Prerequisites

Before starting this tutorial, you should already be familiar with Iceberg. If you’re not, you can get started by replicating a single table following the instructions in Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue. Additionally, set up the following:

Set up a source endpoint for AWS DMS

Before we create our AWS DMS task, we need to set up a source endpoint to connect to the source database:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. If your database is running on Amazon RDS, choose Select RDS DB instance, then choose the instance from the list. Otherwise, choose the source engine and provide the connection information either through AWS Secrets Manager or manually.
  4. For Endpoint identifier, enter a name for the endpoint; for example, source-postgresql.
  5. Choose Create endpoint.

Deploy the solution using AWS CloudFormation

Create a CloudFormation stack using the provided template. Complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Provide a stack name, such as transactionaldl-postgresql.
  4. Enter the required parameters:
    1. DMSS3EndpointIAMRoleARN – The IAM role ARN for AWS DMS to write data into Amazon S3.
    2. ReplicationInstanceArn – The AWS DMS replication instance ARN.
    3. S3BucketStage – The name of the existing bucket used for the stage layer of the data lake.
    4. S3BucketGlue – The name of the existing S3 bucket for storing AWS Glue scripts.
    5. S3BucketRaw – The name of the existing bucket used for the raw layer of the data lake.
    6. SourceEndpointArn – The AWS DMS endpoint ARN that you created earlier.
    7. SourceName – The arbitrary identifier of the data source to replicate (for example, postgres). This is used to define the S3 path of the data lake (raw layer) where data will be stored.
  5. Do not modify the following parameters:
    1. SourceS3BucketBlog – The bucket name where the provided AWS Glue script is stored.
    2. SourceS3BucketPrefix – The bucket prefix name where the provided AWS Glue script is stored.
  6. Choose Next twice.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

After approximately 5 minutes, the CloudFormation stack is deployed.

Review the AWS DMS replication task

The AWS CloudFormation deployment created an AWS DMS target endpoint for you. Two specific endpoint settings ensure the data is ingested on Amazon S3 in the way we need it.

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Search for and choose the endpoint that begins with dmsIcebergs3endpoint.
  3. Review the endpoint settings:
    1. DataFormat is specified as parquet.
    2. TimestampColumnName will add the column last_update_time, containing the timestamp at which AWS DMS created each record on Amazon S3.

AWS DMS endpoint settings

The deployment also creates an AWS DMS replication task that begins with dmsicebergtask.

  1. Choose Replication tasks in the navigation pane and search for the task.

You will see that the Task Type is marked as Full load, ongoing replication. AWS DMS will perform an initial full load of existing data, and then create incremental files with changes performed to the source database.

On the Mapping Rules tab, there are two types of rules:

  • A selection rule with the name of the source schema and the tables that will be ingested from the source database. By default, it uses the sample database provided in the prerequisites, dms_sample, and all tables via the % wildcard.
  • Two transformation rules that add the schema name and table name as columns to the target files on Amazon S3. Our AWS Glue job uses these columns to determine which table each file in the data lake belongs to.

To learn more about how to customize this for your own data sources, refer to Selection rules and actions.

AWS mapping rules

Let’s change some configurations to finish our task preparation.

  1. On the Actions menu, choose Modify.
  2. In the Task Settings section, under Stop task after full load completes, choose Stop after applying cached changes.

This way, we can control the initial load and incremental file generation as two different steps. We use this two-step approach to run the AWS Glue job once for each step.

  1. Under Task logs, choose Turn on CloudWatch logs.
  2. Choose Save.
  3. Wait about 1 minute for the database migration task status to show as Ready.

Add permissions for encryption and decryption or Lake Formation

Optionally, you can add permissions for encryption and decryption or Lake Formation.

Add encryption and decryption permissions

If your S3 buckets used for the raw and stage layers are encrypted using AWS Key Management Service (AWS KMS) customer managed keys, you need to add permissions to allow the AWS Glue job to access the data:

Add Lake Formation permissions

If you’re managing permissions using Lake Formation, you need to allow your AWS Glue job to create your domain’s databases and tables through the IAM role GlueJobRole.

  1. Grant permissions to create databases (for instructions, refer to Creating a Database).
  2. Grant SUPER permissions to the default database.
  3. Grant data location permissions.
  4. If you create databases manually, grant permissions on all databases to create tables. Refer to Granting table permissions using the Lake Formation console and the named resource method or Granting Data Catalog permissions using the LF-TBAC method according to your use case.

After you perform the initial data load (a later step), make sure to also add permissions for consumers to query the tables. The job role becomes the owner of all the tables it creates, and the data lake admin can then grant access to additional users.

Review table configuration in Parameter Store

The AWS Glue job that performs the data ingestion into Iceberg tables uses the table specification provided in Parameter Store. Complete the following steps to review the parameters that were configured automatically for you, and modify them if needed.

  1. On the Parameter Store console, choose My parameters in the navigation pane.

The CloudFormation stack created two parameters:

  • iceberg-config for job configurations
  • iceberg-tables for table configuration
  1. Choose the parameter iceberg-tables.

The JSON structure contains information that AWS Glue uses to read data and write the Iceberg tables on the target domain:

  • One object per table – The name of the object is created using the schema name, a period, and the table name; for example, schema.table.
  • primaryKey – This should be specified for every source table. You can provide a single column or a comma-separated list of columns (without spaces).
  • partitionCols – This optionally specifies the partition columns for the target tables. If you don’t want to create partitioned tables, provide an empty string. Otherwise, provide a single column or a comma-separated list of columns to be used (without spaces).
  1. If you want to use your own data source, use the following JSON code and replace the text in CAPS from the template provided. If you’re using the sample data source provided, keep the default settings:
{
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": ""
    },
    "SCHEMA_NAME.TABLE_NAME_2": {
        "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO"
    }
}
  1. Choose Save changes.

Perform initial data loading

Now that the required configuration is finished, we ingest the initial data. This step includes three parts: ingesting the data from the source relational database into the raw layer of the data lake, creating the Iceberg tables on the stage layer of the data lake, and verifying results using Athena.

Ingest data into the raw layer of the data lake

To ingest data from the relational data source (PostgreSQL if you are using the sample provided) to our transactional data lake using Iceberg, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the replication task you created and on the Actions menu, choose Restart/Resume.
  3. Wait about 5 minutes for the replication task to complete. You can monitor the tables ingested on the Statistics tab of the replication task.

AWS DMS full load statistics

After some minutes, the task finishes with the message Full load complete.

  1. On the Amazon S3 console, choose the bucket you defined as the raw layer.

Under the S3 prefix defined on AWS DMS (for example, postgres), you should see a hierarchy of folders with the following structure:

  • Schema
    • Table name
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS full load objects created on S3

If your S3 bucket is empty, review Troubleshooting migration tasks in AWS Database Migration Service before running the AWS Glue job.

Create and ingest data into Iceberg tables

Before running the job, let’s navigate the script of the AWS Glue job provided as part of the CloudFormation stack to understand its behavior.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Search for the job that starts with IcebergJob- and a suffix of your CloudFormation stack name (for example, IcebergJob-transactionaldl-postgresql).
  3. Choose the job.

AWS Glue ETL job review

The job script gets the configuration it needs from Parameter Store. The function getConfigFromSSM() returns job-related configurations such as the source and target buckets from which data is read and to which it is written. The variable ssm_param_table_values contains table-related information like the data domain, table name, partition columns, and primary key of the tables that need to be ingested. See the following Python code:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables"

# Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

The script uses an arbitrary catalog name for Iceberg that is defined as my_catalog. This is implemented on the AWS Glue Data Catalog using Spark configurations, so a SQL operation pointing to my_catalog will be applied on the Data Catalog. See the following code:

catalog_name = 'my_catalog'
errored_table_list = []

# Iceberg configuration
spark = SparkSession.builder \
    .config('spark.sql.warehouse.dir', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()

The script iterates over the tables defined in Parameter Store and performs the logic for detecting if the table exists and if the incoming data is an initial load or an upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values:
    # Get table data
    isTableExists = False
    schemaName, tableName = key.split('.')
    logger.info(f'Processing table : {tableName}')

The initialLoadRecordsSparkSQL() function loads initial data when no operation column is present in the S3 files. AWS DMS adds this column only to Parquet data files produced by the continuous replication (CDC). The data loading is performed using the INSERT INTO command with SparkSQL. See the following code:

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName  ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name, 
    dbName = dbName, 
    tableName = tableName,
    insertTableColumnList = insertTableColumnList[ : -1],
    partitionStrSQL = partitionStrSQL)

logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Now we run the AWS Glue job to ingest the initial data into the Iceberg tables. The CloudFormation stack adds the --datalake-formats parameter, adding the required Iceberg libraries to the job.

  1. Choose Run job.
  2. Choose Job Runs to monitor the status. Wait until the status is Run Succeeded.

Verify the data loaded

To confirm that the job processed the data as expected, complete the following steps:

  1. On the Athena console, choose Query Editor in the navigation pane.
  2. Verify AwsDataCatalog is selected as the data source.
  3. Under Database, choose the data domain that you want to explore, based on the configuration you defined in the parameter store. If using the sample database provided, use sports.

Under Tables and views, we can see the list of tables that were created by the AWS Glue job.

  1. Choose the options menu (three dots) next to the first table name, then choose Preview Data.

You can see the data loaded into Iceberg tables.

Amazon Athena review initial data loaded
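If you prefer a query over the preview, something like the following works against the sample data (the sporting_event table name assumes the provided dms_sample source; substitute one of your own tables otherwise):

SELECT * FROM "sports"."sporting_event" LIMIT 10;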

Perform incremental data loading

Now we start capturing changes from our relational database and applying them to the transactional data lake. This step is also divided into three parts: capturing the changes, applying them to the Iceberg tables, and verifying the results.

Capture changes from the relational database

Due to the configuration we specified, the replication task stopped after running the full load phase. Now we restart the task to add incremental files with changes into the raw layer of the data lake.

  1. On the AWS DMS console, select the task we created and ran before.
  2. On the Actions menu, choose Resume.
  3. Choose Start task to start capturing changes.
  4. To trigger new file creation on the data lake, perform inserts, updates, or deletes on the tables of your source database using your preferred database administration tool. If using the sample database provided, you could run the following SQL commands:
UPDATE dms_sample.nfl_stadium_data_upd
SET seatin_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31';

update  dms_sample.mlb_data 
set bats = 'R'
where mlb_id=506560 and bats='L';

update dms_sample.sporting_event 
set start_date  = current_date 
where id=11 and sold_out=0;
  1. On the AWS DMS task details page, choose the Table statistics tab to see the changes captured.
    AWS DMS CDC statistics
  2. Open the raw layer of the data lake to find a new file holding the incremental changes inside every table’s prefix, for example under the sporting_event prefix.

The record with changes for the sporting_event table looks like the following screenshot.

AWS DMS objects migrated into S3 with CDC

Notice the Op column at the beginning, which identifies the record as an update (U). Also, the second date/time value is the control column added by AWS DMS with the time the change was captured.

CDC file schema on Amazon S3

Apply changes on the Iceberg tables using AWS Glue

Now we run the AWS Glue job again, and it will automatically process only the new incremental files since the job bookmark is enabled. Let’s review how it works.

The dedupCDCRecords() function performs deduplication of data because multiple changes to a single record ID could be captured within the same data file on Amazon S3. Deduplication is performed based on the last_update_time column added by AWS DMS that indicates the timestamp of when the change was captured. See the following Python code:

import sys

from pyspark.sql.functions import max
from pyspark.sql.window import Window

def dedupCDCRecords(inputDf, keylist):
    # Window over all rows that share the same primary key value(s)
    IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
    # Tag every row with the latest change timestamp observed for its key
    inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF))

    # Keep only the most recent change per key, split by operation type
    NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    return finalInputDF

On line 99, the upsertRecordsSparkSQL() function performs the upsert in a similar fashion to the initial load, but this time with a SQL MERGE command.
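The post doesn't reproduce the generated statement, but conceptually the MERGE built by upsertRecordsSparkSQL() looks like the following sketch. The table, source view, and key names (sports.sporting_event, updateTable, id) are illustrative stand-ins; the real statement is assembled from the Parameter Store configuration and the deduplicated CDC data registered as a temporary view, and would typically use the Op column added by AWS DMS to distinguish deletes from updates:

MERGE INTO my_catalog.sports.sporting_event t
USING updateTable s
ON t.id = s.id
WHEN MATCHED AND s.Op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Iceberg's Spark extensions accept the UPDATE SET * and INSERT * shorthand, which is convenient when the column list is driven by configuration rather than hard-coded.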

Review the applied changes

Open the Athena console and run a query that selects the changed records on the source database. If using the provided sample database, use the following SQL query:

SELECT * FROM "sports"."nfl_stadium_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena review cdc data loaded

Monitor table ingestion

The AWS Glue job script uses simple Python exception handling to catch errors while processing a specific table. The job bookmark is saved after each table finishes processing successfully, so that when the job run is retried for the tables with errors, the tables that already succeeded aren’t reprocessed.

The AWS Command Line Interface (AWS CLI) provides a get-job-bookmark command for AWS Glue that provides insight into the status of the bookmark for each table processed.

  1. On the AWS Glue Studio console, choose the ETL job.
  2. Choose the Job Runs tab and copy the job run ID.
  3. Run the following command on a terminal authenticated for the AWS CLI, replacing <GLUE_JOB_RUN_ID> on line 1 with the value you copied. If your CloudFormation stack is not named transactionaldl-postgresql, provide the name of your job on line 2 of the script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name $jobname --run-id $jobrun

In this solution, when processing a table raises an exception, the AWS Glue job doesn’t fail immediately. Instead, the table is added to an array that is printed after the job completes, and the job is marked as failed only after it has tried to process the rest of the tables detected in the raw data source. This way, tables without errors don’t have to wait until the user identifies and solves the problem with the conflicting tables. The user can quickly detect job runs that had issues using the AWS Glue job run status, and identify which specific tables caused the problem using the CloudWatch logs for the job run.

  1. The job script implements this feature with the following Python code:
# Performed for every table
        try:
            # Table processing logic goes here
            ...
        except Exception as e:
            logger.info(f'There is an issue with table: {tableName}')
            logger.info(f'The exception is : {e}')
            errored_table_list.append(tableName)
            continue
        # Save the job bookmark only after the table is processed successfully
        job.commit()

# After the loop, fail the run if any table could not be processed
if len(errored_table_list) > 0:
    logger.info(f'Total number of errored tables: {len(errored_table_list)}')
    logger.info(f"Tables that failed during processing: {', '.join(errored_table_list)}")
    raise Exception('***** Some tables failed to process.')

The following screenshot shows how the CloudWatch logs look for tables that cause errors on processing.

AWS Glue job monitoring with logs

Aligned with the AWS Well-Architected Framework Data Analytics Lens practices, you can adopt more sophisticated control mechanisms to identify and notify stakeholders when errors appear in the data pipelines. For example, you can use an Amazon DynamoDB control table to store all tables and job runs with errors, or use Amazon Simple Notification Service (Amazon SNS) to send alerts to operators when certain criteria are met.

Schedule incremental batch data loading

The CloudFormation stack deploys an Amazon EventBridge rule (disabled by default) that can trigger the AWS Glue job to run on a schedule. To provide your own schedule and enable the rule, complete the following steps:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Search for the rule prefixed with the name of your CloudFormation stack followed by JobTrigger (for example, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Choose the rule.
  4. Under Event Schedule, choose Edit.

The default schedule is configured to trigger every hour.

  1. Provide the schedule you want to run the job.
  2. Additionally, you can use an EventBridge cron expression by selecting A fine-grained schedule.
    Amazon EventBridge schedule ETL job
  3. When you finish setting up the cron expression, choose Next three times, and finally choose Update Rule to save changes.

The rule is created disabled by default to allow you to run the initial data load first.

  1. Activate the rule by choosing Enable.

You can use the Monitoring tab to view rule invocations, or directly on the AWS Glue Job Run details.

Conclusion

After deploying this solution, you have automated the ingestion of your tables from a single relational data source. Organizations using a data lake as their central data platform usually need to handle multiple, sometimes even tens of, data sources. Also, more and more use cases require organizations to implement transactional capabilities in the data lake. You can use this solution to accelerate the adoption of such capabilities across all your relational data sources, enabling new business use cases and automating the implementation process to derive more value from your data.


About the Authors

Luis Gerardo Baeza is a Big Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience helping organizations in the healthcare, financial and education sectors to adopt enterprise architecture programs, cloud computing, and data analytics capabilities. Luis currently helps organizations across Latin America to accelerate strategic data initiatives.

SaiKiran Reddy Aenugu is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 10 years of experience implementing data loading, transformation, and visualization processes. SaiKiran currently helps organizations in North America to adopt modern data architectures such as data lakes and data mesh. He has experience in the retail, airline, and finance sectors.

Narendra Merla is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience in designing and productionalizing both real-time and batch-oriented data pipelines and building data lakes on both cloud and on-premises environments. Narendra currently helps organizations in North America to build and design robust data architectures, and has experience in the telecom and finance sectors.

Build a data lake with Apache Flink on Amazon EMR

Post Syndicated from Jianwei Li original https://aws.amazon.com/blogs/big-data/build-a-unified-data-lake-with-apache-flink-on-amazon-emr/

To build a data-driven business, it is important to democratize enterprise data assets in a data catalog. With a unified data catalog, you can quickly search datasets and figure out data schema, data format, and location. The AWS Glue Data Catalog provides a uniform repository where disparate systems can store and find metadata to keep track of data in data silos.

Apache Flink is a widely used data processing engine for scalable streaming ETL, analytics, and event-driven applications. It provides precise time and state management with fault tolerance. Flink can process bounded streams (batch) and unbounded streams (streaming) with a unified API or application. After data is processed with Apache Flink, downstream applications can access the curated data with a unified data catalog. With unified metadata, both data processing and data consuming applications can access the tables using the same metadata.

This post shows you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog so that you can ingest streaming data in real time and access the data in near-real time for business analysis.

Apache Flink connector and catalog architecture

Apache Flink uses a connector and catalog to interact with data and metadata. The following diagram shows the architecture of the Apache Flink connector for data read/write, and catalog for metadata read/write.

Flink Glue Architecture

For data read/write, Flink has the interface DynamicTableSourceFactory for read and DynamicTableSinkFactory for write. Each Flink connector implements these two interfaces to access data in a different storage system. For example, the Flink FileSystem connector has FileSystemTableFactory to read/write data in Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (Amazon S3), the Flink HBase connector has HBase2DynamicTableFactory to read/write data in HBase, and the Flink Kafka connector has KafkaDynamicTableFactory to read/write data in Kafka. You can refer to Table & SQL Connectors for more information.

For metadata read/write, Flink has the catalog interface. Flink has three built-in implementations for the catalog. GenericInMemoryCatalog stores the catalog data in memory. JdbcCatalog stores the catalog data in a JDBC-supported relational database. As of this writing, MySQL and PostgreSQL databases are supported in the JDBC catalog. HiveCatalog stores the catalog data in Hive Metastore. HiveCatalog uses HiveShim to provide different Hive version compatibility. We can configure different metastore clients to use Hive Metastore or the AWS Glue Data Catalog. In this post, we configure the Amazon EMR property hive.metastore.client.factory.class to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory (see Using the AWS Glue Data Catalog as the metastore for Hive) so that we can use the AWS Glue Data Catalog to store Flink catalog data. Refer to Catalogs for more information.

Most Flink built-in connectors, such as for Kafka, Amazon Kinesis, Amazon DynamoDB, Elasticsearch, or FileSystem, can use Flink HiveCatalog to store metadata in the AWS Glue Data Catalog. However, some connector implementations such as Apache Iceberg have their own catalog management mechanism. Iceberg's FlinkCatalog implements the catalog interface in Flink and wraps Iceberg's own catalog implementation. The following diagram shows the relationship between Apache Flink, the Iceberg connector, and the catalog. For more information, refer to Creating catalogs and using catalogs and Catalogs.

Flink Iceberg Glue Architecture

Apache Hudi also has its own catalog management. Both HoodieCatalog and HoodieHiveCatalog implement the catalog interface in Flink. HoodieCatalog stores metadata in a file system such as HDFS. HoodieHiveCatalog stores metadata in Hive Metastore or the AWS Glue Data Catalog, depending on whether you configure hive.metastore.client.factory.class to use com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory. The following diagram shows the relationship between Apache Flink, the Hudi connector, and the catalog. For more information, refer to Create Catalog.

Flink Hudi Glue Architecture

Because Iceberg and Hudi have different catalog management mechanisms, we show three scenarios of Flink integration with the AWS Glue Data Catalog in this post:

  • Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog
  • Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog
  • Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

Solution overview

The following diagram shows the overall architecture of the solution described in this post.

Flink Glue Integration

In this solution, we enable an Amazon RDS for MySQL binlog to extract transaction changes in real time. The Amazon EMR Flink CDC connector reads the binlog data and processes the data. Transformed data can be stored in Amazon S3. We use the AWS Glue Data Catalog to store the metadata such as table schema and table location. Downstream data consumer applications such as Amazon Athena or Amazon EMR Trino access the data for business analysis.

The following are the high-level steps to set up this solution:

  1. Enable binlog for Amazon RDS for MySQL and initialize the database.
  2. Create an EMR cluster with the AWS Glue Data Catalog.
  3. Ingest change data capture (CDC) data with Apache Flink CDC in Amazon EMR.
  4. Store the processed data in Amazon S3 with metadata in the AWS Glue Data Catalog.
  5. Verify all table metadata is stored in the AWS Glue Data Catalog.
  6. Consume data with Athena or Amazon EMR Trino for business analysis.
  7. Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables.

Prerequisites

This post uses an AWS Identity and Access Management (IAM) role with permissions for the following services:

  • Amazon RDS for MySQL (5.7.40)
  • Amazon EMR (6.9.0)
  • Amazon Athena
  • AWS Glue Data Catalog
  • Amazon S3

Enable binlog for Amazon RDS for MySQL and initialize the database

To enable CDC in Amazon RDS for MySQL, we need to configure binary logging for Amazon RDS for MySQL. Refer to Configuring MySQL binary logging for more information. We also create the database salesdb in MySQL and create the tables customer, order, and others to set up the data source.

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Create a new parameter group for MySQL.
  3. Edit the parameter group you just created to set binlog_format=ROW.

RDS-Binlog-Format

  1. Edit the parameter group you just created to set binlog_row_image=full.

RDS-Binlog-Row-Image

  1. Create an RDS for MySQL DB instance with the parameter group.
  2. Note down the values for hostname, username, and password, which we use later.
  3. Download the MySQL database initialization script from Amazon S3 by running the following command:
aws s3 cp s3://emr-workshops-us-west-2/glue_immersion_day/scripts/salesdb.sql ./salesdb.sql
  1. Connect to the RDS for MySQL database and run the salesdb.sql script to initialize the database, providing the host name and user name according to your RDS for MySQL database configuration:
mysql -h <hostname> -u <username> -p
mysql> source salesdb.sql
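Optionally, you can confirm from the same MySQL session that the parameter group settings from the earlier steps are in effect (expect ROW and FULL, respectively):

mysql> SHOW VARIABLES LIKE 'binlog_format';
mysql> SHOW VARIABLES LIKE 'binlog_row_image';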

Create an EMR cluster with the AWS Glue Data Catalog

Starting with Amazon EMR 6.9.0, the Flink Table API/SQL can integrate with the AWS Glue Data Catalog. To use the Flink and AWS Glue integration, you must use Amazon EMR version 6.9.0 or later.

  1. Create the file iceberg.properties for the Amazon EMR Trino integration with the Data Catalog. When the table format is Iceberg, your file should have the following content:
iceberg.catalog.type=glue
connector.name=iceberg
  1. Upload iceberg.properties to an S3 bucket, for example DOC-EXAMPLE-BUCKET.

For more information on how to integrate Amazon EMR Trino with Iceberg, refer to Use an Iceberg cluster with Trino.

  1. Create the file trino-glue-catalog-setup.sh to configure the Trino integration with the Data Catalog. Use trino-glue-catalog-setup.sh as the bootstrap script. Your file should have the following content (replace DOC-EXAMPLE-BUCKET with your S3 bucket name):
set -ex 
sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties

  1. Upload trino-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Create bootstrap actions to install additional software to run a bootstrap script.

  1. Create the file flink-glue-catalog-setup.sh to configure the Flink integration with the Data Catalog.
  2. Use a script runner and run the flink-glue-catalog-setup.sh script as an Amazon EMR step.

Your file should have the following content (the JAR file names here correspond to Amazon EMR 6.9.0; the names may change in a later release, so make sure to update them according to your Amazon EMR version).

Note that here we use an Amazon EMR step, not a bootstrap, to run this script. An Amazon EMR step script is run after Amazon EMR Flink is provisioned.

set -ex

sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/hive-exec.jar /lib/flink/lib
sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib
sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar
sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar
sudo chmod 755 /usr/lib/flink/lib/hive-exec.jar
sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

sudo wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar -O /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
sudo chmod 755 /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar

sudo ln -s /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar /usr/lib/flink/lib/
sudo ln -s /usr/lib/hudi/hudi-flink-bundle.jar /usr/lib/flink/lib/

sudo mv /usr/lib/flink/opt/flink-table-planner_2.12-1.15.2.jar /usr/lib/flink/lib/
sudo mv /usr/lib/flink/lib/flink-table-planner-loader-1.15.2.jar /usr/lib/flink/opt/
  1. Upload flink-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Configuring Flink to Hive Metastore in Amazon EMR for more information on how to configure Flink and Hive Metastore. Refer to Run commands and scripts on an Amazon EMR cluster for more details on running the Amazon EMR step script.

  1. Create an EMR 6.9.0 cluster with the applications Hive, Flink, and Trino.

You can create an EMR cluster with the AWS Command Line Interface (AWS CLI) or the AWS Management Console. Refer to the appropriate subsection for instructions.

Create an EMR cluster with the AWS CLI

To use the AWS CLI, complete the following steps:

  1. Create the file emr-flink-trino-glue.json to configure Amazon EMR to use the Data Catalog. Your file should have the following content:
[
{
"Classification": "hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
},
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]
  1. Run the following command to create the EMR cluster. Provide your local emr-flink-trino-glue.json parent folder path, S3 bucket, EMR cluster Region, EC2 key name, and S3 bucket for EMR logs.
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive Name=Flink Name=Spark Name=Trino \
--region us-west-2 \
--name flink-trino-glue-emr69 \
--configurations "file:///<your configuration path>/emr-flink-trino-glue.json" \
--bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET/trino-glue-catalog-setup.sh","Name":"Add iceberg.properties for Trino"}]' \
--steps '[{"Args":["s3://DOC-EXAMPLE-BUCKET/flink-glue-catalog-setup.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Flink-glue-integration"}]' \
--instance-groups \
InstanceGroupType=MASTER,InstanceType=m6g.2xlarge,InstanceCount=1 \
InstanceGroupType=CORE,InstanceType=m6g.2xlarge,InstanceCount=2 \
--use-default-roles \
--ebs-root-volume-size 30 \
--ec2-attributes KeyName=<keyname> \
--log-uri s3://<s3-bucket-for-emr>/elasticmapreduce/

Create an EMR cluster on the console

To use the console, complete the following steps:

  1. On the Amazon EMR console, create an EMR cluster and select Use for Hive table metadata for AWS Glue Data Catalog settings.
  2. Add configuration settings with the following code:
[
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]

EMR-6.9-Flink-Hive-Glue-1

  1. In the Steps section, add a step called Custom JAR.
  2. Set JAR location to s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar, where <region> is the region in which your EMR cluster resides.
  3. Set Arguments to the S3 path you uploaded earlier.

EMR-6.9-Flink-Hive-Glue-2

  1. In the Bootstrap Actions section, choose Custom Action.
  2. Set Script location to the S3 path you uploaded.

EMR-6.9-Flink-Hive-Glue-3

  1. Continue the subsequent steps to complete your EMR cluster creation.

Ingest CDC data with Apache Flink CDC in Amazon EMR

The Flink CDC connector supports reading database snapshots and captures updates in the configured tables. We deployed the Flink CDC connector for MySQL by downloading flink-sql-connector-mysql-cdc-2.2.1.jar and putting it into the Flink library when we created our EMR cluster. The Flink CDC connector can use the Flink Hive catalog to store Flink CDC table schemas in Hive Metastore or the AWS Glue Data Catalog. In this post, we use the Data Catalog to store our Flink CDC table.

Complete the following steps to ingest RDS for MySQL databases and tables with Flink CDC and store metadata in the Data Catalog:

  1. SSH to the EMR primary node.
  2. Start Flink on a YARN session by running the following command, providing your S3 bucket name:
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 \
-D state.backend=rocksdb \
-D state.backend.incremental=true \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=s3://<flink-glue-integration-bucket>/flink-checkponts/ \
-D state.checkpoints.num-retained=10 \
-D execution.checkpointing.interval=10s \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D execution.checkpointing.max-concurrent-checkpoints=1
  1. Start the Flink SQL client CLI by running the following command:
/usr/lib/flink/bin/sql-client.sh embedded
  1. Create the Flink Hive catalog by specifying the catalog type as hive and providing your S3 bucket name:
CREATE CATALOG glue_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri'= 's3://<flink-glue-integration-bucket>/flink-glue-for-hive/warehouse/');
use flink_cdc_db;

Because we’re configuring the EMR Hive catalog to use the AWS Glue Data Catalog, all the databases and tables created in the Flink Hive catalog are stored in the Data Catalog.

  1. Create the Flink CDC table, providing the host name, user name, and password for the RDS for MySQL instance you created earlier.

Note that because the RDS for MySQL user name and password will be stored in the Data Catalog as table properties, you should enable AWS Glue database and table authorization with AWS Lake Formation to protect your sensitive data.

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_cdc` (
`CUST_ID` double NOT NULL,
`NAME` STRING NOT NULL,
`MKTSEGMENT` STRING NOT NULL,
PRIMARY KEY (`CUST_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` (
`SITE_ID` double NOT NULL,
`CUST_ID` double NOT NULL,
`ADDRESS` STRING NOT NULL,
`CITY` STRING NOT NULL,
`STATE` STRING NOT NULL,
`COUNTRY` STRING NOT NULL,
`PHONE` STRING NOT NULL,
PRIMARY KEY (`SITE_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER_SITE'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` (
`ORDER_ID` int NOT NULL,
`SITE_ID` double NOT NULL,
`ORDER_DATE` TIMESTAMP NOT NULL,
`SHIP_MODE` STRING NOT NULL
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'SALES_ORDER_ALL',
'scan.incremental.snapshot.enabled' = 'FALSE'
);
  1. Query the table you just created:
SELECT count(O.ORDER_ID) AS ORDER_COUNT,
C.CUST_ID,
C.NAME,
C.MKTSEGMENT
FROM   customer_cdc C
JOIN customer_site_cdc CS
ON C.CUST_ID = CS.CUST_ID
JOIN sales_order_all_cdc O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT;

You will get a query result like the following screenshot.

Flink-SQL-CDC-Test

Store processed data in Amazon S3 with metadata in the Data Catalog

Because we’re ingesting relational data from Amazon RDS for MySQL, raw data may be updated or deleted. To support data updates and deletes, we can choose data lake technologies such as Apache Iceberg or Apache Hudi to store the processed data. As we mentioned earlier, Iceberg and Hudi have different catalog management. We show both scenarios of using Flink to read/write the Iceberg and Hudi tables with metadata in the AWS Glue Data Catalog.

For formats other than Iceberg and Hudi, we use a FileSystem Parquet file to show how a Flink built-in connector uses the Data Catalog.

Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Iceberg

  1. Create a Flink Iceberg catalog using the Data Catalog by specifying catalog-impl as org.apache.iceberg.aws.glue.GlueCatalog.

For more information about Flink and Data Catalog integration for Iceberg, refer to Glue Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_iceberg WITH (
'type'='iceberg',
'warehouse'='s3://<flink-glue-integration-bucket>/flink-glue-for-iceberg/warehouse/',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager',
'lock.table'='FlinkGlue4IcebergLockTable' );
  1. Create an Iceberg table to store processed data:
USE CATALOG glue_catalog_for_iceberg;
CREATE DATABASE IF NOT EXISTS flink_glue_iceberg_db;
USE flink_glue_iceberg_db;
CREATE TABLE `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'format-version'='2',
'write.upsert.enabled'='true');
  1. Insert the processed data into Iceberg:
INSERT INTO `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;
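Before moving on, you can optionally sanity-check the result from the Flink SQL client. The runtime-mode switch mirrors what this post does later when reading Iceberg into Parquet; switching back afterward matters because the Hudi insert in the next section reads from the unbounded CDC tables:

SET execution.runtime-mode = batch;
SELECT * FROM `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` LIMIT 10;
SET execution.runtime-mode = streaming;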

Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Hudi

Complete the following steps:

  1. Create a catalog for Hudi to use the Hive catalog by specifying mode as hms.

Because we already configured Amazon EMR to use the Data Catalog when we created the EMR cluster, this Hudi Hive catalog uses the Data Catalog under the hood. For more information about Flink and Data Catalog integration for Hudi, refer to Create Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_hudi WITH (
'type' = 'hudi',
'mode' = 'hms',
'table.external' = 'true',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf.dist',
'catalog.path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/'
);
  1. Create a Hudi table using the Data Catalog, and provide your S3 bucket name:
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
use flink_glue_hudi_db;
CREATE TABLE `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'connector' = 'hudi',
'write.tasks' = '4',
'path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/customer_summary',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);
  1. Insert the processed data into Hudi:
INSERT INTO `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Parquet

We already created the Flink Hive catalog in the previous step, so we’ll reuse that catalog.

  1. In the Flink SQL client CLI, run the following command:
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_hive_parquet_db;
use flink_hive_parquet_db;

We change the SQL dialect to Hive to create a table with Hive syntax.

  1. Create a table with the following SQL, and provide your S3 bucket name:
SET table.sql-dialect=hive;

CREATE TABLE `customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT
)
STORED AS parquet
LOCATION 's3://<flink-glue-integration-bucket>/flink-glue-for-hive-parquet/warehouse/customer_summary';

Because Parquet files don’t support updating rows in place, we can’t consume the CDC data directly into this table. However, we can consume data from the Iceberg or Hudi tables.

  1. Use the following code to query the Iceberg table and insert data into the Parquet table:
SET table.sql-dialect=default;
SET execution.runtime-mode = batch;
INSERT INTO `glue_catalog`.`flink_hive_parquet_db`.`customer_summary`
SELECT * from `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`;

Verify all table metadata is stored in the Data Catalog

You can navigate to the AWS Glue console to verify all the tables are stored in the Data Catalog.

  1. On the AWS Glue console, choose Databases in the navigation pane to list all the databases we created.

Glue-Databases

  1. Open a database and verify that all the tables are in that database.

Glue-Tables

Consume data with Athena or Amazon EMR Trino for business analysis

You can use Athena or Amazon EMR Trino to access the result data.

Query the data with Athena

To access the data with Athena, complete the following steps:

  1. Open the Athena query editor.
  2. Choose flink_glue_iceberg_db for Database.

You should see the customer_summary table listed.

  1. Run the following SQL script to query the Iceberg result table:
select * from customer_summary order by order_count desc limit 10

The query result will look like the following screenshot.

Athena-Iceberg-Query

  1. For the Hudi table, change Database to flink_glue_hudi_db and run the same SQL query.

Athena-Hudi-Query

  1. For the Parquet table, change Database to flink_hive_parquet_db and run the same SQL query.

Athena-Parquet-Query

Query the data with Amazon EMR Trino

To access Iceberg with Amazon EMR Trino, SSH to the EMR primary node.

  1. Run the following command to start the Trino CLI:
trino-cli --catalog iceberg

Amazon EMR Trino can now query the tables in the AWS Glue Data Catalog.

  1. Run the following command to query the result table:
show schemas;
use flink_glue_iceberg_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

The query result looks like the following screenshot.

EMR-Trino-Iceberg-Query

  1. Exit the Trino CLI.
  2. Start the Trino CLI with the hive catalog to query the Hudi table:
trino-cli --catalog hive
  1. Run the following command to query the Hudi table:
show schemas;
use flink_glue_hudi_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables

We can update and delete some records in the RDS for MySQL database and then validate that the changes are reflected in the Iceberg and Hudi tables.

  1. Connect to the RDS for MySQL database and run the following SQL:
update CUSTOMER set NAME = 'updated_name' where CUST_ID=7;

delete from CUSTOMER where CUST_ID=11;
  1. Query the customer_summary table with Athena or Amazon EMR Trino.

The updated and deleted records are reflected in the Iceberg and Hudi tables.

Athena-Iceberg-Query-Updated
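Sticking with the sample data, one way to confirm this in Athena is to look up the two customer IDs you just modified; customer 7 should show the updated name, and customer 11 should no longer appear:

SELECT customer_id, name
FROM flink_glue_iceberg_db.customer_summary
WHERE customer_id IN (7, 11);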

Clean up

When you’re done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. Delete the RDS for MySQL database.
  2. Delete the EMR cluster.
  3. Drop the databases and tables created in the Data Catalog.
  4. Remove files in Amazon S3.

Conclusion

This post showed you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog. You can use a Flink SQL connector to read/write data in different stores, such as Kafka, CDC, HBase, Amazon S3, Iceberg, or Hudi. You can also store the metadata in the Data Catalog. The Flink Table API has the same connector and catalog implementation mechanism. In a single session, you can use multiple catalog instances pointing to different types, like IcebergCatalog and HiveCatalog, and use them interchangeably in your queries. You can also write code with the Flink Table API to develop the same solution to integrate Flink and the Data Catalog.

In our solution, we consumed the RDS for MySQL binary log directly with Flink CDC. You can also use Amazon MSK Connect to consume the binary log with the MySQL Debezium connector and store the data in Amazon Managed Streaming for Apache Kafka (Amazon MSK). Refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi for more information.

With the Amazon EMR Flink unified batch and streaming data processing function, you can ingest and process data with one computing engine. With Apache Iceberg and Hudi integrated in Amazon EMR, you can build an evolvable and scalable data lake. With the AWS Glue Data Catalog, you can manage all enterprise data catalogs in a unified manner and consume data easily.

Follow the steps in this post to build your unified batch and streaming solution with Amazon EMR Flink and the AWS Glue Data Catalog. Please leave a comment if you have any questions.


About the Authors

Jianwei Li is a Senior Analytics Specialist TAM. He provides consulting services for AWS enterprise support customers to design and build modern data platforms.


Samrat Deb is a Software Development Engineer at Amazon EMR. In his spare time, he loves exploring new places, different cultures, and food.


Prabhu Josephraj is a Senior Software Development Engineer working for Amazon EMR. He is focused on leading the team that builds solutions in Apache Hadoop and Apache Flink. In his spare time, Prabhu enjoys spending time with his family.