Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/

Nowadays, many customers have built their data lakes as the core of their data analytics systems. In a typical data lake use case, many concurrent queries run to retrieve consistent snapshots of business insights by aggregating query results. A large volume of data constantly flows into the data lake from different data sources. There is also a common demand to reflect the changes occurring in those data sources into the data lake, which means that not only inserts but also updates and deletes need to be replicated.

Apache Iceberg provides ACID transaction capabilities for your data lake, which allow concurrent queries to add or delete records in isolation from existing queries, with read consistency for every query. Iceberg is an open table format designed for large analytic workloads on huge datasets. You can perform ACID transactions against your data lake by using simple SQL expressions. It also enables time travel, rollback, hidden partitioning, and schema evolution changes such as adding, dropping, renaming, updating, and reordering columns.

AWS Glue is one of the key elements for building data lakes. It extracts data from multiple sources and ingests it into your data lake built on Amazon Simple Storage Service (Amazon S3) using both batch and streaming jobs. To expand the accessibility of your AWS Glue extract, transform, and load (ETL) jobs to Iceberg, AWS Glue provides an Apache Iceberg connector. The connector allows you to build Iceberg tables on your data lake and run Iceberg operations such as ACID transactions, time travel, and rollbacks from your AWS Glue ETL jobs.

In this post, we give an overview of how to set up the Iceberg connector for AWS Glue and configure the relevant resources to use Iceberg with AWS Glue jobs. We also demonstrate how to run typical Iceberg operations on AWS Glue interactive sessions with an example use case.

Apache Iceberg connector for AWS Glue

With the Apache Iceberg connector for AWS Glue, you can take advantage of the following Iceberg capabilities:

  • Basic operations on Iceberg tables – This includes creating Iceberg tables in the AWS Glue Data Catalog and inserting, updating, and deleting records with ACID transactions in the Iceberg tables
  • Inserting and updating records – You can run UPSERT (update and insert) queries for your Iceberg table
  • Time travel on Iceberg tables – You can read a specific version of an Iceberg table from table snapshots that Iceberg manages
  • Rollback of table versions – You can revert an Iceberg table back to a specific version of the table

Iceberg offers additional useful capabilities such as hidden partitioning; schema evolution with add, drop, update, and rename support; automatic data compaction; and more. For more details about Iceberg, refer to the Apache Iceberg documentation.

Next, we demonstrate how the Apache Iceberg connector for AWS Glue works for each Iceberg capability based on an example use case.

Overview of example customer scenario

Let’s assume that an ecommerce company sells products on their online platform. Customers can buy products and write reviews for each product. Customers can add, update, or delete their reviews at any time. The customer reviews are an important source for analyzing customer sentiment and business trends.

In this scenario, we have the following teams in our organization:

  • Data engineering team – Responsible for building and managing data platforms.
  • Data analyst team – Responsible for analyzing customer reviews and creating business reports. This team queries the reviews daily, creates a business intelligence (BI) report, and shares it with the sales team.
  • Customer support team – Responsible for replying to customer inquiries. This team queries the reviews when they get inquiries about the reviews.

Our solution has the following requirements:

  • Query scalability is important because the website is huge.
  • Individual customer reviews can be added, updated, and deleted.
  • The data analyst team needs to use both notebooks and ad hoc queries for their analysis.
  • The customer support team sometimes needs to view the history of the customer reviews.
  • Customer reviews can always be added, updated, and deleted, even while one of the teams is querying the reviews for analysis. This means that any result in a query isn’t affected by uncommitted customer review write operations.
  • Any changes in customer reviews that are made by the organization’s various teams need to be reflected in BI reports and query results.

In this post, we build a data lake of customer review data on top of Amazon S3. To meet these requirements, we introduce Apache Iceberg to enable adding, updating, and deleting records; ACID transactions; and time travel queries. We also use an AWS Glue Studio notebook to integrate and query the data at scale. First, we set up the connector so we can create an AWS Glue connection for Iceberg.

Set up the Apache Iceberg connector and create the Iceberg connection

To use Apache Iceberg with AWS Glue jobs, we first set up the Apache Iceberg connector for AWS Glue and create an AWS Glue connection for it. Complete the following steps:

  1. Navigate to the Apache Iceberg connector for AWS Glue page in AWS Marketplace.
  2. Choose Continue to Subscribe.

  3. Review the information under Terms and Conditions, and choose Accept Terms to continue.

  4. When the subscription is complete, choose Continue to Configuration.

  5. For Fulfillment option, choose Glue 3.0. (1.0 and 2.0 are also available options.)
  6. For Software version, choose the latest software version.

As of this writing, 0.12.0-2 is the latest version of the Apache Iceberg connector for AWS Glue.

  7. Choose Continue to Launch.

  8. Choose Usage instructions.
  9. Choose Activate the Glue connector from AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  10. For Name, enter a name for your connection (for example, iceberg-connection).

  11. Choose Create connection and activate connector.

A message appears that the connection was successfully added, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

We use a provided AWS CloudFormation template to set up Iceberg configuration for AWS Glue. AWS CloudFormation creates the following resources:

  • An S3 bucket to store an Iceberg configuration file and actual data
  • An AWS Lambda function to generate an Iceberg configuration file based on parameters provided by a user for the CloudFormation template, and to clean up the resources created through this post
  • AWS Identity and Access Management (IAM) roles and policies with necessary permissions
  • An AWS Glue database in the Data Catalog to register Iceberg tables

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:


  2. For DynamoDBTableName, enter a name for an Amazon DynamoDB table that is created automatically when AWS Glue creates an Iceberg table.

AWS Glue jobs use this table to obtain a commit lock that prevents concurrent modifications to records in Iceberg tables. For more details about commit locking, refer to DynamoDB for Commit Locking. Note that you shouldn’t specify the name of an existing table.

  3. For IcebergDatabaseName, enter a name for the AWS Glue database that is created in the Data Catalog and used for registering Iceberg tables.
  4. Choose Next.

  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Create stack.

Start an AWS Glue Studio notebook to use Apache Iceberg

After you launch the CloudFormation stack, you create an AWS Glue Studio notebook to perform Iceberg operations. Complete the following steps:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Under Create job, select Jupyter Notebook.

  4. Select Upload and edit an existing notebook and upload iceberg-with-glue.ipynb.

  5. Choose Create.
  6. For Job name, enter a name.
  7. For IAM role, choose IcebergConnectorGlueJobRole, which was created via the CloudFormation template.
  8. Choose Start notebook job.

The process takes a few minutes to complete, after which you can see an AWS Glue Studio notebook view.

  9. Choose Save to save the notebook.

Set up the Iceberg configuration

To set up the Iceberg configuration, complete the following steps:

  1. Run the following cells, which set several configuration options (magics). Note that you set your connection name for the %connections magic in the cell, as shown in the sketch that follows.
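
The magics cell looks roughly like the following sketch. The worker type and worker count shown here are illustrative assumptions; substitute your own connection name and sizing.

%connections iceberg-connection
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5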

For more information, refer to Configuring AWS Glue Interactive Sessions for Jupyter and AWS Glue Studio notebooks.

A message Session <session-id> has been created appears when your AWS Glue Studio notebook is ready.

In the last cell in this section, you load your Iceberg configuration, which you specified when launching the CloudFormation stack. The Iceberg configuration includes a warehouse path for Iceberg actual data, a DynamoDB table name for commit locking, a database name for your Iceberg tables, and more.

To load the configuration, set the S3 bucket name that was created via the CloudFormation stack.

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose the stack you created.
  3. On the Outputs tab, copy the S3 bucket name.

  4. Set the S3 bucket name as the S3_BUCKET parameter in your notebook.

  5. Run the cell to load the Iceberg configuration that you set (a sketch of this cell follows).
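
As a reference, loading the configuration looks roughly like the following sketch. The object key (config/iceberg_configuration.json) and the user_config variable name are assumptions for illustration; the notebook cell defines the actual location and keys.

# A hedged sketch of loading the Iceberg configuration file from Amazon S3.
# The object key and variable names below are illustrative assumptions.
import json
import boto3

S3_BUCKET = 'your-bucket'  # the bucket name copied from the CloudFormation Outputs tab

s3 = boto3.resource('s3')
config_object = s3.Object(S3_BUCKET, 'config/iceberg_configuration.json')  # hypothetical key
user_config = json.loads(config_object.get()['Body'].read().decode('utf-8'))
print(user_config)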

Initialize the job with Iceberg configurations

In this section, we continue running cells to initialize a SparkSession.

  1. Set an Iceberg warehouse path and a DynamoDB table name for Iceberg commit locking from the user_config parameter.
  2. Initialize a SparkSession by setting the Iceberg configurations.
  3. With the SparkSession object, create SparkContext and GlueContext objects.


We provide the details of each parameter that you configure for the SparkSession in the appendix of this post.
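
As a reference, the initialization looks roughly like the following sketch. The configuration keys match the table in the appendix; the concrete values (catalog name, warehouse path, DynamoDB table name) are placeholders that the notebook derives from the loaded user_config.

# A minimal sketch of initializing a SparkSession with the Iceberg configurations.
# CATALOG, WAREHOUSE_PATH, and DYNAMODB_TABLE are placeholders; the notebook reads
# the actual values from the user_config loaded earlier.
from pyspark.sql import SparkSession
from awsglue.context import GlueContext

CATALOG = 'glue_catalog'
WAREHOUSE_PATH = 's3://your-bucket/data/'
DYNAMODB_TABLE = 'myGlueLockTable'

spark = SparkSession.builder \
    .config(f'spark.sql.catalog.{CATALOG}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{CATALOG}.warehouse', WAREHOUSE_PATH) \
    .config(f'spark.sql.catalog.{CATALOG}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{CATALOG}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config(f'spark.sql.catalog.{CATALOG}.lock-impl', 'org.apache.iceberg.aws.glue.DynamoLockManager') \
    .config(f'spark.sql.catalog.{CATALOG}.lock.table', DYNAMODB_TABLE) \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.session.timeZone', 'UTC') \
    .getOrCreate()

# Create SparkContext and GlueContext objects from the SparkSession.
sc = spark.sparkContext
glue_context = GlueContext(sc)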

For this post, we demonstrate setting the Spark configuration for Iceberg. You can also set the configuration as AWS Glue job parameters. For more information, refer to the Usage Information section in the Iceberg connector product page.

Use case walkthrough

To walk through our use case, we use two tables: acr_iceberg and acr_iceberg_report. The table acr_iceberg contains the customer review data. The table acr_iceberg_report contains BI analysis results based on the customer review data. All changes to acr_iceberg also impact acr_iceberg_report. The table acr_iceberg_report needs to be updated daily, right before sharing business reports with stakeholders.

To demonstrate this use case, we walk through the following typical steps:

  1. A data engineering team registers the acr_iceberg and acr_iceberg_report tables in the Glue Data Catalog.
  2. Customers (ecommerce users) add reviews to products in the Industrial_Supplies category. These reviews are added to the Iceberg table.
  3. A customer requests to update their reviews. We simulate updating the customer review in the acr_iceberg table.
  4. We reflect the customer’s request of the updated review in acr_iceberg into acr_iceberg_report.
  5. We revert the customer’s request of the updated review for the customer review table acr_iceberg, and reflect the reversion in acr_iceberg_report.

1. Create Iceberg tables of customer reviews and BI reports

In this step, the data engineering team creates the acr_iceberg Iceberg table for customer reviews data (based on the Amazon Customer Reviews Dataset), and the team creates the acr_iceberg_report Iceberg table for BI reports.

Create the acr_iceberg table for customer reviews

The following code first extracts the Amazon customer reviews, which are stored in a public S3 bucket. It then creates an Iceberg table of the customer reviews and loads these reviews into your specified S3 bucket (created via the CloudFormation stack). Note that the script loads only part of the dataset to keep the load time short.

# Load the dataset and create an Iceberg table. This takes about 3-5 minutes.
# INPUT_BASE_PATH and INPUT_CATEGORIES point to the public customer reviews dataset,
# and CATALOG, DATABASE, and TABLE are defined in earlier notebook cells.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

Regarding the tableProperty parameter, we specify format version 2 to make the table version compatible with Amazon Athena. For more information about Athena support for Iceberg tables, refer to Considerations and limitations. To learn more about the difference between Iceberg table versions 1 and 2, refer to Appendix E: Format version changes.

Let’s run the following cells. Running the second cell takes around 3–5 minutes.

After you run the cells, the acr_iceberg table is available in your specified database in the Glue Data Catalog.

You can also see the actual data and metadata of the Iceberg table in the S3 bucket that is created through the CloudFormation stack. Iceberg creates the table and writes actual data and relevant metadata that includes table schema, table version information, and so on. See the following objects in your S3 bucket:

$ aws s3 ls 's3://your-bucket/data/' --recursive
YYYY-MM-dd hh:mm:ss   83616660 data/iceberg_blog_default.db/acr_iceberg/data/00000-44-c2983230-c43a-4f4a-9b89-1f7c13e59645-00001.parquet
...
YYYY-MM-dd hh:mm:ss       5134 data/iceberg_blog_default.db/acr_iceberg/metadata/00000-bc5d3ea2-280f-4e28-a71f-4c2b749ed637.metadata.json
YYYY-MM-dd hh:mm:ss     116950 data/iceberg_blog_default.db/acr_iceberg/metadata/411308cd-1f4d-4535-9444-f6b56a56697f-m0.avro
YYYY-MM-dd hh:mm:ss       3821 data/iceberg_blog_default.db/acr_iceberg/metadata/snap-6122957686233868728-1-411308cd-1f4d-4535-9444-f6b56a56697f.avro

The job tries to create the DynamoDB table that you specified in the CloudFormation stack (in this example, its name is myGlueLockTable) if it doesn’t exist already. As we discussed earlier, the DynamoDB table is used for commit locking on Iceberg tables.

Create the acr_iceberg_report Iceberg table for BI reports

The data engineering team also creates the acr_iceberg_report table for BI reports in the Glue Data Catalog. This table initially has the following records.

comment_count | avg_star         | product_category
1240          | 4.20729367860598 | Camera
95            | 4.80167540490342 | Industrial_Supplies
663           | 3.80123467540571 | PC

To create the table, run the following cell.
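
As a reference, a cell that creates this table with its initial records might look like the following sketch. The notebook cell may build the DataFrame differently; CATALOG and DATABASE are the variables set earlier in the notebook.

# A hedged sketch of creating the acr_iceberg_report table with its initial records.
from pyspark.sql import Row

initial_report_df = spark.createDataFrame([
    Row(comment_count=1240, avg_star=4.20729367860598, product_category='Camera'),
    Row(comment_count=95,   avg_star=4.80167540490342, product_category='Industrial_Supplies'),
    Row(comment_count=663,  avg_star=3.80123467540571, product_category='PC'),
])

initial_report_df.writeTo(f'{CATALOG}.{DATABASE}.acr_iceberg_report') \
    .tableProperty('format-version', '2') \
    .create()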

The two Iceberg tables have been created. Let’s check the acr_iceberg table records by running a query.

Determine the average star rating for each product category by querying the Iceberg table

You can see the Iceberg table records by using a SELECT statement. In this section, we query the acr_iceberg table to simulate viewing the current BI report data with an ad hoc query.

Run the following cell in the notebook to get the aggregated number of customer comments and mean star rating for each product_category.
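
The aggregation query looks roughly like the following sketch (the column aliases may differ from the notebook cell):

# A hedged sketch of the aggregation query over the acr_iceberg table.
spark.sql(f'''
SELECT product_category,
       COUNT(*)         AS comment_count,
       AVG(star_rating) AS avg_star
FROM {CATALOG}.{DATABASE}.acr_iceberg
GROUP BY product_category
ORDER BY product_category
''').show()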

The cell output shows the aggregated comment count and average star rating for each product category.

Another way to query Iceberg tables is to use Amazon Athena (when you use Athena with Iceberg tables, you need to set up the Iceberg environment) or Amazon EMR.

2. Add customer reviews in the Iceberg table

In this section, customers add comments for some products in the Industrial Supplies product category, and we add these comments to the acr_iceberg table. To demonstrate this scenario, we create a Spark DataFrame based on the following new customer reviews and then add them to the table with an INSERT statement.

marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | product_category
US | 12345689 | ISB35E4556F144 | I00EDBY7X8 | 989172340 | plastic containers | 5 | 0 | 0 | N | Y | Five Stars | Great product! | 2022-02-01 | 2022 | Industrial_Supplies
US | 78901234 | IS4392CD4C3C4 | I00D7JFOPC | 952000001 | battery tester | 3 | 0 | 0 | N | Y | nice one, but it broke some days later | nope | 2022-02-01 | 2022 | Industrial_Supplies
US | 12345123 | IS97B103F8B24C | I002LHA74O | 818426953 | spray bottle | 2 | 1 | 1 | N | N | Two Stars | the bottle isn’t as big as pictured. | 2022-02-01 | 2022 | Industrial_Supplies
US | 23000093 | ISAB4268D46F3X | I00ARPLCGY | 562945918 | 3d printer | 5 | 3 | 3 | N | Y | Super great | very useful | 2022-02-01 | 2022 | Industrial_Supplies
US | 89874312 | ISAB4268137V2Y | I80ARDQCY | 564669018 | circuit board | 4 | 0 | 0 | Y | Y | Great, but a little bit expensive | you should buy this, but note the price | 2022-02-01 | 2022 | Industrial_Supplies

Run the following cells in the notebook to insert the customer comments to the Iceberg table. The process takes about 1 minute.
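
The insertion roughly follows the pattern in this sketch, where new_reviews_df is assumed to be the Spark DataFrame built from the records in the preceding table:

# A hedged sketch of inserting the new reviews into the Iceberg table.
# new_reviews_df is assumed to have the same column order as acr_iceberg.
new_reviews_df.createOrReplaceTempView('new_reviews')

spark.sql(f'''
INSERT INTO {CATALOG}.{DATABASE}.acr_iceberg
SELECT * FROM new_reviews
''')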

Run the next cell to see that the Industrial_Supplies product category has been added to the aggregated results with 5 under comment_count.

3. Update a customer review in the Iceberg table

In the previous section, we added new customer reviews to the acr_iceberg Iceberg table. In this section, a customer requests an update of their review. Specifically, customer 78901234 requests the following update of the review ID IS4392CD4C3C4.

  • Change star_rating from 3 to 5
  • Update the review_headline from nice one, but it broke some days later to very good

We update the customer review by running an UPDATE query in the following cell.
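
The UPDATE query looks roughly like the following sketch:

# A hedged sketch of the UPDATE query for this customer request.
spark.sql(f'''
UPDATE {CATALOG}.{DATABASE}.acr_iceberg
SET star_rating     = 5,
    review_headline = 'very good'
WHERE review_id = 'IS4392CD4C3C4'
''')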

We can review the updated record by running the next cell as follows.

Also, when you rerun the aggregation query cell, you can see the updated avg_star value for the Industrial_Supplies product category. Specifically, the avg_star value has been updated from 3.8 to 4.2 as a result of the star_rating changing from 3 to 5.

4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query

In this section, we reflect the changes in the acr_iceberg table in the BI report table acr_iceberg_report. To do so, we run a MERGE INTO query that combines the two tables by matching on the product_category column. This query works as follows:

  • When the product_category value in each table matches, the query combines the records by adding the comment_count values and averaging the avg_star values
  • When the product_category value doesn’t exist in the report table, the query just inserts the new record

This MERGE INTO operation is also referred to as an UPSERT (update and insert).

Run the following cell to reflect the update of customer reviews in the acr_iceberg table into the acr_iceberg_report BI table.
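
The MERGE INTO query roughly follows this sketch, where latest_report is assumed to be a temporary view holding the aggregated comment_count and avg_star per product_category from acr_iceberg:

# A hedged sketch of the MERGE INTO (UPSERT) query.
# latest_report is assumed to be a temp view of the aggregated acr_iceberg results.
spark.sql(f'''
MERGE INTO {CATALOG}.{DATABASE}.acr_iceberg_report AS target
USING latest_report AS source
ON target.product_category = source.product_category
WHEN MATCHED THEN UPDATE SET
    target.comment_count = target.comment_count + source.comment_count,
    target.avg_star      = (target.avg_star + source.avg_star) / 2
WHEN NOT MATCHED THEN INSERT (comment_count, avg_star, product_category)
    VALUES (source.comment_count, source.avg_star, source.product_category)
''')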

After the MERGE INTO query is complete, you can see the updated acr_iceberg_report table by running the following cell.

The MERGE INTO query performed the following changes:

  • In the Camera, Industrial_Supplies, and PC product categories, each comment_count is the sum between the initial value of the acr_iceberg_report table and the aggregated table value. For example, in the Industrial_Supplies product category row, the comment_count 100 is calculated by 95 (in the initial version of acr_iceberg_report) + 5 (in the aggregated report table).
  • In addition to comment_count, the avg_star value in the Camera, Industrial_Supplies, and PC product category rows is computed by averaging the avg_star value in acr_iceberg_report with the one in the aggregated table.
  • In other product categories, each comment_count and avg_star is the same as each value in the aggregated table, which means that each value in the aggregated table is inserted into the acr_iceberg_report table.

5. Roll back the Iceberg tables and reflect changes in the BI report table

In this section, the customer who requested the update of the review now requests to revert the updated review.

Iceberg maintains a version (snapshot) of a table for each write operation on the table. We can see the information about each table version by inspecting the table metadata, and we can also time travel to or roll back the table to an old version.

To complete the customer request to revert the updated review, we need to revert the table version of acr_iceberg to the earlier version when we first added the reviews. Additionally, we need to update the acr_iceberg_report table to reflect the rollback of the acr_iceberg table version. Specifically, we need to perform the following three steps to complete these operations:

  1. Check the history of table changes of acr_iceberg and acr_iceberg_report to get each table snapshot.
  2. Roll back acr_iceberg to the version when we first inserted records, and also roll back the acr_iceberg_report table to its initial version so that it no longer reflects the review update.
  3. Merge the acr_iceberg table with the acr_iceberg_report table again.

Get the metadata of each table

As a first step, we check table versions by inspecting the table. Run the following cells.
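
For reference, Iceberg exposes this information through metadata tables, which you can query as in the following sketch:

# A hedged sketch of inspecting table history and snapshots via Iceberg metadata tables.
spark.sql(f'SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg.history').show()
spark.sql(f'SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg.snapshots').show()
spark.sql(f'SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg_report.history').show()
spark.sql(f'SELECT * FROM {CATALOG}.{DATABASE}.acr_iceberg_report.snapshots').show()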

Now you can see the following table versions in acr_iceberg and acr_iceberg_report:

  • acr_iceberg has three versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The second oldest one is the record insertion, which shows the append operation
    • The latest one is the update, which shows the overwrite operation
  • acr_iceberg_report has two versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The other one is from the MERGE INTO query in the previous section, which shows the overwrite operation

Based on the customer’s revert request, we roll back acr_iceberg to the version in which the records were inserted. We also roll back the acr_iceberg_report table to its initial version to discard the MERGE INTO operation in the previous section.

Roll back the acr_iceberg and acr_iceberg_report tables

Based on your snapshot IDs, you can roll back each table version:

  • For acr_iceberg, use the second-oldest snapshot_id (in this example, 5440744662350048750) and replace <Type snapshot_id in acr_iceberg table> in the following cell with this snapshot_id.
  • For the acr_iceberg_report table, use the initial snapshot_id (in this example, 7958428388396549892) and replace <Type snapshot_id in acr_iceberg_report table> in the following cell with this snapshot_id.

After you specify the snapshot_id for each rollback query, run the following cells.
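
The rollback cells roughly follow this sketch, which uses the Iceberg rollback_to_snapshot stored procedure (replace the snapshot IDs with the ones from your own history output):

# A hedged sketch of rolling back each table to a specific snapshot.
spark.sql(f'''
CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg', 5440744662350048750)
''')

spark.sql(f'''
CALL {CATALOG}.system.rollback_to_snapshot('{DATABASE}.acr_iceberg_report', 7958428388396549892)
''')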

When this step is complete, you can see the previous and current snapshot IDs of each table.

Each Iceberg table has now been reverted to the specified version.

Reflect changes in acr_iceberg into acr_iceberg_report again

We reflect the acr_iceberg table reversion into the current acr_iceberg_report table. To complete this, run the following cell.

After you rerun the MERGE INTO query, run the following cell to see the new table records. When we compare the table records, we observe that the avg_star value for Industrial_Supplies is lower than it was before the rollback, because the star_rating has been reverted from 5 back to 3.

You were able to reflect the customer’s request to revert their updated review in the BI report table. Specifically, you can see the updated avg_star record in the Industrial_Supplies product category.

Clean up

To clean up all resources that you created, delete the CloudFormation stack.

Conclusion

In this post, we walked through using the Apache Iceberg connector with AWS Glue ETL jobs. We created an Iceberg table built on Amazon S3, and ran queries such as reading the Iceberg table data, inserting a record, merging two tables, and time travel.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the operations Iceberg supports. Refer to the Apache Iceberg documentation for information about more operations.

Appendix: Spark configurations to use Apache Iceberg on AWS Glue

As we mentioned earlier, the notebook sets up a Spark configuration to integrate Iceberg with AWS Glue. The following table shows what each parameter defines.

Spark configuration key | Value | Description
spark.sql.catalog.{CATALOG} | org.apache.iceberg.spark.SparkCatalog | Specifies a Spark catalog interface that communicates with Iceberg tables.
spark.sql.catalog.{CATALOG}.warehouse | {WAREHOUSE_PATH} | The warehouse path where jobs write Iceberg metadata and actual data.
spark.sql.catalog.{CATALOG}.catalog-impl | org.apache.iceberg.aws.glue.GlueCatalog | The implementation of the Spark catalog class that connects Iceberg tables with the AWS Glue Data Catalog.
spark.sql.catalog.{CATALOG}.io-impl | org.apache.iceberg.aws.s3.S3FileIO | Used for Iceberg to communicate with Amazon S3.
spark.sql.catalog.{CATALOG}.lock-impl | org.apache.iceberg.aws.glue.DynamoLockManager | Used for Iceberg to manage table locks.
spark.sql.catalog.{CATALOG}.lock.table | {DYNAMODB_TABLE} | The DynamoDB table name that stores table locks.
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions | The implementation that enables Spark to run Iceberg-specific SQL commands.
spark.sql.session.timeZone | UTC | Sets the time zone of the Spark environment to UTC for Iceberg time travel queries, which use epoch time in the UTC time zone.

About the Author

Tomohiro Tanaka is a Cloud Support Engineer at Amazon Web Services. He builds AWS Glue connectors such as the Apache Iceberg connector and the TPC-DS connector. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he also enjoys coffee breaks with his colleagues and making coffee at home.