Tag Archives: EMR

Build incremental data pipelines to load transactional data changes using AWS DMS, Delta 2.0, and Amazon EMR Serverless

Post Syndicated from Sankar Sundaram original https://aws.amazon.com/blogs/big-data/build-incremental-data-pipelines-to-load-transactional-data-changes-using-aws-dms-delta-2-0-and-amazon-emr-serverless/

Building data lakes from continuously changing transactional data of databases and keeping data lakes up to date is a complex task and can be an operational challenge. A solution to this problem is to use AWS Database Migration Service (AWS DMS) for migrating historical and real-time transactional data into the data lake. You can then apply transformations and store data in Delta format for managing inserts, updates, and deletes.

Amazon EMR Serverless is a serverless option in Amazon EMR that makes it easy for data analysts and engineers to run open-source big data analytics frameworks without configuring, managing, and scaling clusters or servers. EMR Serverless automatically provisions and scales the compute and memory resources required by your applications, and you only pay for the resources that the applications use. EMR Serverless also provides you with more flexibility on overriding default Spark configurations, customizing EMR Serverless images, and customizing Spark driver and executor sizes to better suit specific workloads.

This post demonstrates how to implement a solution that uses AWS DMS to stream ongoing replication or change data capture (CDC) from an Amazon Aurora PostgreSQL-Compatible Edition database into Amazon Simple Storage Service (Amazon S3). We then apply transformations using Spark jobs on an EMR Serverless application and write transformed output into open-source Delta tables in Amazon S3. The Delta tables created by the EMR Serverless application are exposed through the AWS Glue Data Catalog and can be queried through Amazon Athena. Although this post uses an Aurora PostgreSQL database hosted on AWS as the data source, the solution can be extended to ingest data from any of the AWS DMS supported databases hosted on your data centers.

Solution overview

The following diagram shows the overall architecture of the solution that we implement in this post.

Architecture diagram

The solution consists of the following steps for implementing a full and incremental (CDC) data ingestion from a relational database:

  • Data storage and data generation – We create an Aurora PostgreSQL database and generate fictional trip data by running a stored procedure. The data will have attributes like trip ID (primary key), timestamp, source location, and destination location. Incremental data is generated in the PostgreSQL table by running custom SQL scripts.
  • Data ingestion – Steps 1 and 2 use AWS DMS, which connects to the source database and moves full and incremental data (CDC) to Amazon S3 in Parquet format. Let’s refer to this S3 bucket as the raw layer.
  • Data transformation – Steps 3 and 4 represent an EMR Serverless Spark application (Amazon EMR 6.9 with Apache Spark version 3.3.0) created using Amazon EMR Studio. The script reads input data from the S3 raw bucket, and then invokes Delta Lake’s MERGE statements to merge the data with the target S3 bucket (curated layer). The script also creates and updates a manifest file on Amazon S3 every time the job is run to enable data access from Athena and Amazon Redshift Spectrum.
  • Data access – The EMR Serverless job has code snippets that create a Delta table in the AWS Glue Data Catalog in Step 5. Steps 6 and 7 describe using Athena and Redshift Spectrum to query data from the Delta tables using standard SQL through the AWS Glue Data Catalog.
  • Data pipeline – Step 8 describes the process for triggering the data pipeline in a periodic manner through Airflow operators using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Refer to Submitting EMR Serverless jobs from Airflow for additional details. In this post, AWS DMS has been configured to replicate data from Amazon Aurora PostgreSQL-Compatible Edition into an S3 bucket with hourly partitions. The Airflow DAG can be configured to call an EMR Serverless job to process the past X hours of data based on specific project requirements. Implementation of the Airflow setup is not explored within the scope of this post.

The architecture has the following major features:

  • Reliability – The end-to-end architecture is made resilient with the Multi-AZ feature of EMR Serverless and using Multi-AZ deployments for AWS DMS and Amazon Aurora PostgreSQL-Compatible Edition. When you submit jobs to an EMR Serverless application, those jobs are automatically distributed to different Availability Zones in the Region. A job is run in a single Availability Zone to avoid performance implications of network traffic across Availability Zones. In case an Availability Zone is impaired, a job submitted to your EMR Serverless application is automatically run in a different (healthy) Availability Zone. When using resources in a private VPC, EMR Serverless recommends that you specify the private VPC configuration for multiple Availability Zones so that EMR Serverless can automatically select a healthy Availability Zone.
  • Cost optimization – When you run Spark or Hive applications using EMR Serverless, you pay for the amount of vCPU, memory, and storage resources consumed by your applications, leading to optimal utilization of resources. There is no separate charge for Amazon Elastic Compute Cloud (Amazon EC2) instances or Amazon Elastic Block Store (Amazon EBS) volumes. For additional details on cost, refer to Amazon EMR Serverless cost estimator.
  • Performance efficiency – You can run analytics workloads at any scale with automatic on-demand scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless includes the Amazon EMR performance-optimized runtime for Apache Spark and Hive. The Amazon EMR runtime for Spark is 100% API-compatible with open-source Spark and is over 3.5 times as fast as standard open-source Spark, so your jobs run faster and incur lower compute costs. With fast and fine-grained scaling in EMR Serverless, if a pipeline runs daily and needs to process 1 GB of data one day and 100 GB of data another day, EMR Serverless automatically scales to handle that load.
  • Monitoring – EMR Serverless sends metrics to Amazon CloudWatch at the application and job level every 1 minute. You can set up a single-view dashboard in CloudWatch to visualize application-level and job-level metrics using an AWS CloudFormation template provided on the EMR Serverless CloudWatch Dashboard GitHub repository. Also, EMR Serverless can store application logs in a managed storage, Amazon S3, or both based on your configuration settings. After you submit a job to an EMR Serverless application, you can view the real-time Spark UI or the Hive Tez UI for the running job from the EMR Studio console or request a secure URL using the GetDashboardForJobRun API. For completed jobs, you can view the Spark History Server or the Persistent Hive Tez UI from the EMR Studio console.

The following steps are performed to implement this solution:

  1. Connect to the Aurora PostgreSQL instance and generate a sample dataset.
  2. Set up a data pipeline for loading data from Amazon Aurora PostgreSQL-Compatible Edition into Delta Lake on Amazon S3 and query using Athena:
    • Start the AWS DMS task to perform full table load and capture ongoing replication to the S3 raw layer.
    • Run the EMR Serverless Spark application to load data into Delta Lake.
    • Query the Delta tables (native tables) through Athena.
  3. Run the data pipeline to capture incremental data changes into Delta Lake:
    • Generate an incremental (CDC) dataset and insert it into the Aurora PostgreSQL database.
    • Run the EMR Serverless Spark application to merge CDC data in the S3 curated layer (incremental load).
    • Query the Delta Lake tables through Athena to validate the merged data.


We use a CloudFormation template to provision the AWS resources required for the solution. The CloudFormation template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to connect to the Aurora PostgreSQL instance that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

To walk through this post, we use Delta Lake version 2.0.0 or later. Delta Lake 2.0.x is built for Apache Spark 3.2.x, and Delta Lake 2.1.x is built for Apache Spark 3.3.x. Choose the Delta Lake version compatible with your Spark version by visiting the Delta Lake releases page. We use an EMR Serverless application with version emr-6.9.0, which supports Spark version 3.3.0.

Deploy your resources

To provision the resources needed for the solution, complete the following steps:

  1. Choose Launch Stack:

  2. For Stack name, enter emr-serverless-deltalake-blog.
  3. For DatabaseUserName, enter the user name for logging in to Amazon Aurora PostgreSQL-Compatible Edition. Keep the default value if you don’t want to change it.
  4. For DatabasePassword, enter the password for logging in to Amazon Aurora PostgreSQL-Compatible Edition.
  5. For ClientIPCIDR, enter the IP address of your SQL client that will be used to connect to the EC2 instance. We use this EC2 instance to connect to the Aurora PostgreSQL database.
  6. For KeyName, enter the key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your SQL client to the Aurora PostgreSQL source database.
  7. For EC2ImageId, PrivateSubnet1CIDR, PrivateSubnet2CIDR, PublicSubnetCIDR, and VpcCIDR, keep the default values or choose appropriate values for the VPC and EC2 image for your specific environment.
  8. Choose Next.
  9. Choose Next again.
  10. On the review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  11. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the information shown in the following screenshot.

The CloudFormation template creates all the resources needed for the solution workflow:

  • S3 raw and curated buckets
  • Aurora PostgreSQL database
  • AWS DMS migration task, replication instance, and other resources
  • EC2 instance for running data ingestion scripts
  • AWS Identity and Access Management (IAM) roles and policies needed to perform the necessary activities as part of this solution
  • VPC, subnets, security groups, and relevant network components
  • AWS Lambda functions that perform setup activities required for this workflow
  • Additional components needed for running the EMR Serverless workflow

You can find the PySpark script in the raw S3 bucket on the Amazon S3 console as shown in the following screenshot. The bucket will have the naming structure <CloudFormation template name>-rawS3bucket-<random string>. Make a note of the S3 path to the emr_delta_cdc.py script; you need this information while submitting the Spark job via the EMR Serverless application.

The preceding task for creating the resources via CloudFormation assumes that AWS Lake Formation is not enabled in the Region (which we enable later in this post). If you already have Lake Formation enabled in the Region, make sure the IAM user or role used in the CloudFormation template has the necessary permissions to create a database in the AWS Glue Data Catalog.

Connect to the Aurora PostgreSQL instance and generate a sample dataset

Connect to the Aurora PostgreSQL endpoint using your preferred client. For this post, we use the PSQL command line tool. Note that the IP address of the client machine from which you’re connecting to the database must be updated in the Aurora PostgreSQL security group. This is done by the CloudFormation template based on the input parameter value for ClientIPCIDR. If you’re accessing the database from another machine, update the security group accordingly.

  1. Connect to your EC2 instance from the command line using the public DNS of the EC2 instance from the CloudFormation template output.
  2. Log in to the EC2 instance and connect to the Aurora PostgreSQL instance using the following commands (the Aurora PostgreSQL endpoint is available on the Outputs tab of the CloudFormation stack):
    psql -h << Aurora PostgreSQL endpoint >> -p 5432 -U <<username>> -d emrdelta_source_db

  3. Run the following commands to create a schema and table for the fictional trip dataset:
    create schema delta_emr_source;
    create table delta_emr_source.travel_details (trip_id int PRIMARY KEY,tstamp timestamp, route_id varchar(2),destination varchar(50),source_location varchar(50));

  4. Create the following stored procedure to generate the records for the trip dataset and insert the records into the table:
    create or replace procedure delta_emr_source.insert_records(records int)
    language plpgsql
    as $$
    declare
      max_trip_id integer;
    begin
      --get max trip_id
      select coalesce(max(trip_id),1) into max_trip_id from delta_emr_source.travel_details;
      --insert records
      for i in max_trip_id+1..max_trip_id+records loop
        INSERT INTO delta_emr_source.travel_details (trip_id, tstamp, route_id,destination,source_location) values (i, current_timestamp, chr(65 + (i % 10)),(array['Seattle', 'New York', 'New Jersey', 'Los Angeles', 'Las Vegas',
        'Tucson', 'Washington DC', 'Philadelphia', 'Miami', 'San Francisco'])[(floor(random() * 10))+1],(array['Seattle', 'New York', 'New Jersey', 'Los Angeles', 'Las Vegas',
        'Tucson', 'Washington DC', 'Philadelphia', 'Miami', 'San Francisco'])[(floor(random() * 10))+1]);
      end loop;
      raise notice 'Inserted record count - %', records;
    end; $$;

  5. Call the preceding stored procedure to insert 20,000 records into the Aurora PostgreSQL database:
    call delta_emr_source.insert_records(20000);

  6. After the stored procedure is complete, verify that the records have been inserted successfully:
    select count(*) from delta_emr_source.travel_details;

Set up a data pipeline for loading data into Delta tables on Amazon S3 and query using Athena

In this section, we walk through the steps to set up a data pipeline that loads data from Amazon Aurora PostgreSQL-Compatible Edition into Delta tables on Amazon S3 and then query the data using Athena.

Start the AWS DMS task to perform full table load to the S3 raw layer

To perform the full table load, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the task that was created by the CloudFormation template (emrdelta-postgres-s3-migration).
  3. On the Actions menu, choose Restart/Resume.

The task starts the full load and ongoing replication of data from the source database to Amazon S3.

  4. Wait for the job to complete.

You can validate that the data has been migrated successfully by checking the Load state column for the AWS DMS task.

  5. Navigate to the S3 bucket created from the CloudFormation template to store raw data from AWS DMS. The bucket will have the naming structure <CloudFormation template name>-rawS3bucket-<random string>.
  6. Navigate to the folder delta_emr_source/travel_details in the raw S3 bucket. You can verify the S3 folder has Parquet data populated from the AWS DMS task.

Run the EMR Serverless Spark application to load data into Delta tables

We use EMR Studio to manage and submit jobs in an EMR Serverless application.

  1. Launch EMR Studio and create an EMR Serverless application.
  2. For Name, enter emr-delta-blog.
  3. For Type, choose Spark.
  4. For Release version, choose your release version.
  5. For Architecture, select x86_64.
  6. For Application setup options, select Choose default settings.

  7. Choose Create application and verify that the EMR application has been created successfully on the Amazon EMR console.

  8. Choose emr-delta-blog and then choose Start application. You can verify that the EMR application has started successfully on the Amazon EMR console, as shown in the following screenshot.

The application moves to Stopped status after a period of inactivity. When you submit a job to the application, it starts again and runs the job. This provides cost savings because the jobs are run on demand as opposed to maintaining a running EMR cluster.

  9. While the application is in Started status, choose Submit job to submit the job to the application.

Create a new job in the Job details page

  1. For Name, enter emr-delta-load-job.
  2. For Runtime role, choose emrserverless-execution-role.
  3. For S3 URI, enter the S3 (raw bucket) path where the script emr_delta_cdc.py is uploaded.
  4. For Script arguments, enter ["I","delta_emr_source","9999-12-31-01","travel_details","route_id"].

The script arguments provide the following details to the EMR Serverless application:

  • I – The first argument represents the data load type. The allowed values are I for full load and U for incremental data load.
  • delta_emr_source – The second argument represents the source database schema from which data is being migrated through the AWS DMS task.
  • 9999-12-31-01 – The third argument represents the partition from which data needs to be loaded in an incremental fashion. This argument is used only during CDC data load; for full load, we have provided a default value (9999-12-31-01).
  • travel_details – The fourth argument represents the source database table from which data is being migrated through the AWS DMS task. Use a semicolon as a delimiter when entering multiple tables.
  • route_id – The fifth argument represents the partition keys on which the table data should be partitioned when stored in the S3 curated bucket. Use a semicolon as a delimiter when entering comma-separated partition keys for multiple tables.

With arguments, you can group a set of tables and submit the job to an EMR Serverless application. You can provide multiple table names separated by semicolons and enter the partition keys for those tables also separated by semicolon. If a particular table doesn’t have a partition key, simply enter a semicolon alone. The number of semicolon-separated values should match the table and partition key arguments for the script to run successfully.
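The pairing rule described above can be illustrated with a small standalone sketch. It is not part of the job script; the function name parse_table_args and the table names drivers and vehicles are hypothetical, and only travel_details and route_id come from this post.

```python
def parse_table_args(tables_arg, partition_keys_arg):
    """Pair each table with its list of partition keys.

    tables_arg: semicolon-separated table names, e.g. "t1;t2;t3"
    partition_keys_arg: semicolon-separated groups of comma-separated keys,
        one group per table; an empty group means the table is not partitioned.
    """
    tables = tables_arg.split(";")
    key_groups = partition_keys_arg.split(";")
    if len(tables) != len(key_groups):
        raise ValueError(
            f"{len(tables)} table(s) but {len(key_groups)} partition key group(s)"
        )
    # Drop empty strings so an unpartitioned table maps to an empty list.
    return {
        table: [key for key in group.split(",") if key]
        for table, group in zip(tables, key_groups)
    }


if __name__ == "__main__":
    # Hypothetical three-table job; the middle table has no partition key.
    print(parse_table_args("travel_details;drivers;vehicles", "route_id;;make,model"))
```

Validating the argument counts before the job is submitted avoids starting an EMR Serverless job run that would exit immediately on a mismatch.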

Also, if you want to capture additional tables as part of an existing EMR Serverless job, you need to create a new EMR Serverless job to capture full load separately (set the first argument as I along with the new table names) and then change the argument list of the existing EMR Serverless job to add those new tables to capture incremental data load going forward.

EMR Serverless version 6.9.0 comes pre-installed with Delta version 2.1.0. Refer to About Amazon EMR Releases for more details about pre-installed libraries and applications for a specific Amazon EMR release. With earlier releases, you had to upload the Delta JAR files to an S3 bucket in your account and provide the JAR file path in the application configurations using the spark.jars option. In this walkthrough, we create an EMR Serverless 6.9.0 application and use the pre-installed Delta JARs from Amazon EMR.

  1. Under Spark properties, choose Edit in text and enter the following configurations:
--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.submit.pyFiles=/usr/share/aws/delta/lib/delta-core.jar --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

If you want to use a different version of the Delta JAR files, upload them to Amazon S3 and replace the JAR file paths in these configuration options with their S3 paths.

  1. Leave the rest of the configurations at their default and choose Submit job.
  2. Wait for the job to complete successfully. You can verify this on the EMR Serverless console.

  1. Additionally, go to the S3 location (the curated bucket created by AWS CloudFormation) and verify that the Delta files are created along with the manifest file.

  1. Select a job run and then choose Spark History Server (Completed jobs) on the View Application UIs menu.

You can now use the Spark History Server UI to navigate to various tabs and analyze the job run in a detailed manner. For Spark error and output logs, you can navigate to the Executors tab and explore the driver or executor logs as required. This can help you to debug the job in case of failures by looking at the Spark logs. You can also choose Spark UI (Running jobs) to track the progress of the EMR Serverless Spark jobs while they are running.
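For repeatable runs (for example, from a scheduler), the same job could be submitted programmatically instead of through EMR Studio. The following is a minimal sketch, assuming boto3 and the EMR Serverless StartJobRun API. The function names build_job_request and submit_job and all placeholder values in angle brackets are hypothetical; the Spark properties and script arguments are the ones entered in the console steps.

```python
import json

# Spark properties copied from the console walkthrough.
SPARK_SUBMIT_PARAMETERS = " ".join([
    "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar",
    "--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
    "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "--conf spark.submit.pyFiles=/usr/share/aws/delta/lib/delta-core.jar",
    "--conf spark.hadoop.hive.metastore.client.factory.class="
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
])


def build_job_request(application_id, role_arn, script_uri, script_args,
                      name="emr-delta-load-job"):
    """Build the keyword arguments for the EMR Serverless StartJobRun call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": role_arn,
        "name": name,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": script_uri,
                "entryPointArguments": list(script_args),
                "sparkSubmitParameters": SPARK_SUBMIT_PARAMETERS,
            }
        },
    }


def submit_job(request):
    """Submit the job run; requires boto3 and AWS credentials at call time."""
    import boto3  # imported lazily so the request builder has no dependencies

    client = boto3.client("emr-serverless")
    return client.start_job_run(**request)["jobRunId"]


if __name__ == "__main__":
    # Placeholder values: replace with your application ID, role ARN, and script path.
    request = build_job_request(
        "<application-id>",
        "<emrserverless-execution-role-arn>",
        "s3://<raw-bucket>/<path>/emr_delta_cdc.py",
        ["I", "delta_emr_source", "9999-12-31-01", "travel_details", "route_id"],
    )
    print(json.dumps(request, indent=2))
```

Keeping the request builder separate from the API call makes it possible to inspect or log the exact arguments before anything is submitted.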

The data load script is the same for initial and incremental data load because it can handle both the workflows through script arguments:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
import boto3
import sys
from delta.tables import DeltaTable

# S3 bucket locations, auto-populated for this post. Replace for other jobs
raw_bucket = "<<raw_bucket_name>>"
curated_bucket = "<<curated_bucket_name>>"

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

# Check the argument list; exit if it doesn't match the expected argument count
if len(sys.argv) != 6:
    print("This script requires 5 arguments for successful execution - Load_type,database_schema,CDC_path,source_table,Partition_keys")
    sys.exit(1)

s3 = boto3.client('s3')

# Split table names into a list if there is more than one table separated by semicolon
tables = sys.argv[4].split(";")

schema = sys.argv[2]
load_type = sys.argv[1]
cdc_partition = sys.argv[3]
deltaHivePath = "s3://" + curated_bucket + "/" + schema + "/"
columns_to_drop = ["Op", "schema_name", "table_name", "update_ts_dms", "tstamp"]
db_name = "emrserverless_delta"

# Split table partition keys into a list if there is more than one table separated by semicolon
partition_keys = sys.argv[5].split(";")
# Exit if the number of table names and partition keys differ, to ensure data is provided for all tables.
if len(tables) != len(partition_keys):
    print("Please enter partition keys for all tables. if partition key is not present enter empty semicolon - T1_PK;;T3PK")
    sys.exit(1)

for table, key_group in zip(tables, partition_keys):
    partition_key = key_group.split(",")
    if load_type == 'I':
        print("Moving to Full-load logic for the table", table)

        # Read the data from the raw bucket
        source_df1 = spark.read.format("parquet").load(
            "s3://" + raw_bucket + "/" + schema + "/" + table + "/")

        # There is no target table in Delta format. Loading for the first time.
        # The following code segment populates the Delta table in S3 and also
        # updates the Glue catalog for querying with Athena.
        additional_options = {"path": deltaHivePath + table + "/"}
        if columns_to_drop:
            source_df1 = source_df1.drop(*columns_to_drop)

        # Check for presence of a partition key before writing data to the curated bucket
        if partition_key[0]:
            source_df1.write.mode("append") \
                .format("delta") \
                .partitionBy(*partition_key) \
                .options(**additional_options) \
                .saveAsTable(db_name + ".spark_" + table)
        else:
            source_df1.write.mode("append") \
                .format("delta") \
                .options(**additional_options) \
                .saveAsTable(db_name + ".spark_" + table)

        # Generate symlink for Amazon Redshift Spectrum to read data
        deltaTable = DeltaTable.forPath(spark, deltaHivePath + table + "/")
        deltaTable.generate("symlink_format_manifest")
    else:
        print("Moving to upsert logic, Reading data from partition - ", cdc_partition)
        # The below logic will verify if the CDC path has data before proceeding with
        # incremental load. If the CDC path is not available for a specific table, the load
        # process is skipped to avoid a Spark read error.
        resp = s3.list_objects_v2(
            Bucket=raw_bucket,
            Prefix=schema + "/" + table + "/" + cdc_partition,
            Delimiter="/",
            MaxKeys=1)
        if 'CommonPrefixes' in resp:
            update_df = spark.read.format("parquet").load(
                "s3://" + raw_bucket + "/" + schema + "/" + table + "/" + cdc_partition + "/")

            # Get recent record for each primary key to update the recent transaction to the Delta table
            # This step is needed to de-dup transactions like inserts and deletes within the same batch
            # (assumes the DMS-added update_ts_dms column orders the changes for a key)
            sort_order = Window.partitionBy(
                col("trip_id")).orderBy(col("update_ts_dms").desc())
            update_df = update_df.withColumn("rec_val", row_number().over(
                sort_order)).filter(col("rec_val") == 1)

            # upsert script using Merge operation. The below script updates/inserts data
            # on all columns. In case you need to insert/update specific columns
            # use whenNotMatchedInsert/whenMatchedUpdate functions and parameterize the input for each table
            deltaTable = DeltaTable.forPath(spark, deltaHivePath + table + "/")
            deltaTable.alias('trg') \
                .merge(update_df.alias('src'), 'trg.trip_id = src.trip_id') \
                .whenNotMatchedInsertAll(condition="src.Op = 'I'") \
                .whenMatchedUpdateAll(condition="src.Op='U'") \
                .whenMatchedDelete(condition="src.Op = 'D'") \
                .execute()

            # Generate symlink for Amazon Redshift Spectrum to read data
            deltaTable.generate("symlink_format_manifest")
        else:
            print("The path is empty for table -", table)
print("The Job has completed execution...")
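To make the de-duplication and merge conditions easier to reason about, here is a small pure-Python sketch of the same logic applied to in-memory records. It is an illustration only, not how Delta Lake executes a MERGE; the function names latest_per_key and apply_changes and the sample rows are hypothetical, while trip_id, Op, and update_ts_dms are the column names used by the script.

```python
def latest_per_key(changes, key="trip_id", order_by="update_ts_dms"):
    """Keep only the most recent change record for each primary key."""
    latest = {}
    for change in changes:
        current = latest.get(change[key])
        if current is None or change[order_by] > current[order_by]:
            latest[change[key]] = change
    return list(latest.values())


def apply_changes(target, changes, key="trip_id"):
    """Apply de-duplicated I/U/D records to a dict of rows keyed by primary key."""
    merged = dict(target)  # shallow copy; the input dict is left unchanged
    for change in latest_per_key(changes, key):
        row = {k: v for k, v in change.items() if k not in ("Op", "update_ts_dms")}
        matched = change[key] in merged
        if change["Op"] == "I" and not matched:
            merged[change[key]] = row      # whenNotMatchedInsertAll, Op = 'I'
        elif change["Op"] == "U" and matched:
            merged[change[key]] = row      # whenMatchedUpdateAll, Op = 'U'
        elif change["Op"] == "D" and matched:
            del merged[change[key]]        # whenMatchedDelete, Op = 'D'
    return merged


if __name__ == "__main__":
    target = {
        1: {"trip_id": 1, "destination": "Miami"},
        2: {"trip_id": 2, "destination": "Los Angeles"},
    }
    changes = [
        {"Op": "U", "trip_id": 1, "destination": "Tucson", "update_ts_dms": "2022-10-25 21:00:01"},
        {"Op": "D", "trip_id": 2, "destination": "Los Angeles", "update_ts_dms": "2022-10-25 21:00:02"},
        {"Op": "I", "trip_id": 3, "destination": "Seattle", "update_ts_dms": "2022-10-25 21:00:03"},
        # Inserted and deleted within the same batch: only the delete survives de-duplication.
        {"Op": "I", "trip_id": 4, "destination": "Miami", "update_ts_dms": "2022-10-25 21:00:04"},
        {"Op": "D", "trip_id": 4, "destination": "Miami", "update_ts_dms": "2022-10-25 21:00:05"},
    ]
    print(apply_changes(target, changes))
```

One consequence visible in this sketch: a key that is inserted and then updated within the same batch is reduced to a single U record, which matches nothing in the target and is therefore skipped under these conditions. If that pattern can occur in your source, consider adjusting the insert condition for your tables.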

Monitor EMR Serverless application using CloudWatch dashboards

We can optionally monitor the EMR Serverless application using CloudWatch dashboards by installing the CloudFormation template from the EMR Serverless CloudWatch Dashboard GitHub repository. Follow the instructions in the Getting started section of the GitHub repository and deploy the CloudFormation template in your account.

You need to provide the EMR Serverless application ID as a parameter while deploying the CloudFormation stack, which can be obtained on the EMR Studio Applications page as shown in the following screenshot.

After the CloudFormation template is successfully deployed, navigate to the CloudWatch console to see a custom dashboard created for the EMR Serverless application ID that was provided to the CloudFormation template.

Choose the dashboard to see the different metrics for the EMR Serverless application in a single dashboard view.

You can see the available workers (one driver and two executors that were pre-initialized in the default configuration) and also the spike under successful job count that indicates the initial data load job that was completed successfully.

You could also monitor the CPU, memory, and storage allocated for the application, driver, and executor nodes separately.

The following image shows application metrics for three workers with 12 vCPUs (both driver and executor initialized with 4 vCPUs) and also the memory and storage usage. You can monitor the metrics from this dashboard and pre-initialize application capacity to suit your specific workloads.

We can see the number of executors that were utilized for this job execution from the executor metrics section within the CloudWatch dashboard. We have used two executors and a driver for running this job.

Query the Delta tables through Athena

Previously, Delta tables were accessed through Athena by generating the manifest files (which maintain the list of data files to read for querying a Delta table). With the newly launched support in Athena for reading native Delta tables, it’s no longer required to generate and update manifest files. The Athena SQL engine version 3 can directly query native Delta tables. If you’re using an older engine version, change the engine version.

Navigate to the Athena console and start querying the data. Run a SELECT query and fetch the first 10 records to verify the data:

SELECT * FROM "AwsDataCatalog"."emrserverless_delta"."spark_travel_details" limit 10;
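The same check could also be scripted. The following is a minimal sketch, assuming boto3, an Athena workgroup on engine version 3, and an S3 location for query results that you supply; the function names build_preview_query and run_query are hypothetical.

```python
import time


def build_preview_query(database, table, limit=10, catalog="AwsDataCatalog"):
    """Return a SELECT statement equivalent to the preview query in this post."""
    return f'SELECT * FROM "{catalog}"."{database}"."{table}" LIMIT {int(limit)}'


def run_query(sql, output_location, poll_seconds=2):
    """Run a query in Athena and return the raw result rows.

    Requires boto3 and AWS credentials at call time. output_location is an
    S3 URI that you choose for Athena query results.
    """
    import boto3  # imported lazily so the query builder has no dependencies

    athena = boto3.client("athena")
    query_id = athena.start_query_execution(
        QueryString=sql,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until the query reaches a terminal state.
    while True:
        status = athena.get_query_execution(
            QueryExecutionId=query_id)["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)

    if status["State"] != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} ended in state {status['State']}")
    return athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]


if __name__ == "__main__":
    print(build_preview_query("emrserverless_delta", "spark_travel_details"))
```

For results larger than a single page, the get_query_results call would need to be paginated; this sketch returns only the first page.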

The table (native Delta table) has been created and updated to the AWS Glue Data Catalog from the EMR Serverless application code. You can successfully query and explore the data through Athena or Spark applications, but the schema definitions for individual columns aren’t updated in Data Catalog with this approach.

The following screenshot shows the Delta table created through code has a single array column. Athena supports reading native Delta tables and therefore we can read the data successfully even though the Data Catalog shows only a single array column.

If you need the individual column-level metadata to be available in the Data Catalog, run an AWS Glue crawler periodically to keep the AWS Glue metadata updated. For more information, refer to Introducing native Delta Lake table support with AWS Glue crawlers.

Run the data pipeline to load incremental data changes into the Delta tables

In this section, we walk through the steps to run the data pipeline.

Generate an incremental (CDC) dataset and insert it into the Aurora PostgreSQL database

  1. Log in to the EC2 instance via SSH and using the PSQL CLI, run the following SQL commands to generate CDC data on the source database:

update delta_emr_source.travel_details set destination='Tucson' where destination='Miami';
call delta_emr_source.insert_records(200);
delete from delta_emr_source.travel_details where destination='Los Angeles';

  1. Navigate to the AWS DMS console and verify whether the incremental records are populated to the S3 raw bucket by the replication task.

You can also verify in the S3 raw bucket location that the files are created under hourly partitioned folders.

Run the EMR Serverless Spark application to merge CDC data in the S3 curated layer (incremental load)

After the AWS DMS task has successfully loaded the incremental data, submit the Spark job on the EMR Serverless application to load the incremental data (CDC) with the following script arguments:

["U", "delta_emr_source", "2022-10-25-21", "travel_details","route_id"]

The partition path given here as 2022-10-25-21 should be changed as applicable in your use case. We use an example use case where the EMR Serverless job runs every hour, and the input data folder is partitioned on an hourly basis from AWS DMS. You can choose an appropriate partitioning strategy on the S3 raw bucket for your use case.
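When the job is scheduled, the partition argument can be derived from the run time. The following sketch assumes the raw-layer folders are named YYYY-MM-DD-HH, as in the example above, and that the folder names are in UTC; verify both against your AWS DMS endpoint settings. The function name hourly_partitions is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def hourly_partitions(run_time, hours_back=1):
    """Return names of the `hours_back` complete hours before run_time, oldest first."""
    top_of_hour = run_time.replace(minute=0, second=0, microsecond=0)
    return [
        (top_of_hour - timedelta(hours=offset)).strftime("%Y-%m-%d-%H")
        for offset in range(hours_back, 0, -1)
    ]


if __name__ == "__main__":
    # A job starting shortly after 22:00 UTC processes the 21:00 partition.
    run_time = datetime(2022, 10, 25, 22, 5, tzinfo=timezone.utc)
    print(hourly_partitions(run_time))
    print(hourly_partitions(run_time, hours_back=3))
```

Because the script in this post accepts a single partition as its third argument, processing several hours would mean one job run per returned partition name, or adapting the script to accept a list.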

  1. Under Spark properties, choose Edit in text and enter the following configurations:
--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.submit.pyFiles=/usr/share/aws/delta/lib/delta-core.jar --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  1. When the job is successful, verify in Amazon S3 that more files are created in the _delta_log folder, capturing the changes from the current run.
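This check can also be done from a script. Delta Lake names each commit file in _delta_log with a zero-padded, 20-digit table version followed by .json, so new versions after a run show up as new files. The following is a minimal sketch under that naming convention; the function names commit_versions and list_keys and the bucket placeholder are hypothetical.

```python
import re

# Commit files look like _delta_log/00000000000000000001.json
COMMIT_FILE = re.compile(r"_delta_log/(\d{20})\.json$")


def commit_versions(keys):
    """Return the sorted Delta table versions found among S3 object keys."""
    versions = []
    for key in keys:
        match = COMMIT_FILE.search(key)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)


def list_keys(bucket, prefix):
    """Yield all object keys under a prefix; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so commit_versions has no dependencies

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            yield obj["Key"]


if __name__ == "__main__":
    sample_keys = [
        "delta_emr_source/travel_details/_delta_log/00000000000000000000.json",
        "delta_emr_source/travel_details/_delta_log/00000000000000000001.json",
        "delta_emr_source/travel_details/route_id=A/part-00000.snappy.parquet",
    ]
    print(commit_versions(sample_keys))
    # Against a real bucket (placeholder name):
    # print(commit_versions(list_keys("<curated-bucket>", "delta_emr_source/travel_details/_delta_log/")))
```

Comparing the highest version before and after the incremental run is a quick way to confirm that the merge produced a new commit.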

Query the Delta tables through Athena to validate the merged data

Go to the Athena console to query the data and validate the counts to ensure that the table contains the most recent data:

SELECT destination, count(*) FROM "AwsDataCatalog"."emrserverless_delta"."spark_travel_details" group by destination;

If you also want to query this data from Amazon Redshift, you can create external tables in Redshift Spectrum for Delta tables. For more information, refer to Creating external tables for data managed in Delta Lake. Redshift Spectrum currently supports querying Delta tables through the manifest file option. A Delta table manifest contains a list of files that make up a consistent snapshot of the Delta table. The code snippet given in this post updates the manifest files every time new data is loaded in the Delta tables to ensure only the latest data is read from the Delta tables.

Clean up

To avoid incurring ongoing charges, clean up your infrastructure by deleting the stack from the AWS CloudFormation console. Delete the EMR Serverless application and any other resources you created during this exercise.


In this post, we demonstrated how to create a transactional data lake with Delta table format using EMR Serverless and AWS DMS. With the flexibility provided by EMR Serverless, you can use the latest version of open-source Delta framework on EMR Serverless (with the latest version of Spark) in order to support a wider range of transactional data lake needs based on various use cases.

Now you can build a transactional data lake for your organization with Delta table format and access data using Athena and Redshift Spectrum for various analytical workloads. You could use this high-level architecture for any other use cases where you need to use the latest version of Spark on EMR Serverless.

About the Authors

Sankar Sundaram is a Data Lab Architect at AWS, where he helps customers build and modernize data architectures and helps them build secure, scalable, and performant data lake, database, and data warehouse solutions.

Monjumi Sarma is a Data Lab Solutions Architect at AWS. She helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives.

A new Spark plugin for CPU and memory profiling

Post Syndicated from Bo Xiong original https://aws.amazon.com/blogs/devops/a-new-spark-plugin-for-cpu-and-memory-profiling/


Have you ever wondered if there are low-hanging optimization opportunities to improve the performance of a Spark app? Profiling can give you visibility into the runtime characteristics of the Spark app so that you can identify its bottlenecks and inefficiencies. We’re excited to announce the release of a new Spark plugin that enables profiling for JVM-based Spark apps via Amazon CodeGuru. The plugin is open sourced on GitHub and published to Maven.


This post shows how you can onboard this plugin with two steps in under 10 minutes.

  • Step 1: Create a profiling group in Amazon CodeGuru Profiler and grant permission to your Amazon EMR on EC2 role, so that profiler agents can emit metrics to CodeGuru. Detailed instructions can be found here.
  • Step 2: Reference codeguru-profiler-for-spark when submitting your Spark job, along with PROFILING_CONTEXT and ENABLE_AMAZON_PROFILER defined.


As a prerequisite, your app must be built against Spark 3 and run on Amazon EMR release 6.x or newer. It doesn’t matter whether you’re using Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) or on Amazon Elastic Kubernetes Service (Amazon EKS).

Illustrative Example

For the purposes of illustration, consider the following example where profiling results are collected by the plugin and emitted to the “CodeGuru-Spark-Demo” profiling group.

spark-submit \
--master yarn \
--deploy-mode cluster \
--class YOUR_MAIN_CLASS \
--packages software.amazon.profiler:codeguru-profiler-for-spark:1.0 \
--conf spark.plugins=software.amazon.profiler.AmazonProfilerPlugin \
--conf spark.executorEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"}" \
--conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.dynamicAllocation.enabled=false \
YOUR_APPLICATION_JAR
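The escaping of PROFILING_CONTEXT in the command above is easy to get wrong by hand. The following sketch generates the escaped text from a plain dictionary, both in the form used on the spark-submit command line and in the comma-escaped form used when the value is set through the Amazon EMR web console. It only mirrors the patterns visible in this post; the function names are hypothetical and are not part of the plugin.

```python
import json


def escape_for_spark_submit(context):
    """Render the context as compact JSON, then prefix every double quote
    with three backslashes, as in the spark-submit command in this post."""
    compact = json.dumps(context, separators=(",", ":"))
    return compact.replace('"', '\\\\\\"')


def escape_for_emr_console(context):
    """Same as above, but also prefix every comma with two backslashes,
    matching the yarn-env export configuration in this post."""
    return escape_for_spark_submit(context).replace(",", "\\\\,")


executor_context = {"profilingGroupName": "CodeGuru-Spark-Demo"}
driver_context = {"profilingGroupName": "CodeGuru-Spark-Demo", "driverEnabled": "true"}

# Wrap the first value in double quotes when pasting it after PROFILING_CONTEXT=.
print(escape_for_spark_submit(executor_context))
print(escape_for_emr_console(driver_context))
```

Generating the value from a dictionary keeps the JSON itself valid, so any remaining mistakes are limited to the escaping layer.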

An alternative way to specify PROFILING_CONTEXT and ENABLE_AMAZON_PROFILER is under the yarn-env.export classification for instance groups in the Amazon EMR web console. Note that PROFILING_CONTEXT, if configured in the web console, must escape every comma, in addition to the escaping already used in the spark-submit command above.

[
  {
    "classification": "yarn-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "ENABLE_AMAZON_PROFILER": "true",
          "PROFILING_CONTEXT": "{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"\\,\\\"driverEnabled\\\":\\\"true\\\"}"
        },
        "configurations": []
      }
    ]
  }
]

Once the job above is launched on Amazon EMR, profiling results should show up in your CodeGuru web console in about 10 minutes, similar to the following screenshot. Internally, the plugin has helped us identify issues such as thread contention (revealed by the BLOCKED state in the latency flame graph) and unnecessary creation of AWS Java clients (revealed by the CPU Hotspots view).

Go to your profiling group under the Amazon CodeGuru web console. Click the “Visualize CPU” button to render a flame graph displaying CPU usage. Switch to the latency view to identify latency bottlenecks, and switch to the heap summary view to identify the objects consuming the most memory.


To help with troubleshooting, use a sample Spark app provided in the plugin to check if everything is set up correctly. Note that the profilingGroupName value specified in PROFILING_CONTEXT should match what’s created in CodeGuru.

spark-submit \
--master yarn \
--deploy-mode cluster \
--class software.amazon.profiler.SampleSparkApp \
--packages software.amazon.profiler:codeguru-profiler-for-spark:1.0 \
--conf spark.plugins=software.amazon.profiler.AmazonProfilerPlugin \
--conf spark.executorEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"}" \
--conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.yarn.appMasterEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\",\\\"driverEnabled\\\":\\\"true\\\"}" \
--conf spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.dynamicAllocation.enabled=false \
YOUR_APPLICATION_JAR

Running the command above from the master node of your EMR cluster should produce logs similar to the following:

21/11/21 21:27:21 INFO Profiler: Starting the profiler : ProfilerParameters{profilingGroupName='CodeGuru-Spark-Demo', threadSupport=BasicThreadSupport (default), excludedThreads=[Signal Dispatcher, Attach Listener], shouldProfile=true, integrationMode='', memoryUsageLimit=104857600, heapSummaryEnabled=true, stackDepthLimit=1000, samplingInterval=PT1S, reportingInterval=PT5M, addProfilerOverheadAsSamples=true, minimumTimeForReporting=PT1M, dontReportIfSampledLessThanTimes=1}
21/11/21 21:27:21 INFO ProfilingCommandExecutor: Profiling scheduled, sampling rate is PT1S
21/11/21 21:27:23 INFO ProfilingCommand: New agent configuration received : AgentConfiguration(AgentParameters={MaxStackDepth=1000, MinimumTimeForReportingInMilliseconds=60000, SamplingIntervalInMilliseconds=1000, MemoryUsageLimitPercent=10, ReportingIntervalInMilliseconds=300000}, PeriodInSeconds=300, ShouldProfile=true)
21/11/21 21:32:23 INFO ProfilingCommand: Attempting to report profile data: start=2021-11-21T21:27:23.227Z end=2021-11-21T21:32:22.765Z force=false memoryRefresh=false numberOfTimesSampled=300
21/11/21 21:32:23 INFO javaClass: [HeapSummary] Processed 20 events.
21/11/21 21:32:24 INFO ProfilingCommand: Successfully reported profile

Note that the CodeGuru Profiler agent uses a reporting interval of five minutes. Therefore, any executor process that runs for less than five minutes won’t be reflected in the profiling result. If the right profiling group is not specified, or it’s associated with the wrong EC2 role in CodeGuru, then the log will show a message similar to “CodeGuruProfilerSDKClient: Exception while calling agent orchestration” along with a stack trace including a 403 status code. To rule out any network issues (e.g., your EMR job running in a VPC without an outbound gateway or with a misconfigured outbound security group), you can remote into an EMR host and ping the CodeGuru endpoint in your Region (e.g., ping codeguru-profiler.us-east-1.amazonaws.com).
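Because ICMP traffic can be blocked even when HTTPS traffic is allowed, a TCP connection test can complement ping. The following is a minimal sketch using only the Python standard library. It assumes the agent reaches the endpoint over HTTPS on port 443, and the function name is hypothetical.

```python
import socket


def can_connect(host, port=443, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example usage from an EMR host (endpoint taken from this post):
# print(can_connect("codeguru-profiler.us-east-1.amazonaws.com"))
```

A False result points to DNS, routing, or security group settings rather than to the profiling group or role configuration.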

Cleaning up

To avoid incurring future charges, you can delete the profiling group configured in CodeGuru and/or set the ENABLE_AMAZON_PROFILER environment variable to false.


In this post, we described how to onboard this plugin in two steps. Consider giving it a try for your Spark app. You can find the Maven artifacts here. If you have feature requests, bug reports, feedback of any kind, or would like to contribute, please head over to the GitHub repository.


Bo Xiong

Bo Xiong is a software engineer with Amazon Ads, leveraging big data technologies to process petabytes of data for billing and reporting. His main interests include performance tuning and optimization for Spark on Amazon EMR, and data mining for actionable business insights.