All posts by Amir Shenavandeh

Get a quick start with Apache Hudi, Apache Iceberg, and Delta Lake with Amazon EMR on EKS

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/get-a-quick-start-with-apache-hudi-apache-iceberg-and-delta-lake-with-amazon-emr-on-eks/

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can keep your data as is in your object store or file-based storage without having to first structure the data. Additionally, you can run different types of analytics against your loosely formatted data lake—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML) to guide better decisions. Due to the flexibility and cost effectiveness that a data lake offers, it’s very popular with customers who are looking to implement data analytics and AI/ML use cases.

Due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records from a data lake. Another challenge is making concurrent changes to the data lake. Implementing these tasks is time consuming and costly.

In this post, we explore three open-source transactional file formats, Apache Hudi, Apache Iceberg, and Delta Lake, which help us overcome these data lake challenges. We focus on how to get started with these data storage frameworks via a real-world use case. As an example, we demonstrate how to handle incremental data changes in a data lake by implementing a Slowly Changing Dimension Type 2 (SCD2) solution with Hudi, Iceberg, and Delta Lake, then deploy the applications with Amazon EMR on EKS.

ACID challenge in data lakes

In analytics, the data lake plays an important role as an immutable and agile data storage layer. Unlike traditional data warehouse or data mart implementations, we make no assumptions about the data schema in a data lake and can define whatever schemas our use cases require. It’s up to the downstream consumption layer to make sense of that data for its own purposes.

One of the most common challenges is supporting ACID (Atomicity, Consistency, Isolation, Durability) transactions in a data lake. For example, how do we run queries that return consistent and up-to-date results while new data is continuously being ingested or existing data is being modified?

Let’s try to understand the data problem with a real-world scenario. Assume we centralize customer contact datasets from multiple sources to an Amazon Simple Storage Service (Amazon S3)-backed data lake, and we want to keep all the historical records for analysis and reporting. We face the following challenges:

  • We keep creating append-only files in Amazon S3 to track the contact data changes (insert, update, delete) in near-real time.
  • Consistency and atomicity aren’t guaranteed because we just dump data files from multiple sources without knowing whether the entire operation is successful or not.
  • We don’t have an isolation guarantee whenever multiple workloads are simultaneously reading and writing to the same target contact table.
  • We track every single activity at source, including duplicates caused by the retry mechanism and accidental data changes that are then reverted. This leads to the creation of a large volume of append-only files. The performance of extract, transform, and load (ETL) jobs decreases as all the data files are read each time.
  • We have to shorten the file retention period to reduce the amount of data scanned and improve read performance.

In this post, we walk through a simple SCD2 ETL example designed to solve the ACID transaction problem with the help of Hudi, Iceberg, and Delta Lake. We also show how to deploy the ACID solution with EMR on EKS and query the results with Amazon Athena.

Custom library dependencies with EMR on EKS

By default, Hudi and Iceberg are supported by Amazon EMR as out-of-the-box features. For this demonstration, we use EMR on EKS release 6.8.0, which contains Apache Iceberg 0.14.0-amzn-0 and Apache Hudi 0.11.1-amzn-0. To find the latest and past versions that Amazon EMR supports, check out the Hudi release history and the Iceberg release history tables. The runtime binary files of these frameworks can be found in Spark’s class path location within each EMR on EKS image. See Amazon EMR on EKS release versions for the list of supported versions and applications.

As of this writing, Amazon EMR does not include Delta Lake by default. There are two ways to make it available in EMR on EKS:

  • At the application level – You install the Delta libraries by setting the Spark configuration spark.jars or the --jars command-line argument in your submission script. The JAR files are downloaded and distributed to each Spark executor and driver pod when a job starts.
  • At the Docker container level – You can customize an EMR on EKS image by packaging the Delta dependencies into a single Docker container, which promotes portability and simplifies dependency management for each workload.

Other custom library dependencies can be managed the same way as for Delta Lake—passing a comma-separated list of JAR files in the Spark configuration at job submission, or packaging all the required libraries into a Docker image.
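As a rough illustration of the application-level option, the following Python sketch assembles the comma-separated JAR list and a sparkSubmitParameters string. The helper names and the example JAR locations are hypothetical and are not part of the sample project.

```python
def build_jars_conf(jar_uris):
    """Join JAR locations into the comma-separated value Spark expects."""
    cleaned = [uri.strip() for uri in jar_uris if uri and uri.strip()]
    if not cleaned:
        raise ValueError("at least one JAR URI is required")
    for uri in cleaned:
        # A comma or space inside a single URI would corrupt the list.
        if "," in uri or " " in uri:
            raise ValueError(f"invalid JAR URI: {uri!r}")
    return ",".join(cleaned)


def spark_submit_parameters(jar_uris, extra_conf=None):
    """Render a sparkSubmitParameters string with --jars and --conf flags."""
    parts = ["--jars " + build_jars_conf(jar_uris)]
    for key, value in sorted((extra_conf or {}).items()):
        parts.append(f"--conf {key}={value}")
    return " ".join(parts)


if __name__ == "__main__":
    # Hypothetical JAR locations, for illustration only.
    print(spark_submit_parameters(
        ["s3://example-bucket/jars/lib-a.jar", "s3://example-bucket/jars/lib-b.jar"],
        {"spark.executor.memory": "2G"},
    ))
```

The same comma-separated value can be supplied either through --jars or through the spark.jars property.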

Solution overview

The solution provides two sample CSV files as the data source: initial_contacts.csv and update_contacts.csv. They were generated by a Python script with the Faker package. For more details, check out the tutorial on GitHub.

The following diagram describes a high-level architecture of the solution and different services being used.

The workflow steps are as follows:

  1. Ingest the first CSV file from a source S3 bucket. The data is processed by a Spark ETL job running on EMR on EKS. The application uses one of the Hudi, Iceberg, or Delta frameworks.
  2. Store the initial table in Hudi, Iceberg, or Delta file format in a target S3 bucket (curated). We use the AWS Glue Data Catalog as the Hive metastore. Optionally, you can configure Amazon DynamoDB as a lock manager for concurrency control.
  3. Ingest a second CSV file that contains new records and some changes to the existing ones.
  4. Perform SCD2 via Hudi, Iceberg, or Delta in the Spark ETL job.
  5. Query the Hudi, Iceberg, or Delta table stored on the target S3 bucket in Athena.

To simplify the demo, we have accommodated steps 1–4 into a single Spark application.

Prerequisites

Install the following tools:

curl -fsSL -o get_helm.sh \
https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3

chmod 700 get_helm.sh
export DESIRED_VERSION=v3.8.2
./get_helm.sh 
helm version

For a quick start, you can use AWS CloudShell, which already includes the AWS CLI and kubectl.

Clone the project

Download the sample project either to your computer or the CloudShell console:

git clone https://github.com/aws-samples/emr-on-eks-hudi-iceberg-delta
cd emr-on-eks-hudi-iceberg-delta

Set up the environment

Run the following blog_provision.sh script to set up a test environment. The infrastructure deployment includes the following resources:

  • A new S3 bucket to store sample data and job code.
  • An Amazon Elastic Kubernetes Service (Amazon EKS) cluster (version 1.21) in a new VPC across two Availability Zones.
  • An EMR virtual cluster in the same VPC, registered to the emr namespace in Amazon EKS.
  • An AWS Identity and Access Management (IAM) job execution role that contains DynamoDB access, because we use DynamoDB to provide concurrency controls that ensure atomic transactions with the Hudi and Iceberg tables.

export AWS_REGION=us-east-1
export EKSCLUSTER_NAME=eks-quickstart
./blog_provision.sh
# Upload sample contact data to S3
export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
aws s3 sync data s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/data

Job execution role

The provisioning includes an IAM job execution role called emr-on-eks-quickstart-execution-role that allows your EMR on EKS jobs access to the required AWS services. It contains AWS Glue permissions because we use the Data Catalog as our metastore.

See the following code:

 {
    "Effect": "Allow",
    "Action": ["glue:Get*","glue:BatchCreatePartition","glue:UpdateTable","glue:CreateTable"],
    "Resource": [
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:catalog",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:database/*",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:table/*"
    ]
}

Additionally, the role contains DynamoDB permissions, because we use the service as the lock manager. It provides concurrency controls that ensure atomic transactions with our Hudi and Iceberg tables. If a DynamoDB table with the given name doesn’t exist, a new table is created with the billing mode set to pay-per-request. More details can be found in the following framework examples.

{
    "Sid": "DDBLockManager",
    "Effect": "Allow",
    "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:CreateTable",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:GetItem",
        "dynamodb:BatchGetItem"
    ],
    "Resource": [
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable/index/*",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable/index/*"
    ]
}
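Hand-edited policy JSON is easy to break with a misplaced comma. As a hedged sketch, the statement can instead be generated in Python so that it always serializes to valid JSON. The function name is hypothetical; the actions and table names mirror the policy shown in this section.

```python
import json

LOCK_ACTIONS = [
    "dynamodb:DescribeTable", "dynamodb:CreateTable", "dynamodb:Query",
    "dynamodb:Scan", "dynamodb:PutItem", "dynamodb:UpdateItem",
    "dynamodb:DeleteItem", "dynamodb:BatchWriteItem", "dynamodb:GetItem",
    "dynamodb:BatchGetItem",
]


def lock_table_statement(region, account_id, table_names):
    """Build the DynamoDB lock-manager statement for the given lock tables."""
    resources = []
    for name in table_names:
        arn = f"arn:aws:dynamodb:{region}:{account_id}:table/{name}"
        # Each table needs its own ARN plus the ARN pattern for its indexes.
        resources.extend([arn, f"{arn}/index/*"])
    return {
        "Sid": "DDBLockManager",
        "Effect": "Allow",
        "Action": list(LOCK_ACTIONS),
        "Resource": resources,
    }


if __name__ == "__main__":
    # The account ID below is a placeholder.
    statement = lock_table_statement(
        "us-east-1", "123456789012", ["myIcebergLockTable", "myHudiLockTable"]
    )
    print(json.dumps(statement, indent=4))
```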

Example 1: Run Apache Hudi with EMR on EKS

The following steps provide a quick start for you to implement SCD Type 2 data processing with the Hudi framework. To learn more, refer to Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR.

The following code snippet demonstrates the SCD Type 2 implementation logic. It creates Hudi tables in the default database of the AWS Glue Data Catalog. The full version is in the script hudi_scd_script.py.

# Read incremental contact CSV file with extra SCD columns
delta_csv_df = spark.read.schema(contact_schema).format("csv")\
.load(f"s3://{S3_BUCKET_NAME}/.../update_contacts.csv")\
.withColumn("ts", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_from", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_to", lit("").cast(TimestampType()))\
.withColumn("checksum",md5(concat(col("name"),col("email"),col("state"))))\
.withColumn('iscurrent', lit(1).cast("int"))

## Find existing records to be expired
join_cond = [initial_hudi_df.checksum != delta_csv_df.checksum,
             initial_hudi_df.id == delta_csv_df.id,
             initial_hudi_df.iscurrent == 1]
contact_to_update_df = (initial_hudi_df.join(delta_csv_df, join_cond)
                      .select(initial_hudi_df.id,
                                ....
                              initial_hudi_df.valid_from,
                              delta_csv_df.valid_from.alias('valid_to'),
                              initial_hudi_df.checksum
                              )
                      .withColumn('iscurrent', lit(0).cast("int"))
                      )
                      
merged_contact_df = delta_csv_df.unionByName(contact_to_update_df)

# Upsert
merged_contact_df.write.format('org.apache.hudi')\
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .options(**hudiOptions) \
                    .mode('append')\
                    .save(TABLE_LOCATION)
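To make the expire-and-append logic easier to follow, here is a minimal sketch of the same SCD2 idea in plain Python, without Spark or Hudi. It is an illustration under simplifying assumptions, not the code from hudi_scd_script.py: rows are dictionaries, the function names are hypothetical, and unchanged incoming rows are skipped rather than upserted.

```python
import hashlib
from datetime import datetime


def checksum_of(row):
    """MD5 over name, email, and state, mirroring the checksum column above."""
    text = row["name"] + row["email"] + row["state"]
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def apply_scd2(current_rows, incoming_rows, load_ts):
    """Return the full history after applying one incremental batch."""
    result = [dict(r) for r in current_rows]  # copy so the input is not mutated
    active = {r["id"]: r for r in result if r["iscurrent"] == 1}
    for row in incoming_rows:
        new_checksum = checksum_of(row)
        existing = active.get(row["id"])
        if existing is not None and existing["checksum"] == new_checksum:
            continue  # nothing changed for this id
        if existing is not None:
            # Expire the previous version instead of overwriting it.
            existing["valid_to"] = load_ts
            existing["iscurrent"] = 0
        new_version = {**row, "checksum": new_checksum,
                       "valid_from": load_ts, "valid_to": None, "iscurrent": 1}
        result.append(new_version)
        active[row["id"]] = new_version
    return result


if __name__ == "__main__":
    v1 = apply_scd2([], [{"id": 1, "name": "Ann", "email": "ann@example.com",
                          "state": "NY"}], datetime(2022, 1, 1))
    v2 = apply_scd2(v1, [{"id": 1, "name": "Ann", "email": "ann@example.com",
                          "state": "CA"}], datetime(2022, 2, 1))
    for r in v2:
        print(r["id"], r["state"], r["valid_from"], r["valid_to"], r["iscurrent"])
```

Each changed id ends up with one expired row (iscurrent = 0, valid_to set) and one current row, which is the history that an SCD2 table is meant to preserve.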

In the job script, the hudiOptions were set to use the AWS Glue Data Catalog and enable the DynamoDB-based Optimistic Concurrency Control (OCC). For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.

hudiOptions = {
    ....
    # sync to Glue catalog
    "hoodie.datasource.hive_sync.mode":"hms",
    ....
    # DynamoDB based locking mechanisms
    "hoodie.write.concurrency.mode":"optimistic_concurrency_control", #default is SINGLE_WRITER
    "hoodie.cleaner.policy.failed.writes":"LAZY", #Hudi will delete any files written by failed writes to re-claim space
    "hoodie.write.lock.provider":"org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table":"myHudiLockTable",
    "hoodie.write.lock.dynamodb.partition_key":"tablename",
    "hoodie.write.lock.dynamodb.region": REGION,
    "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{REGION}.amazonaws.com"
}
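The idea behind optimistic concurrency control can be sketched with a toy in-memory model: writers do their work without holding a lock, and the lock is taken only briefly at commit time to check whether a concurrent commit touched the same files. This is a simplified illustration, not Hudi’s implementation; the class and method names are hypothetical, and a threading.Lock stands in for the external DynamoDB lock provider.

```python
import threading


class ToyTable:
    """Toy commit timeline guarded by a lock that is held only at commit time."""

    def __init__(self):
        self._lock = threading.Lock()  # stands in for the external lock provider
        self.commits = []              # list of (commit_id, frozenset of files)

    def begin(self):
        """Remember how many commits existed when the writer started."""
        return len(self.commits)

    def try_commit(self, start_marker, touched_files):
        """Commit unless a commit made since start_marker touched the same files."""
        touched = frozenset(touched_files)
        with self._lock:
            for _, files in self.commits[start_marker:]:
                if files & touched:
                    return False  # conflict: the caller should retry its write
            self.commits.append((len(self.commits) + 1, touched))
            return True


if __name__ == "__main__":
    table = ToyTable()
    writer_a = table.begin()
    writer_b = table.begin()
    print(table.try_commit(writer_a, {"file-1"}))            # first writer wins
    print(table.try_commit(writer_b, {"file-1", "file-2"}))  # overlapping files
    print(table.try_commit(writer_b, {"file-3"}))            # no overlap
```

Writers that touch disjoint files both succeed, which is why this approach suits workloads where conflicts are rare.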
  1. Upload the job scripts to Amazon S3:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync hudi/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit Hudi jobs with EMR on EKS to create SCD2 tables:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./hudi/hudi_submit_cow.sh
    ./hudi/hudi_submit_mor.sh

    Hudi supports two table types: Copy on Write (CoW) and Merge on Read (MoR). The following is the code snippet to create a CoW table. For the complete job scripts for each table type, refer to hudi_submit_cow.sh and hudi_submit_mor.sh.

    aws emr-containers start-job-run \
      --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
      --name em68-hudi-cow \
      --execution-role-arn $EMR_ROLE_ARN \
      --release-label emr-6.8.0-latest \
      --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://'$S3BUCKET'/blog/hudi_scd_script.py",
          "entryPointArguments":["'$AWS_REGION'","'$S3BUCKET'","COW"],
          "sparkSubmitParameters": "--jars local:///usr/lib/hudi/hudi-spark-bundle.jar,local:///usr/lib/spark/external/lib/spark-avro.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
      --configuration-overrides '{
        "applicationConfiguration": [
          {
            "classification": "spark-defaults", 
            "properties": {
              "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
              "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
             }}
        ]}'

  3. Check the job status on the EMR virtual cluster console.
  4. Query the output in Athena:

    select * from hudi_contact_cow where id=103

    select * from hudi_contact_mor_rt where id=103
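The job submission in step 2 embeds JSON inside a shell command, which makes quoting easy to get wrong. As an alternative sketch, the same request can be assembled as a Python dictionary and serialized with json.dumps. The function name and the example identifiers are hypothetical. The top-level keys are assumed to match the keyword arguments of the boto3 emr-containers start_job_run call, which you should verify against the SDK documentation before use.

```python
import json


def build_hudi_job_request(virtual_cluster_id, role_arn, bucket, region, table_type):
    """Assemble the job request for the Hudi SCD2 script as plain data."""
    submit_params = (
        "--jars local:///usr/lib/hudi/hudi-spark-bundle.jar,"
        "local:///usr/lib/spark/external/lib/spark-avro.jar "
        "--conf spark.executor.memory=2G --conf spark.executor.cores=2"
    )
    return {
        "virtualClusterId": virtual_cluster_id,
        "name": f"em68-hudi-{table_type.lower()}",
        "executionRoleArn": role_arn,
        "releaseLabel": "emr-6.8.0-latest",
        "jobDriver": {
            "sparkSubmitJobDriver": {
                "entryPoint": f"s3://{bucket}/blog/hudi_scd_script.py",
                "entryPointArguments": [region, bucket, table_type],
                "sparkSubmitParameters": submit_params,
            }
        },
        "configurationOverrides": {
            "applicationConfiguration": [{
                "classification": "spark-defaults",
                "properties": {
                    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                    "spark.hadoop.hive.metastore.client.factory.class":
                        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                },
            }]
        },
    }


if __name__ == "__main__":
    # Placeholder identifiers, for illustration only.
    request = build_hudi_job_request(
        "example-virtual-cluster-id",
        "arn:aws:iam::123456789012:role/emr-on-eks-quickstart-execution-role",
        "example-bucket", "us-east-1", "COW",
    )
    print(json.dumps(request, indent=2))
```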

Example 2: Run Apache Iceberg with EMR on EKS

Starting with Amazon EMR version 6.6.0, you can use Apache Spark 3 on EMR on EKS with the Iceberg table format. For more information on how Iceberg works in an immutable data lake, see Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR.

The sample job creates an Iceberg table iceberg_contact in the default database of AWS Glue. The full version is in the iceberg_scd_script.py script. The following code snippet shows the SCD2 type of MERGE operation:

# Read incremental CSV file with extra SCD2 columns
spark.read.schema(contact_schema)\
.format("csv").options(header=False,delimiter=",")\
.load(f"s3://{S3_BUCKET_NAME}/blog/data/update_contacts.csv")\
.withColumn(……)\
.createOrReplaceTempView('iceberg_contact_update')

# Update existing records which are changed in the update file
contact_update_qry = """
    WITH contact_to_update AS (
          SELECT target.*
          FROM glue_catalog.default.iceberg_contact AS target
          JOIN iceberg_contact_update AS source 
          ON target.id = source.id
          WHERE target.checksum != source.checksum
            AND target.iscurrent = 1
        UNION
          SELECT * FROM iceberg_contact_update
    ),contact_updated AS (
        SELECT *, LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from) AS eff_from
        FROM contact_to_update
    )
    SELECT id,name,email,state,ts,valid_from,
        CAST(COALESCE(eff_from, null) AS Timestamp) AS valid_to,
        CASE WHEN eff_from IS NULL THEN 1 ELSE 0 END AS iscurrent,
        checksum
    FROM contact_updated
"""
# Upsert
spark.sql(f"""
    MERGE INTO glue_catalog.default.iceberg_contact tgt
    USING ({contact_update_qry}) src
    ON tgt.id = src.id
    AND tgt.checksum = src.checksum
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

As described earlier in the job execution role section, the role emr-on-eks-quickstart-execution-role grants access to the required DynamoDB table myIcebergLockTable, because the table is used to obtain locks on Iceberg tables when multiple concurrent write operations target a single table. For more information on Iceberg’s lock manager, refer to DynamoDB Lock Manager.
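The LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from) window in the MERGE source query is what turns a set of row versions into SCD2 validity ranges. The following plain-Python sketch reproduces that step on a list of dictionaries. It is an illustration only, with a hypothetical function name and ISO date strings standing in for timestamps.

```python
from collections import defaultdict


def add_validity(rows):
    """Set valid_to to the next version's valid_from within each id (like LEAD)."""
    by_id = defaultdict(list)
    for row in rows:
        by_id[row["id"]].append(row)
    out = []
    for key in sorted(by_id):
        versions = sorted(by_id[key], key=lambda r: r["valid_from"])
        for i, row in enumerate(versions):
            has_next = i + 1 < len(versions)
            next_from = versions[i + 1]["valid_from"] if has_next else None
            out.append({**row,
                        "valid_to": next_from,
                        # Only the latest version of each id stays current.
                        "iscurrent": 0 if has_next else 1})
    return out


if __name__ == "__main__":
    versions = [
        {"id": 1, "state": "NY", "valid_from": "2022-01-01"},
        {"id": 1, "state": "CA", "valid_from": "2022-06-01"},
        {"id": 2, "state": "TX", "valid_from": "2022-03-01"},
    ]
    for r in add_validity(versions):
        print(r)
```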

  1. Upload the application scripts to the example S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync iceberg/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS to create an SCD2 Iceberg table:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./iceberg/iceberg_submit.sh

    The full version code is in the iceberg_submit.sh script. The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-iceberg \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
        "sparkSubmitJobDriver": {
        "entryPoint": "s3://'$S3BUCKET'/blog/iceberg_scd_script.py",
        "entryPointArguments":["'$S3BUCKET'"],
        "sparkSubmitParameters": "--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
        "applicationConfiguration": [
        {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog.warehouse": "s3://'$S3BUCKET'/iceberg/",
        "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog.lock.table": "myIcebergLockTable"
        }}
    ]}'

  3. Check the job status on the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from iceberg_contact where id=103

Example 3: Run open-source Delta Lake with EMR on EKS

Delta Lake 2.1.x is compatible with Apache Spark 3.3.x. Check out the compatibility list for other versions of Delta Lake and Spark. In this post, we use Amazon EMR release 6.8 (Spark 3.3.0) to demonstrate the SCD2 implementation in a data lake.

The following is the Delta code snippet to load the initial dataset; the incremental load MERGE logic is highly similar to the Iceberg example. As a one-off task, there should be two tables set up on the same data:

  • The Delta table delta_table_contact – Defined on the TABLE_LOCATION at s3://{S3_BUCKET_NAME}/delta/delta_contact. The MERGE/UPSERT operation must be implemented on the Delta destination table. Athena can’t query this table directly; instead, it reads from a manifest file stored in the same location, which is a text file containing a list of data files to read for querying a table. The manifest is exposed through the Athena table described next.
  • The Athena table delta_contact – Defined on the manifest location s3://{S3_BUCKET_NAME}/delta/delta_contact/_symlink_format_manifest/. All read operations from Athena must use this table.

The full version code is in the delta_scd_script.py script. The code snippet is as follows:

# Read initial contact CSV file and create a Delta table with extra SCD2 columns
df_intial_csv = spark.read.schema(contact_schema)\
 .format("csv")\
 .options(header=False,delimiter=",")\
 .load(f"s3://{S3_BUCKET_NAME}/.../initial_contacts.csv")\
 .withColumn(.........)\
 .write.format("delta")\
 .mode("overwrite")\
 .save(TABLE_LOCATION)

spark.sql(f"""CREATE TABLE IF NOT EXISTS delta_table_contact USING DELTA LOCATION '{TABLE_LOCATION}'""")
spark.sql("GENERATE symlink_format_manifest FOR TABLE delta_table_contact")
spark.sql("ALTER TABLE delta_table_contact SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")

# Create a queryable table in Athena
spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS default.delta_contact (
     ....
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{TABLE_LOCATION}/_symlink_format_manifest/'""")

The SQL statement GENERATE symlink_format_manifest FOR TABLE ... is a required step to set up Athena for Delta Lake. Whenever the data in a Delta table is updated, the manifests must be regenerated. Therefore, we use ALTER TABLE .... SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true) as a one-off setup to automate the manifest refresh.
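To see why the manifest must be refreshed after every update, consider this toy sketch. It replays a simplified list of add and remove actions, computes which data files are still active, and renders the manifest as one absolute path per line. It is a conceptual model only: the function names, the file names, and the bucket are hypothetical, and Delta Lake generates the real manifest itself.

```python
def active_files(actions):
    """Replay (action, path) pairs and return the files that are still active."""
    files = []
    for action, path in actions:
        if action == "add" and path not in files:
            files.append(path)
        elif action == "remove" and path in files:
            files.remove(path)
    return files


def render_manifest(table_location, actions):
    """Render manifest text: one absolute data file path per line."""
    base = table_location.rstrip("/")
    lines = [f"{base}/{path}" for path in active_files(actions)]
    return "".join(line + "\n" for line in lines)


if __name__ == "__main__":
    log = [
        ("add", "part-0001.parquet"),
        ("add", "part-0002.parquet"),
        # A MERGE rewrites part-0001 into a new file.
        ("remove", "part-0001.parquet"),
        ("add", "part-0003.parquet"),
    ]
    print(render_manifest("s3://example-bucket/delta/delta_contact", log), end="")
```

A manifest rendered before the MERGE would still list part-0001.parquet, so a reader that relies on it would see outdated data until the manifest is regenerated.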

  1. Upload the Delta sample scripts to the S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync delta/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    ./delta/delta_submit.sh

    The full version code is in the delta_submit.sh script. The open-source Delta JAR files must be included in spark.jars. Alternatively, follow the instructions in How to customize Docker images and build a custom EMR on EKS image to accommodate the Delta dependencies.

    "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar"

    The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-delta \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
       "sparkSubmitJobDriver": {
       "entryPoint": "s3://'$S3BUCKET'/blog/delta_scd_script.py",
       "entryPointArguments":["'$S3BUCKET'"],
       "sparkSubmitParameters": "--conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
       "applicationConfiguration": [
       {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar"
        }}
    ]}'

  3. Check the job status from the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from delta_contact where id=103

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script (change the Region if necessary):

export EMRCLUSTER_NAME=emr-on-eks-quickstart
export AWS_REGION=us-east-1
./clean_up.sh

Conclusion

Implementing an ACID-compliant data lake with EMR on EKS enables you to focus more on delivering business value, instead of worrying about managing complexity and reliability at the data storage layer.

This post presented three different transactional storage frameworks that can meet your ACID needs. They ensure you never read partial data (Atomicity). The read/write isolation allows you to see consistent snapshots of the data, even if an update occurs at the same time (Consistency and Isolation). All the transactions are stored directly to the underlying Amazon S3-backed data lake, which is designed for 11 9’s of durability (Durability).

For more information, check out the sample GitHub repository used in this post and the EMR on EKS Workshop. They will get you started with running your familiar transactional framework with EMR on EKS. If you want to dive deep into each storage format, check out the following posts:


About the authors

Amir Shenavandeh is a Sr Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering, and DataOps.

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Stream Apache HBase edits for real-time analytics

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/stream-apache-hbase-edits-for-real-time-analytics/

Apache HBase is a non-relational database. To use the data, applications need to query the database to pull the data and changes from tables. In this post, we introduce a mechanism to stream Apache HBase edits into streaming services such as Apache Kafka or Amazon Kinesis Data Streams. In this approach, changes to data are pushed and queued into a streaming platform such as Kafka or Kinesis Data Streams for real-time processing, using a custom Apache HBase replication endpoint.

We start with a brief technical background on HBase replication and review a use case in which we store IoT sensor data into an HBase table and enrich rows using periodic batch jobs. We demonstrate how this solution enables you to enrich the records in real time and in a serverless way using AWS Lambda functions.

Common scenarios and use cases of this solution are as follows:

  • Auditing data, triggers, and anomaly detection using Lambda
  • Shipping WALEdits via Kinesis Data Streams to Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) to index records asynchronously
  • Triggers on Apache HBase bulk loads
  • Processing streamed data using Apache Spark Streaming, Apache Flink, or Amazon Kinesis Data Analytics
  • HBase edits or change data capture (CDC) replication into other storage platforms, such as Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), and Amazon DynamoDB
  • Incremental HBase migration by replaying the edits from any point in time, based on configured retention in Kafka or Kinesis Data Streams

This post progresses into some common use cases you might encounter, along with their design options and solutions. We review and expand on these scenarios in separate sections, in addition to the considerations and limits in our application designs.

Introduction to HBase replication

At a very high level, HBase replication is based on replaying transactions from a source cluster on the destination cluster. This is done by replaying WALEdits, or Write Ahead Log (WAL) entries, from the RegionServers of the source cluster on the destination cluster. In HBase, all data mutations, such as PUT or DELETE, are written to the MemStore of their specific region and appended to a WAL file as WALEdits, or entries. Each WALEdit represents a transaction and can carry multiple write operations on a row. Because the MemStore is an in-memory entity, if a RegionServer fails, the lost data can be replayed and restored from the WAL files. Having a WAL is optional, and some operations may not require WALs or can request to bypass WALs for quicker writes. For example, records in a bulk load aren’t recorded in the WAL.

HBase replication is based on transferring WALEdits to the destination cluster and replaying them, so any operation that bypasses the WAL isn’t replicated.
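The consequence of bypassing the WAL can be shown with a toy model in Python. It is a conceptual sketch, not HBase code: the class, the function, and the skip_wal flag are hypothetical names, a dictionary stands in for the MemStore, and a list stands in for the WAL.

```python
class ToyRegionServer:
    """Toy write path: every mutation hits the MemStore, most also hit the WAL."""

    def __init__(self):
        self.memstore = {}
        self.wal = []

    def put(self, row_key, value, skip_wal=False):
        self.memstore[row_key] = value
        if not skip_wal:
            self.wal.append(("PUT", row_key, value))

    def delete(self, row_key, skip_wal=False):
        self.memstore.pop(row_key, None)
        if not skip_wal:
            self.wal.append(("DELETE", row_key, None))


def replay(wal_entries, destination=None):
    """Apply WAL entries in order to a destination store, as replication does."""
    store = {} if destination is None else destination
    for op, row_key, value in wal_entries:
        if op == "PUT":
            store[row_key] = value
        elif op == "DELETE":
            store.pop(row_key, None)
    return store


if __name__ == "__main__":
    source = ToyRegionServer()
    source.put("row-1", "a")
    source.put("row-2", "b", skip_wal=True)  # bypasses the WAL
    source.put("row-3", "c")
    source.delete("row-3")
    print("source:     ", source.memstore)
    print("destination:", replay(source.wal))
```

The destination never sees row-2, because the only record of that write lives in the source MemStore.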

When setting up replication in HBase, a ReplicationEndpoint implementation needs to be selected in the replication configuration when creating a peer, and an instance of ReplicationEndpoint runs as a thread on every RegionServer. In HBase, a replication endpoint is pluggable for more flexibility in replication and for shipping WALEdits to different versions of HBase. You can also use this to build replication endpoints for sending edits to different platforms and environments. For more information about setting up replication, see Cluster Replication.

HBase bulk load replication HBASE-13153

In HBase, bulk loading is a method to directly import HFiles or Store files into RegionServers. This avoids the normal write path and WALEdits. As a result, far fewer CPU and network resources are used when importing large portions of data into HBase tables.

You can also use HBase bulk loads to recover data when an error or outage causes the cluster to lose track of regions and Store files.

Because bulk loads skip WAL creation, none of the new records are replicated to the secondary cluster. With the enhancement in HBASE-13153, a bulk load is represented as a bulk load event that carries the location of the imported files. You can activate this by setting hbase.replication.bulkload.enabled to true and, as a prerequisite, setting hbase.replication.cluster.id to a unique value.

Custom streaming replication endpoint

We can use HBase’s pluggable endpoints to stream records into platforms such as Kinesis Data Streams or Kafka. Transferred records can be consumed by Lambda functions, processed by a Spark Streaming application or Apache Flink on Amazon EMR, Kinesis Data Analytics, or any other big data platform.

In this post, we demonstrate an implementation of a custom replication endpoint that replicates WALEdits to Kinesis Data Streams or Kafka topics.

In our example, we built upon the BaseReplicationEndpoint abstract class, which implements the ReplicationEndpoint interface.

The main method to implement and override is the replicate method. This method replicates a set of items it receives every time it’s called and blocks until all those records are replicated to the destination.
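The blocking contract of replicate can be sketched in Python as an analogy. This is not the Java ReplicationEndpoint API or the implementation from the GitHub repository: the class name, the send_batch callable, and the batching and retry parameters are hypothetical. The sketch assumes that returning False signals the caller to offer the same entries again.

```python
class ToyStreamingEndpoint:
    """Toy endpoint: replicate() returns only after every entry is acknowledged."""

    def __init__(self, send_batch, batch_size=2, max_attempts=3):
        self.send_batch = send_batch      # callable(list) -> bool (acknowledged?)
        self.batch_size = batch_size
        self.max_attempts = max_attempts

    def replicate(self, entries):
        """Ship entries in order; return True only if all batches were accepted."""
        for start in range(0, len(entries), self.batch_size):
            batch = entries[start:start + self.batch_size]
            for _ in range(self.max_attempts):
                if self.send_batch(batch):
                    break  # batch acknowledged, move on to the next one
            else:
                # Every attempt failed: report failure for the whole call.
                return False
        return True


if __name__ == "__main__":
    delivered = []

    def sink(batch):
        delivered.append(list(batch))
        return True

    endpoint = ToyStreamingEndpoint(sink, batch_size=2)
    print(endpoint.replicate(["edit-1", "edit-2", "edit-3"]))
    print(delivered)
```

Because a failed call may be repeated with entries that were already partly delivered, consumers of such a stream generally need to tolerate duplicates.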

For our implementation and configuration options, see our GitHub repository.

Use case: Enrich records in real time

We now use the custom streaming replication endpoint implementation to stream HBase edits to Kinesis Data Streams.

The provided AWS CloudFormation template demonstrates how to set up an EMR cluster with replication to either Kinesis Data Streams or Apache Kafka and consume the replicated records, using a Lambda function to enrich data asynchronously in real time. In our sample project, we launch an EMR cluster with an HBase database. A sample IoT traffic generator application runs as a step in the cluster and puts records, each containing a registration number and an instantaneous speed, into a local HBase table. Our custom HBase replication endpoint replicates the records in real time into a Kinesis data stream or a Kafka topic, depending on the option selected at launch. When the step starts putting records into the stream, a Lambda function is provisioned; it consumes records from the beginning of the stream until it catches up. The function calculates a score for each record, using a formula based on the variation from the minimum and maximum speed limits in the use case. It then persists the result as a score qualifier in a different column family, which is outside the replication scope of the source table, by running HBase puts on the RowKey.
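The enrichment step can be sketched as a Lambda-style handler for Kinesis events. Everything specific here is an assumption made for illustration: the payload is assumed to be JSON with hypothetical rowkey and speed fields, the speed limits and the scoring formula are invented, and the write back to HBase is omitted. The sample project’s actual record format and formula may differ.

```python
import base64
import json

# Hypothetical speed limits; the sample project's values may differ.
MIN_SPEED = 40.0
MAX_SPEED = 100.0


def speed_score(speed, min_speed=MIN_SPEED, max_speed=MAX_SPEED):
    """Hypothetical score: how far the speed falls outside the allowed range."""
    if speed > max_speed:
        return speed - max_speed
    if speed < min_speed:
        return min_speed - speed
    return 0.0


def handler(event, context):
    """Decode each Kinesis record and compute a score per row key."""
    results = []
    for record in event["Records"]:
        # Kinesis delivers the record payload base64-encoded.
        raw = base64.b64decode(record["kinesis"]["data"])
        payload = json.loads(raw)  # assumed JSON payload
        results.append({
            "rowkey": payload["rowkey"],
            "score": speed_score(float(payload["speed"])),
        })
    # A real function would now put each score into HBase by row key.
    return results


if __name__ == "__main__":
    body = json.dumps({"rowkey": "ABC123", "speed": 120}).encode("utf-8")
    sample_event = {"Records": [{"kinesis": {"data": base64.b64encode(body).decode()}}]}
    print(handler(sample_event, None))
```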

The following diagram illustrates this architecture.

To launch our sample environment, you can use our template on GitHub.

The template creates a VPC, public and private subnets, Lambda functions to consume records and prepare the environment, an EMR cluster with HBase, and a Kinesis data stream or an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster depending on the selected parameters when launching the stack.

Architectural design patterns

Traditionally, Apache HBase tables are considered as data stores, where consumers get or scan the records from tables. It’s very common in modern databases to react to database logs or CDC for real-time use cases and triggers. With our streaming HBase replication endpoint, we can project table changes into message delivery systems like Kinesis Data Streams or Apache Kafka.

We can trigger Lambda functions to consume records from Apache Kafka or Kinesis Data Streams in a serverless design, or use the wider Amazon Kinesis ecosystem, such as Kinesis Data Analytics, or Amazon Kinesis Data Firehose for delivery into Amazon S3. You could also pipe the records into Amazon OpenSearch Service.

A wide range of consumer ecosystems, such as Apache Spark, AWS Glue, and Apache Flink, is available to consume from Kafka and Kinesis Data Streams.

Let’s review a few other common use cases.

Index HBase rows

Apache HBase rows are retrievable by RowKey. Writing a row into HBase with the same RowKey overwrites or creates a new version of the row. To retrieve a row, it needs to be fetched by the RowKey or a range of rows needs to be scanned if the RowKey is unknown.

In some use cases, scanning the table for a specific qualifier or value is expensive. Instead, we can asynchronously index our rows in a parallel system like Elasticsearch, and applications can use the index to find the RowKey. Without this solution, a periodic job has to scan the table and write the rows into an indexing service like Elasticsearch to hydrate the index, or the producing application has to write to both HBase and Elasticsearch directly, which adds overhead to the producer.
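The indexing idea can be sketched in a few lines of Python. This is only an illustration: the edit layout (dictionaries with row_key, qualifier, and value) and the in-memory dictionary standing in for a search service are assumptions, not the format produced by the sample endpoint.

```python
from collections import defaultdict

# Hypothetical, simplified shape of a replicated edit; the real WALEdit
# serialization used by the sample endpoint may differ.
edits = [
    {"row_key": "car-001", "qualifier": "registration", "value": "ABC123"},
    {"row_key": "car-002", "qualifier": "registration", "value": "XYZ789"},
    {"row_key": "car-003", "qualifier": "registration", "value": "ABC123"},
]


def build_index(edits):
    """Map (qualifier, value) -> set of RowKeys, updated as edits arrive."""
    index = defaultdict(set)
    for edit in edits:
        index[(edit["qualifier"], edit["value"])].add(edit["row_key"])
    return index


index = build_index(edits)
# Look up RowKeys by value instead of scanning the table.
matches = sorted(index[("registration", "ABC123")])
print(matches)
```

This sketch only adds entries. A real index would also need to remove stale entries when a row's value is overwritten or deleted.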

Enrich and audit data

A very common use case for HBase streaming endpoints is enriching data and storing the enriched records in a data store, such as Amazon S3 or RDS databases. In this scenario, a custom HBase replication endpoint streams the records into a message distribution system such as Apache Kafka or Kinesis Data Streams. Records can be serialized using the AWS Glue Schema Registry for schema validation. A consumer on the other end of the stream reads the records, enriches them, and validates them against a machine learning model in Amazon SageMaker for anomaly detection. The consumer persists the records in Amazon S3 and potentially triggers an alert using Amazon Simple Notification Service (Amazon SNS). Data stored on Amazon S3 can be further processed on Amazon EMR, or we can create a dashboard on Amazon QuickSight that uses Amazon Athena for queries.

The following diagram illustrates our architecture.

Store and archive data lineage

Apache HBase comes with the snapshot feature. You can freeze the state of tables into snapshots and export them to any distributed file system like HDFS or Amazon S3. Recovering snapshots restores the entire table to the snapshot point.

Apache HBase also supports versioning at the row level. You can configure column families to keep row versions, and the default versioning is based on timestamps.

However, when you use this approach to stream records into Kafka or Kinesis Data Streams, records are retained inside the stream, and you can partially replay a period. In contrast, recovering a snapshot restores the table only up to the snapshot point; records written afterward aren’t present.

In Kinesis Data Streams, by default records of a stream are accessible for up to 24 hours from the time they are added to the stream. This limit can be increased to up to 7 days by enabling extended data retention, or up to 365 days by enabling long-term data retention. See Quotas and Limits for more information.
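As a rough illustration of what these retention settings mean for replay, the following Python sketch checks whether records added at a given time are still inside the retention window. The function name and the dates are hypothetical; only the three retention periods come from the text above.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_RETENTION = timedelta(hours=24)
EXTENDED_RETENTION = timedelta(days=7)
LONG_TERM_RETENTION = timedelta(days=365)


def can_replay_from(start, now, retention):
    """Return True if records added at `start` are still inside the retention window."""
    return now - start <= retention


# Hypothetical dates for illustration.
now = datetime(2022, 1, 10, tzinfo=timezone.utc)
three_days_ago = now - timedelta(days=3)
replay_default = can_replay_from(three_days_ago, now, DEFAULT_RETENTION)
replay_extended = can_replay_from(three_days_ago, now, EXTENDED_RETENTION)
print(replay_default, replay_extended)
```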

In Apache Kafka, record retention has virtually no limit other than the resources and disk space available on the Kafka cluster, and can be configured with the log.retention settings (for example, log.retention.hours or log.retention.bytes).

Trigger on HBase bulk load

The HBase bulk load feature uses a MapReduce job to output table data in HBase’s internal data format, and then directly loads the generated Store files into the running cluster. Bulk load uses less CPU and network resources than loading via the HBase API, because it bypasses WALs in the write path; as a result, the records aren’t seen by replication. However, since HBASE-13153, you can configure HBase to replicate a meta record as an indication of a bulk load event.

A Lambda function processing replicated WALEdits can listen to this event to trigger actions, such as automatically refreshing a read replica HBase cluster on Amazon S3 whenever a bulk load happens. The following diagram illustrates this workflow.

Considerations for replication into Kinesis Data Streams

Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources with very low latency. Kinesis is fully managed and runs your streaming applications without requiring you to manage any infrastructure. It’s durable, because records are synchronously replicated across three Availability Zones, and you can increase data retention to 365 days.

When considering Kinesis Data Streams for any solution, it’s important to consider service limits. For instance, as of this writing, the maximum size of the data payload of a record before base64-encoding is up to 1 MB, so we must make sure the records or serialized WALEdits remain within the Kinesis record size limit. To be more efficient, you can enable the hbase.replication.compression-enabled attribute to GZIP compress the records before sending them to the configured stream sink.
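The following Python sketch shows the general idea of compressing a record and checking it against the size limit before sending. It's a minimal illustration, not the endpoint's implementation: the JSON serialization, the prepare_record helper, and reading 1 MB as 1,048,576 bytes are assumptions.

```python
import gzip
import json

# Assumed interpretation of the 1 MB payload limit (before base64 encoding).
MAX_RECORD_BYTES = 1024 * 1024


def prepare_record(edit, compress=True):
    """Serialize an edit, optionally GZIP it, and enforce the size limit."""
    payload = json.dumps(edit).encode("utf-8")
    if compress:
        payload = gzip.compress(payload)
    if len(payload) > MAX_RECORD_BYTES:
        raise ValueError(
            f"record is {len(payload)} bytes, over the {MAX_RECORD_BYTES}-byte limit"
        )
    return payload


# Hypothetical edit with a large, highly repetitive value.
edit = {"row_key": "car-001", "value": "speed=88;" * 200_000}
raw_size = len(json.dumps(edit).encode("utf-8"))
compressed = prepare_record(edit, compress=True)
print(raw_size > MAX_RECORD_BYTES, len(compressed) <= MAX_RECORD_BYTES)
```

Compression helps most with repetitive payloads like this one. Records that are already compact or high in entropy may shrink very little, so the size check is still needed after compression.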

Kinesis Data Streams retains the order of the records within the shards as they arrive, and records can be read or processed in the same order. However, in this sample custom replication endpoint, a random partition key is used so that the records are evenly distributed between the shards. We can also use a hash function to generate a partition key when putting records into the stream, for example based on the Region ID, so that all the WALEdits from the same Region land in the same shard and consumers can assume Region locality per shard.
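To illustrate Region-based partition keys, here is a small Python sketch. Kinesis Data Streams maps each partition key to a 128-bit integer with MD5 and routes it to the shard whose hash key range contains that value. The shard count, the assumption that shards split the hash space evenly, and both helper functions are hypothetical and only approximate the routing.

```python
import hashlib

NUM_SHARDS = 4  # hypothetical stream size; shards assumed to split the hash space evenly
HASH_SPACE = 2 ** 128  # partition keys are mapped to 128-bit integers with MD5


def partition_key_for(region_id):
    """Derive a stable partition key from the HBase Region ID."""
    return hashlib.sha256(region_id.encode("utf-8")).hexdigest()


def shard_for(partition_key, num_shards=NUM_SHARDS):
    """Approximate which shard a partition key lands in."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * num_shards // HASH_SPACE


# Hypothetical edits tagged with the Region they came from.
edits = [("region-a", "put row1"), ("region-b", "put row2"), ("region-a", "put row3")]
placement = [(region, shard_for(partition_key_for(region))) for region, _ in edits]
print(placement)
```

Both region-a edits land in the same shard, so a consumer of that shard sees them in order. The trade-off is that a busy Region can make its shard hot, which a random partition key avoids.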

For delivering records in KinesisSinkImplemetation, we use the Amazon Kinesis Producer Library (KPL) to put records into Kinesis data streams. The KPL simplifies producer application development; we can achieve high write throughput to a Kinesis data stream. We can use the KPL in either synchronous or asynchronous use cases. We suggest using the higher performance of the asynchronous interface unless there is a specific reason to use synchronous behavior. KPL is very configurable and has retry logic built in. You can also perform record aggregation for maximum throughput. In KinesisSinkImplemetation, by default records are asynchronously replicated to the stream. We can change to synchronous mode by setting hbase.replication.kinesis.syncputs to true. We can enable record aggregation by setting hbase.replication.kinesis.aggregation-enabled to true.

The KPL can incur an additional processing delay because it buffers records before sending them to the stream, based on the user-configurable RecordMaxBufferedTime attribute. Larger values of RecordMaxBufferedTime result in higher packing efficiencies and better performance. However, applications that can’t tolerate this additional delay may need to use the AWS SDK directly.

Kinesis Data Streams and the Kinesis family are fully managed and easily integrate with the rest of the AWS ecosystem with minimum development effort with services such as the AWS Glue Schema Registry and Lambda. We recommend considering Kinesis Data Streams for low-latency, real-time use cases on AWS.

Considerations for replication into Apache Kafka

Apache Kafka is a high-throughput, scalable, and highly available open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

AWS offers Amazon MSK as a fully managed Kafka service. Amazon MSK provides the control plane operations and runs open-source versions of Apache Kafka. Existing applications, tooling, and plugins from partners and the Apache Kafka community are supported without requiring changes to application code.

You can configure this sample project for Apache Kafka brokers directly or just point towards an Amazon MSK ARN for replication.

Although there is virtually no limit on the size of the messages in Kafka, the maximum message size is set to 1 MB by default, so we must make sure the records, or serialized WALEdits, remain within the maximum message size for topics.

The Kafka producer tries to batch records together whenever possible to limit the number of requests for more efficiency. This is configurable by setting batch.size, linger.ms, and delivery.timeout.ms.
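As an illustration, the batching settings can be expressed as a plain configuration map. The values below are arbitrary examples, not recommendations or the sample project's defaults. The check reflects the Kafka producer documentation, which says delivery.timeout.ms should be at least the sum of linger.ms and request.timeout.ms; the validate helper itself is hypothetical.

```python
# Illustrative values only.
producer_config = {
    "batch.size": 65536,            # maximum bytes per partition batch
    "linger.ms": 20,                # wait up to 20 ms for a batch to fill
    "request.timeout.ms": 30000,    # time to wait for a broker response
    "delivery.timeout.ms": 120000,  # upper bound on reporting success or failure
}


def validate(config):
    """Check that delivery.timeout.ms >= linger.ms + request.timeout.ms."""
    minimum = config["linger.ms"] + config["request.timeout.ms"]
    return config["delivery.timeout.ms"] >= minimum


print(validate(producer_config))
```

A larger linger.ms and batch.size generally trade latency for fewer, fuller requests.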

In Kafka, topics are partitioned, and partitions are distributed between different Kafka brokers. This distributed placement allows for load balancing of consumers and producers. When a new event is published to a topic, it’s appended to one of the topic’s partitions. Events with the same event key are written to the same partition, and Kafka guarantees that any consumer of a given topic partition can always read that partition’s events in exactly the same order as they were written. KafkaSinkImplementation uses a random partition key to distribute the messages evenly between the partitions. The key could instead be based on a heuristic function, for example on the Region ID, if the order of the WALEdits or record locality is important to the consumers.

Semantic guarantees

As with any streaming application, it’s important to consider the semantic guarantees: how the producer acknowledges or fails the delivery of messages to the message queue, and how the consumer checkpoints. Based on our use cases, we need to consider the following:

  • At most once delivery – Messages are never delivered more than once, and there is a chance of losing messages
  • At least once delivery – Messages can be delivered more than once, with no loss of messages
  • Exactly once delivery – Every message is delivered only once, and there is no loss of messages

After changes are persisted as WALs and in MemStore, the replicate method in ReplicationEndpoint is called to replicate a collection of WAL entries and returns a Boolean (true/false) value. If the returned value is true, the entries are considered successfully replicated by HBase and the replicate method is called for the next batch of WAL entries. Depending on configuration, both KPL and Kafka producers might buffer the records for longer if configured for asynchronous writes. Failures can cause loss of entries, retries, and duplicate delivery of records to the stream, which could be detrimental for synchronous or asynchronous message delivery.
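The following Python simulation sketches how a retry can produce duplicates. It isn't HBase code: FlakySink and this replicate function are hypothetical stand-ins, and the sketch assumes the caller re-sends the same batch whenever replicate returns false.

```python
class FlakySink:
    """Stand-in for a stream; fails once after accepting part of a batch."""

    def __init__(self):
        self.delivered = []
        self.failed_once = False

    def put(self, entry):
        # Simulate a failure on the second entry of the first attempt.
        if not self.failed_once and len(self.delivered) == 1:
            self.failed_once = True
            raise ConnectionError("simulated delivery failure")
        self.delivered.append(entry)


def replicate(entries, sink):
    """Return True only when every entry in the batch was delivered."""
    try:
        for entry in entries:
            sink.put(entry)
        return True
    except ConnectionError:
        return False


sink = FlakySink()
batch = ["edit-1", "edit-2", "edit-3"]
attempts = 1
while not replicate(batch, sink):  # the caller retries the same batch
    attempts += 1
print(attempts, sink.delivered)
```

In this run, edit-1 reaches the sink twice, because the first attempt failed after it had already been accepted. This is at-least-once delivery: nothing is lost, but consumers have to tolerate duplicates.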

If our operations aren’t idempotent, we can checkpoint or check for unique sequence numbers on the consumer side. For simple HBase record replication, RowKey operations are idempotent, and they carry a timestamp and sequence ID.
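A consumer-side duplicate check might look like the following Python sketch. The record layout and the apply_once helper are hypothetical. The sketch also assumes that sequence_id is unique across the whole stream; in practice it may need to be combined with a Region identifier, and the set of seen IDs would need to be persisted and bounded.

```python
def apply_once(records, seen=None):
    """Yield each record the first time its sequence ID is seen; skip repeats."""
    seen = set() if seen is None else seen
    for record in records:
        if record["sequence_id"] in seen:
            continue
        seen.add(record["sequence_id"])
        yield record


# Hypothetical records for a non-idempotent operation.
stream = [
    {"sequence_id": 101, "row_key": "car-001", "op": "increment"},
    {"sequence_id": 101, "row_key": "car-001", "op": "increment"},  # duplicate delivery
    {"sequence_id": 102, "row_key": "car-002", "op": "increment"},
]
applied = [record["sequence_id"] for record in apply_once(stream)]
print(applied)
```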

Summary

Replication of HBase WALEdits into streams is a powerful tool that you can use in multiple use cases and in combination with other AWS services. You can create practical solutions to further process records in real time, audit the data, detect anomalies, set triggers on ingested data, or archive data in streams to be replayed on other HBase databases or storage services from a point in time. This post outlined some common use cases and solutions, along with some best practices when implementing your custom HBase streaming replication endpoints.

Review, clone, and try our HBase replication endpoint implementation from our GitHub repository and launch our sample CloudFormation template.

We’d like to learn about your use cases. If you have questions or suggestions, please leave a comment.


About the Authors

Amir Shenavandeh is a Senior Hadoop systems engineer and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimization. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Maryam Tavakoli is a Cloud Engineer and Amazon OpenSearch subject matter expert at Amazon Web Services. She helps customers with their Analytics and Streaming workload optimization and is passionate about solving complex problems with simplistic user experience that can empower customers to be more productive.