Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-high-performance-acid-compliant-evolving-data-lake-using-apache-iceberg-on-amazon-emr/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Apache Iceberg is an open table format for huge analytic datasets. Table formats typically indicate the format and location of individual table files. Iceberg adds functionality on top of that to help manage petabyte-scale datasets as well as newer data lake requirements such as transactions, upsert/merge, time travel, and schema and partition evolution. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table.

Amazon EMR release 6.5.0 and later includes Apache Iceberg so you can reliably work with huge tables with full support for ACID (Atomic, Consistent, Isolated, Durable) transactions in a highly concurrent and performant manner without getting locked into a single file format.

In this post, we discuss the modern data lake requirements and the challenges—including support for ACID transactions and concurrent writers, partition and schema evolution—that come with these. We also discuss how Iceberg solves these challenges. Additionally, we provide a step-by-step guide on how to get started with an Iceberg notebook in Amazon EMR Studio. You can access this sample notebook from the GitHub repo. You can also find this notebook in your EMR Studio workspace under Notebook Examples.

Modern data lake challenges

Amazon EMR integrates with Amazon Simple Storage Service (Amazon S3) natively for persistent data storage, and allows you to independently scale your data in Amazon S3 and compute on your EMR cluster. This enables you to bring in data from multiple sources (for example, transactional data from operational databases, social media feeds, and SaaS data sources) using different tools, and each data source has its own transient EMR cluster to perform transformation and ingestion in parallel. You can now keep one central copy of your data and share it with multiple user groups that run analytics and even make in-place updates on a data lake. We’re increasingly seeing the following requirements (and challenges) emerge as mainstream:

  • Consistent reads and writes across multiple concurrent users – There are two primary concerns:
    • Reader-writer isolation – When a job is updating a huge dataset, another job accessing the same data frequently works on a partially updated dataset, leaving the data in an inconsistent state.
    • Concurrent writes on the same dataset – Table formats relying on coarse-grained locks slow down the system. This limitation is even more telling in real-time streaming workloads.
  • Consistent table updates across multiple files or partitions – With Hive tables, writing to multiple partitions at once isn’t an atomic operation. If you’re overwriting a partition, for instance, you might delete several files across partitions without having any guarantees that you will replace them, potentially resulting in data loss. For huge tables, it’s not practical to use global locks and keep the readers and writers waiting. Common workarounds (such as rewriting all the data in all the partitions that need to be changed at the same time and then pointing to the new locations) cause huge data duplication and redundant extract, transform, and load (ETL) jobs.
  • Continuous schema evolution – Simple DDL commands often render the data unusable. For instance, say a data engineer renames a column and writes some data. The consuming analytics tool now can’t read it because the metastore can’t track former names for columns. That rename operation has effectively dropped a column and added a new column. Now there is data written in both schemas. Historically, schema changes required expensive backfills and redundant ETL processes.
  • Different query patterns on the same data – If you change the partitioning to optimize your query after a year, say from daily to hourly, you have to rewrite the table with the new hour column as the partition. In addition, you have to rewrite queries to use the new partition column in your table.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Existing tools that support these features lock you into specific file formats, complicating interoperability across the analytics ecosystem.
  • Support for mixed file formats – With existing solutions, if you rename a column in one file format (say Parquet, ORC, or Avro), you get a different behavior than if you rename a column in a different file format. There is inconsistency in data types supported by different file formats. These limitations necessitate additional ETL steps.

The problem

When multiple users share the same data, varied requirements ensue. The data platform needs to be transactional to handle concurrent upserts and reads.

Table formats such as Hive track a list of partitions inside the table within a data catalog. However, the underlying files are still not tracked transactionally, because we’re relying on an immutable object storage that is just not designed to be transactional. After the specific partitions to be updated or inserted have been identified, we still need to list all the files in those partitions at the leaf level of the partition hierarchy before we can filter out which of those files are relevant. For huge analytic datasets with thousands of files in each partition, listing all those files each time you run a query slows it down considerably. Furthermore, doing atomic commits—getting thousands of files in the table live in exactly the same moment—becomes impractical.

Apache Iceberg on Amazon EMR

Iceberg development was started by Netflix in December 2017 and was donated to the Apache software foundation in November 2018 as an incubator project. In May 2020, it graduated from the incubator.

Iceberg on Amazon EMR comes completely integrated and tested for running in production backed by Enterprise Support. This means you get 24/7 technical support from Amazon EMR experts, tools and technology to automatically manage the health of your environment, and consultative architectural, performance, and troubleshooting guidance on Iceberg issues.

Iceberg has integrations with other AWS services. For example, you can use the AWS Glue Data Catalog as the metastore for Iceberg tables. Iceberg also supports other catalog types such as Hive, Hadoop, Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and other custom implementations. When using AWS Glue as the data catalog, the AWS Glue database serves as your Iceberg namespace. Similarly, the AWS Glue table and AWS Glue TableVersion serve as the Iceberg table and table version, respectively. Your AWS Glue Data Catalog could be in the same or different account or even a different Region, making multi-account, multi-Region pipelines easily deployable. Amazon Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

How Iceberg addresses these challenges

Iceberg tracks individual data files in a table instead of simply maintaining a pointer to high-level table or partition locations. This allows writers to create data files in-place and only adds files to the table in an explicit commit. Every time new datasets are ingested into this table, a new point-in-time snapshot gets created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already has that information pre-populated during the write time. Because of this design, Iceberg solves the problems listed earlier in the following ways:

  • Consistent reads and writes across multiple concurrent users – Iceberg relies on optimistic concurrency to support concurrent reads and writes from multiple user groups. If two operations are running at the same time, only one of them will be successful. The other job will retry, but that retry will be implicit to the user and that will be done at the metadata level. If Iceberg detects that the second update is not in conflict, it will commit it successfully.
  • Consistent table updates across multiple partitions – In Iceberg, the partition of a file isn’t determined by the physical location of the files within directories or prefixes. Instead, Iceberg stores partition information within manifests of the data files. Therefore, updates across multiple partitions entail a simple, atomic metadata change.
  • Continuous schema evolution – Iceberg tracks columns by using unique IDs and not by the column name, which enables easy schema evolution. You can safely add, drop, rename, or even reorder columns. You can also update column data types if the update is safe (such as widening from INT to BIGINT or float to double)
  • Different query patterns on the same data – Iceberg keeps track of the relationship between partitioning values and the column that they came from. Logical data is decoupled from physical layout, which enables easy partition evolution as well. Partition values can be implicitly derived using a transform such as day(timestamp) or hour(timestamp) of an existing column.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Iceberg supports ACID transactions with serializable isolation. Furthermore, Iceberg supports deletes, upserts, change data capture (CDC), time travel (getting the state of the data from a past time regardless of the current state of the data), and compaction (consolidating small files into larger files to reduce metadata overhead and improve query speed). Table changes are atomic, and readers never see partial or uncommitted changes.
  • Support for mixed file formats – Because schema fields are tracked by unique IDs independent of the underlying file format, you can have consistent queries across file formats such as Avro, Parquet, and ORC.

Using Apache Iceberg with Amazon EMR

In this post, we demonstrate creating an Amazon EMR cluster that supports Iceberg using the AWS Command Line Interface (AWS CLI). You can also create the cluster from the Amazon EMR console. We use Amazon EMR Studio to run notebook code on our EMR cluster. To set up an EMR Studio, refer to Set up an EMR Studio. First, we note down the subnets that we specified when we created our EMR Studio. Now we launch our EMR cluster using the AWS CLI:

aws emr create-cluster \
--name iceberg-emr-cluster \
--use-default-roles \
--release-label emr-6.6.0 \
--instance-count 1 \
--instance-type r5.4xlarge \
--applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
--ec2-attributes SubnetId=<EMR-STUDIO-SUBNET>\
--configurations '[{"Classification":"iceberg-
defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-
site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.met
astore.AWSGlueDataCatalogHiveClientFactory"}}]'

We choose emr-6.6.0 as the release label. This release comes with Iceberg version 0.13.1 pre-installed. We launch a single-node EMR cluster with the instance type R5.4xlarge and with the following applications installed: Hadoop, Spark, Livy, and Jupyter Enterprise Gateway. Make sure that you replace <EMR-STUDIO-SUBNET> with a subnet ID from the list of EMR Studio’s subnets you noted earlier. We need to enable Iceberg and the AWS Glue Data Catalog on our cluster. To do this, we use the following configuration classifications:

[
  {
    "Classification": "iceberg-defaults ",
    "Properties": {
      "iceberg.enabled":"true"
    }
  },
  {
    "Classification": "spark-hive-site ",
    "Properties": {
      "hive.metastore.client.factory.class":        
         "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]

Initial setup

Let’s first create an S3 bucket location in the same Region as the EMR cluster to save a sample dataset that we’re going to create and work with. In this post, we use the placeholder bucket name YOUR-BUCKET-NAME. Remember to replace this with a globally unique bucket name when testing this out in your environment. From our EMR Studio workspace, we attach our cluster and use the PySpark kernel.

You can upload the sample notebook from the GitHub repo or use the Iceberg example under Notebook Examples in your own EMR Studio workspace and run the cells following the instructions in the notebook.

Configure a Spark session

In this command, we set our AWS Glue Data Catalog name as glue_catalog1. You can replace it with a different name. But if you do so, remember to change the Data Catalog name throughout this example, because we use the fully qualified table name including the Data Catalog name in all of our commands going forward. In the following command, remember to replace YOUR-BUCKET-NAME with your own bucket name:

%%configure -f
{
    "conf":  {
             "spark.sql.catalog.glue_catalog1": "org.apache.iceberg.spark.SparkCatalog",
             "spark.sql.catalog.glue_catalog1.warehouse": 
                   "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
             "spark.sql.catalog.glue_catalog1.catalog-impl":    "org.apache.iceberg.aws.glue.GlueCatalog",
             "spark.sql.catalog.glue_catalog1.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
             "spark.sql.catalog.glue_catalog1.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
             "spark.sql.catalog.glue_catalog1.lock.table": "myGlueLockTable",
  "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
           } 
}

Let’s assume that the name of your catalog is glue_catalog1. The preceding code has the following components:

  • glue_catalog1.warehouse points to the Amazon S3 path where you want to store your data and metadata.
  • To make the catalog an AWS Glue Data Catalog, set glue_catalog1.catalog-impl to org.apache.iceberg.aws.glue.GlueCatalog. This key is required to point to an implementation class for any custom catalog implementation.
  • Use org.apache.iceberg.aws.s3.S3FileIO as the glue_catalog1.io-impl in order to take advantage of Amazon S3 multipart upload for high parallelism.
  • We use an Amazon DynamoDB table for lock implementation. This is optional, and is recommended for high concurrency workloads. To do that, we set lock-impl for our catalog to org.apache.iceberg.aws.glue.DynamoLockManager and we set lock.table to myGlueLockTable as the table name so that for every commit, the Data Catalog first obtains a lock using this table and then tries to safely modify the AWS Glue table. If you choose this option, the table gets created in your own account. Note that you need to have the necessary access permissions to create and use a DynamoDB table. Furthermore, additional DynamoDB charges apply.

Now that you’re all set with your EMR cluster for compute, S3 bucket for data, and AWS Glue Data Catalog for metadata, you can start creating a table and running the DML statements.

For all commands going forward, we use the %%sql cell magic to run Spark SQL commands in our EMR Studio notebook. However, for brevity, we don’t show the cell magic command. But you may need to use that in your Studio notebook for the SQL commands to work.

Create an Iceberg table in the AWS Glue Data Catalog

The default catalog is the AwsDataCatalog. Let’s switch to our AWS Glue catalog glue_catalog1, which has support for Iceberg tables. There are no namespaces as yet. A namespace in Iceberg is the same thing as a database in AWS Glue.

%%sql
use glue_catalog1

Let’s create a table called orders. The DDL syntax looks the same as creating a Hive table, for example, except that we include USING iceberg:

CREATE TABLE glue_catalog1.salesdb.orders
    (
      order_id              int,
      product_name          string,
      product_category      string,
      qty                   int,
      unit_price            decimal(7,2),
      order_datetime        timestamp
    )
USING iceberg
PARTITIONED BY (days(order_datetime))

Note that we’re also partitioning this table by extracting the day out of the order_datetime column. We don’t have to create a separate column for the partition.

DML statements

We then insert records to our table. Here is an example:

INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        1, 
        'Harry Potter and the Prisoner of Azkaban',
        'Books',
        2,
        7.99,
        current_timestamp()
    )

DML statements result in snapshots getting created. Note the snapshot_id and the timestamp column called committed_at:

SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

We now insert four more records and then query the orders table and confirm that the five records are present:

SELECT * FROM glue_catalog1.salesdb.orders

Querying from Athena

Because Iceberg on Amazon EMR comes pre-integrated with the AWS Glue Data Catalog, we can now query the Iceberg tables from AWS analytics services that support Iceberg. Let’s query the salesdb/orders table from Athena as shown in the following screenshot.

Upserts

The notebook then gives examples for updates and deletes, and even upserts. We use the MERGE INTO statement for upserts, which uses the source table orders_update with new and updated records:

MERGE INTO glue_catalog1.salesdb.orders target 
USING glue_catalog1.salesdb.orders_update source          
ON target.order_id = source.order_id              
WHEN MATCHED THEN 
    UPDATE SET
        order_id = source.order_id,
        product_name = source.product_name,
        product_category = source.product_category,
        qty = source.qty,
        unit_price = source.unit_price,
        order_datetime = source.order_datetime
WHEN NOT MATCHED THEN
    INSERT *
select * from glue_catalog1.salesdb.orders;

Schema evolution

We then walk through schema evolution using simple ALTER TABLE commands to add, rename, and drop columns. The following example how simple it is to rename a column:

ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity
DESC table glue_catalog1.salesdb.orders

Time travel

Iceberg also allows us to travel backward or forward by storing point-in-time snapshots. We can travel using timestamps when the snapshots were created or directly using the snapshot_id. The following is an example of a CALL statement that uses rollback_to_snapshot:

%%sql
CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', 8008410363488501197)

We then travel forward in time by calling set_current_snapshot:

%%sql
CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', 8392090950225782953)

Partition evolution

The notebook ends with an example that shows how partition evolution works in Iceberg. Iceberg stores partition information as part of the metadata. Because there is no separate partition column in the data itself, changing the partitioning scheme to hourly partitions for example is just a matter of calling a different partition transform hours(…) on an existing column order_datetime as shown in the following example:

%%sql
ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)

You can continue to use the old partition on the old data. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately.

The notebook shows how you can query the table using the new hourly partition:

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=1

You can continue to query your old data using the day() transform. There is only the original order_datetime column in the table.

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=14

You don’t have to store additional columns to accommodate multiple partitioning schemes. The partition definitions are in the metadata, providing the flexibility to evolve and change the partition definitions in the future.

Conclusion

In this post, we introduced Apache Iceberg and explained how Iceberg solves some challenges in modern data lakes. We then walked you through how to run Iceberg on Amazon EMR using the AWS Glue Data Catalog as the metastore, and query the data using Athena. You can also run upserts on this data from Athena. There is no additional cost to using Iceberg with Amazon EMR.

For more information about Iceberg, refer to How Iceberg works. Iceberg on Amazon EMR, with its integration with AWS Analytics services, can simplify the way you process, upsert, and delete data, with full support for ACID transactions in Amazon S3. You can also implement schema evolution, partition evolution, time travel, and compaction of data.


About the Author

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions modernizing their architecture and generating insights from their data. In his spare time he likes to work on non-profit projects, especially those focused on underprivileged Children’s education.