Tag Archives: Amazon EMR

Amazon S3 Storage Lens adds performance metrics, support for billions of prefixes, and export to S3 Tables

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/amazon-s3-storage-lens-adds-performance-metrics-support-for-billions-of-prefixes-and-export-to-s3-tables/

Today, we’re announcing three new capabilities for Amazon S3 Storage Lens that give you deeper insights into your storage performance and usage patterns. With the addition of performance metrics, support for analyzing billions of prefixes, and direct export to Amazon S3 Tables, you have the tools you need to optimize application performance, reduce costs, and make data-driven decisions about your Amazon S3 storage strategy.

New performance metric categories
S3 Storage Lens now includes eight new performance metric categories that help identify and resolve performance constraints across your organization. These are available at organization, account, bucket, and prefix levels. For example, the service helps you identify small objects in a bucket or prefix that can  slow down application performance. This can be mitigated by batching small objects or using the Amazon S3 Express One Zone storage class for higher performance small object workloads.

To access the new performance metrics, you need to enable performance metrics in the S3 Storage Lens advanced tier when creating a new Storage Lens dashboard or editing an existing configuration.

Metric category Details Use case Mitigation
Read request size Distribution of read request sizes (GET) by day Identify dataset with small read request patterns that slow down performance Small request: Batch small objects or use Amazon S3 Express One Zone for high-performance small object workloads
Write request size Distribution of write request sizes (PUT, POST, COPY, and UploadPart) by day Identify dataset with small write request patterns that slow down performance Large request: Parallelize requests, use MPU or use AWS CRT
Storage size Distribution of object sizes Identify dataset with small small objects that slow down performance Small object sizes: Consider bundling small objects
Concurrent PUT 503 errors Number of 503s due to concurrent PUT operation on same object Identify prefixes with concurrent PUT throttling that slow down performance For single writer, modify retry behavior or use Amazon S3 Express One Zone. For multiple writers, use consensus mechanism or use Amazon S3 Express One Zone
Cross-Region data transfer Bytes transferred and requests sent across Region, in Region Identify potential performance and cost degradation due to cross-Region data access Co-locate compute with data in the same AWS Region
Unique objects accessed Number or percentage of unique objects accessed per day Identify datasets where small subset of objects are being frequently accessed. These can be moved to higher performance storage tier for better performance Consider moving active data to Amazon S3 Express One Zone or other caching solutions
FirstByteLatency (existing Amazon CloudWatch metric) Daily average of first byte latency metric The daily average per-request time from the complete request being received to when the response starts to be returned
TotalRequestLatency (existing Amazon CloudWatch metric) Daily average of Total Request Latency The daily average elapsed per request time from the first byte received to the last byte sent

How it works
On the Amazon S3 console I choose Create Storage Lens dashboard to create a new dashboard. You can also edit an existing dashboard configuration. I then configure general settings such as providing a Dashboard name, Status, and the optional Tags. Then, I choose Next.


Next, I define the scope of the dashboard by selecting Include all Regions and Include all buckets and specifying the Regions and buckets to be included.


I opt in to the Advanced tier in the Storage Lens dashboard configuration, select Performance metrics, then choose Next.


Next, I select Prefix aggregation as an additional metrics aggregation, then leave the rest of the information as default before I choose Next.


I select the Default metrics report, then General purpose bucket as the bucket type, and then select the Amazon S3 bucket in my AWS account as the Destination bucket. I leave the rest of the information as default, then select Next.


I review all the information before I choose Submit to finalize the process.


After it’s enabled, I’ll receive daily performance metrics directly in the Storage Lens console dashboard. You can also choose to export report in CSV or Parquet format to any bucket in your account or publish to Amazon CloudWatch. The performance metrics are aggregated and published daily and will be available at multiple levels: organization, account, bucket, and prefix. In this dropdown menu, I choose the % concurrent PUT 503 error for the Metric, Last 30 days for the Date range, and 10 for the Top N buckets.


The Concurrent PUT 503 error count metric tracks the number of 503 errors generated by simultaneous PUT operations to the same object. Throttling errors can degrade application performance. For a single writer, modify retry behavior or use higher performance storage tier such as Amazon S3 Express One Zone to mitigate concurrent PUT 503 errors. For multiple writers scenario, use a consensus mechanism to avoid concurrent PUT 503 errors or use higher performance storage tier such as Amazon S3 Express One Zone.

Complete analytics for all prefixes in your S3 buckets
S3 Storage Lens now supports analytics for all prefixes in your S3 buckets through a new Expanded prefixes metrics report. This capability removes previous limitations that restricted analysis to prefixes meeting a 1% size threshold and a maximum depth of 10 levels. You can now track up to billions of prefixes per bucket for analysis at the most granular prefix level, regardless of size or depth.

The Expanded prefixes metrics report includes all existing S3 Storage Lens metric categories: storage usage, activity metrics (requests and bytes transferred), data protection metrics, and detailed status code metrics.

How to get started
I follow the same steps outlined in the How it works section to create or update the Storage Lens dashboard. In Step 4 on the console, where you select export options, you can select the new Expanded prefixes metrics report. Thereafter, I can export the expanded prefixes metrics report in CSV or Parquet format to any general purpose bucket in my account for efficient querying of my Storage Lens data.


Good to know
This enhancement addresses scenarios where organizations need granular visibility across their entire prefix structure. For example, you can identify prefixes with incomplete multipart uploads to reduce costs, track compliance across your entire prefix structure for encryption and replication requirements, and detect performance issues at the most granular level.

Export S3 Storage Lens metrics to S3 Tables
S3 Storage Lens metrics can now be automatically exported to S3 Tables, a fully managed feature on AWS with built-in Apache Iceberg support. This integration provides daily automatic delivery of metrics to AWS managed S3 Tables for immediate querying without requiring additional processing infrastructure.

How to get started
I start by following the process outlined in Step 5 on the console, where I choose the export destination. This time, I choose Expanded prefixes metrics report. In addition to General purpose bucket, I choose Table bucket.

The new Storage Lens metrics are exported to new tables in an AWS managed bucket aws-s3.


I select the expanded_prefixes_activity_metrics table to view API usage metrics for expanded prefix reports.


I can preview the table on the Amazon S3 console or use Amazon Athena to query the table.


Good to know
S3 Tables integration with S3 Storage Lens simplifies metric analysis using familiar SQL tools and AWS analytics services such as Amazon Athena, Amazon QuickSight, Amazon EMR, and Amazon Redshift, without requiring a data pipeline. The metrics are automatically organized for optimal querying, with custom retention and encryption options to suit your needs.

This integration enables cross-account and cross-Region analysis, custom dashboard creation, and data correlation with other AWS services. For example, you can combine Storage Lens metrics with S3 Metadata to analyze prefix-level activity patterns and identify objects in prefixes with cold data that are eligible for transition to lower-cost storage tiers.

For your agentic AI workflows, you can use natural language to query S3 Storage Lens metrics in S3 Tables with the S3 Tables MCP Server. Agents can ask questions such as ‘which buckets grew the most last month?’ or ‘show me storage costs by storage class’ and get instant insights from your observability data.

Now available
All three enhancements are available in all AWS Regions where S3 Storage Lens is currently offered (except the China Regions and AWS GovCloud (US)).

These features are included in the Amazon S3 Storage Lens Advanced tier at no additional charge beyond standard advanced tier pricing. For the S3 Tables export, you pay only for S3 Tables storage, maintenance, and queries. There is no additional charge for the export functionality itself.

To learn more about Amazon S3 Storage Lens performance metrics, support for billions of prefixes, and export to S3 Tables, refer to the Amazon S3 user guide. For pricing details, visit the Amazon S3 pricing page.

Veliswa Boya.

Run Apache Spark and Iceberg 4.5x faster than open source Spark with Amazon EMR

Post Syndicated from Atul Payapilly original https://aws.amazon.com/blogs/big-data/run-apache-spark-and-iceberg-4-5x-faster-than-open-source-spark-with-amazon-emr/

This post shows how Amazon EMR 7.12 can make your Apache Spark and Iceberg workloads up to 4.5x faster performance.

The Amazon EMR runtime for Apache Spark provides a high-performance runtime environment with full API compatibility with open source Apache Spark and Apache Iceberg. Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, Amazon EMR on AWS Outposts and AWS Glue use the optimized runtimes.

Our benchmarks show Amazon EMR 7.12 runs TPC-DS 3 TB workloads 4.5x faster than open source Spark 3.5.6 with Iceberg 1.10.0.

Performance improvements include optimizations for metadata caching, parallel I/O, adaptive query planning, data type handling, and fault tolerance. There were also some Iceberg specific regressions around data scans that we identified and fixed.

These optimizations let you match Parquet performance on Amazon EMR while keeping the key features of Iceberg key features: ACID transactions, time travel, and schema evolution.

Benchmark results compared to open source

To assess the performance of the Spark engine with the Iceberg table format, we performed benchmark tests using the 3 TB TPC-DS dataset, version 2.13, a popular industry standard benchmark. Benchmark tests for the Amazon EMR runtime for Apache Spark and Apache Iceberg were conducted on Amazon EMR 7.12 EC2 clusters compared to open source Apache Spark 3.5.6 and Apache Iceberg 1.10.0 on EC2 clusters.

Note: Our results derived from the TPC-DS dataset are not directly comparable to the official TPC-DS results due to setup differences.

The setup instructions and technical details are available in our GitHub repository. To minimize the influence of external catalogs like AWS Glue and Hive, we used the Hadoop catalog for the Iceberg tables. This uses the underlying file system, specifically Amazon S3, as the catalog. We can define this setup by configuring the property spark.sql.catalog.<catalog_name>.type. The fact tables used the default partitioning by the date column, which vary from 200–2,100 partitions. No precalculated statistics were used for these tables.

We ran a total of 104 SparkSQL queries in 3 sequential rounds, and the average runtime of each query across these rounds was taken for comparison. The average runtime for the 3 rounds on Amazon EMR 7.12 with Iceberg enabled was 0.37 hours, demonstrating a 4.5x speed increase compared to open source Spark 3.5.6 and Iceberg 1.10.0. The following figure presents the total runtimes in seconds.

The following table summarizes the metrics.

Metric Amazon EMR 7.12 on EC2 Amazon EMR 7.5 on EC2 Open source Apache Spark 3.5.6 and Apache Iceberg 1.10.0
Average runtime in seconds 1349.62 1535.62 6113.92
Geometric mean over queries in seconds 7.45910 8.30046 22.31854
Cost* $4.81 $5.47 $17.65

*Detailed cost estimates are discussed later in this post.

The following chart demonstrates the per-query performance improvement of Amazon EMR 7.12 relative to open source Spark 3.5.6 and Iceberg 1.10.0. The extent of the speedup varies from one query to another, with the fastest up to 13.6x faster for q23b, with Amazon EMR outperforming open source Spark with Iceberg tables. The horizontal axis arranges the TPC-DS 3TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup as a ratio.

Cost comparison breakdown

Our benchmark provides the total runtime and geometric mean data to assess the performance of Spark and Iceberg in a complex, real-world decision support scenario. For additional insights, we also examine the cost aspect. We calculate cost estimates using formulas that account for EC2 On-Demand instances, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR expenses.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • 4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • 4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

The calculations reveal that the Amazon EMR 7.12 benchmark yields a 3.6x cost efficiency improvement over open source Spark 3.5.6 and Iceberg 1.10.0 in running the benchmark job.

Metric Amazon EMR 7.12 Amazon EMR 7.5 Open source Apache Spark 3.5.6 and Apache Iceberg 1.10.0
Runtime in seconds 1349.62 1535.62 6113.92

Number of EC2 instances

(Includes primary node)

9 9 9
Amazon EBS Size 20gb 20gb 20gb

Amazon EC2

(Total runtime cost)

$3.89 $4.42 $17.61
Amazon EBS cost $0.01 $0.01 $0.04
Amazon EMR cost $0.91 $1.04 $0
Total cost $4.81 $5.47 $17.65
Cost savings Amazon EMR 7.12 is 3.6x better Amazon EMR 7.5 is 3.2x better Baseline

In addition to the time-based metrics discussed so far, data from Spark event logs show that Amazon EMR scanned approximately 4.3x less data from Amazon S3 and 5.3x fewer records than the open source version in the TPC-DS 3 TB benchmark. This reduction in Amazon S3 data scanning contributes directly to cost savings for Amazon EMR workloads.

Run open source Apache Spark benchmarks on Apache Iceberg tables

We used separate EC2 clusters, each equipped with 9 r5d.4xlarge instances, for testing both open source Spark 3.5.6 and Amazon EMR 7.12 for Iceberg workload. The primary node was equipped with 16 vCPU and 128 GB of memory, and the 8 worker nodes together had 128 vCPU and 1024 GB of memory. We conducted tests using the Amazon EMR default settings to showcase the typical user experience and minimally adjusted the settings of Spark and Iceberg to maintain a balanced comparison.

The following table summarizes the Amazon EC2 configurations for the primary node and 8 worker nodes of type r5d.4xlarge.

EC2 Instance vCPU Memory (GiB) Instance storage (GB) EBS root volume (GB)
r5d.4xlarge 16 128 2 x 300 NVMe SSD 20 GB

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repository, set up the TPC-DS source data in your S3 bucket and on your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.6.jar to your S3 bucket.
  3. Create Iceberg tables from the TPC-DS source data. Follow the instructions on GitHub to create Iceberg tables using the Hadoop catalog. For example, the following code uses an Amazon EMR 7.12 cluster with Iceberg enabled to create the tables:
aws emr add-steps --cluster-id <cluster-id> --steps Type=Spark,Name="Create Iceberg Tables",
Args=[--class,com.amazonaws.eks.tpcds.CreateIcebergTables,--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.hadoop_catalog.type=hadoop,
--conf,spark.sql.catalog.hadoop_catalog.warehouse=s3://<bucket>/<warehouse_path>/,
--conf,spark.sql.catalog.hadoop_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<bucket>/<jar_location>/spark-benchmark-assembly-3.5.6.jar,s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/,
/home/hadoop/tpcds-kit/tools,parquet,3000,true,<database_name>,true,true],ActionOnFailure=CONTINUE --region <AWS region>

Note: The Hadoop catalog warehouse location and database name from the preceding step. We use the same Iceberg tables to run benchmarks with Amazon EMR 7.12 and open source Spark.

This benchmark application is built from the branch tpcds-v2.13_iceberg. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repository.

Create and configure a YARN cluster on Amazon EC2

To compare Iceberg performance between Amazon EMR on Amazon EC2 and open source Spark on Amazon EC2, follow the instructions in the emr-spark-benchmark GitHub repository to create an open source Spark cluster on Amazon EC2 using Flintrock with 8 worker nodes.

Based on the cluster selection for this test, the following configurations are used:

Make sure to replace the placeholder <private ip of primary node>, in the yarn-site.xml file, with the primary node’s IP address of your Flintrock cluster.

Run the TPC-DS benchmark with Apache Spark 3.5.6 and Apache Iceberg 1.10.0

Complete the following steps to run the TPC-DS benchmark:

  1. Log in to the open source cluster primary node using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables.
    2. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0,org.apache.iceberg:iceberg-aws-bundle:1.10.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions   \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog    \
--conf spark.sql.catalog.local.type=hadoop  \
--conf spark.sql.catalog.local.warehouse=s3a://<YOUR_S3_BUCKET>/<warehouse_path>/ \
--conf spark.sql.defaultCatalog=local   \
--conf spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO   \
spark-benchmark-assembly-3.5.6.jar   \
s3://<YOUR_S3_BUCKET>/benchmark_run 3000 1 false  \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13    \
true <database> > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

After the Spark job finishes, retrieve the test result file from the output S3 bucket at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. This can be done either through the Amazon S3 console by navigating to the specified bucket location or by using the Amazon Command Line Interface (AWS CLI). The Spark benchmark application organizes the data by creating a timestamp folder and placing a summary file within a folder labeled summary.csv. The output CSV files contain 4 columns without headers:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

With the data from 3 separate test runs with 1 iteration each time, we can calculate the average and geometric mean of the benchmark runtimes.

Run the TPC-DS benchmark with Amazon EMR runtime for Apache Spark

Most of the instructions are similar to Steps to run Spark Benchmarking with a few Iceberg-specific details.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure the AWS CLI shell to point to the benchmarking AWS account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application JAR file to Amazon S3.

Deploy Amazon EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR on EC2 Cluster and run benchmark job to deploy an Amazon EMR on EC2 cluster. Make sure to enable Iceberg. See Create an Iceberg cluster for more details. Choose the correct Amazon EMR version, root volume size, and same resource configuration as the open source Flintrock setup. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. We need this for the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps from the AWS CLI:
    1. Replace <cluster ID> with the cluster ID from Step 2.
    2. The benchmark application is at s3://<your-bucket>/spark-benchmark-assembly-3.5.6.jar.
    3. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables. This should be the same as the one used for the open source TPC-DS benchmark run.
    4. The results will be in s3://<your-bucket>/benchmark_run.
aws emr add-steps   --cluster-id <cluster-id>
--steps Type=Spark,Name="SPARK Iceberg EMR TPCDS Benchmark Job",
Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,
--conf,spark.driver.cores=4,
--conf,spark.driver.memory=10g,
--conf,spark.executor.cores=16,
--conf,spark.executor.memory=100g,
--conf,spark.executor.instances=8,
--conf,spark.network.timeout=2000,
--conf,spark.executor.heartbeatInterval=300s,
--conf,spark.dynamicAllocation.enabled=false,
--conf,spark.shuffle.service.enabled=false,
--conf,spark.sql.iceberg.data-prefetch.enabled=true,
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.local.type=hadoop,
--conf,spark.sql.catalog.local.warehouse=s3://<your-bucket>/<warehouse-path>,
--conf,spark.sql.defaultCatalog=local,
--conf,spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<your-bucket>/spark-benchmark-assembly-3.5.6.jar,
s3://<your-bucket>/benchmark_run,3000,1,false,
'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,q99-v2.13\,ss_max-v2.13',
true,<database>],ActionOnFailure=CONTINUE --region <aws-region>

Summarize the results

After the step is complete, you can see the summarized benchmark result at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv in the same way as the previous run and compute the average and geometric mean of the query runtimes.

Clean up

To help prevent future charges, delete the resources you created by following the instructions provided in the Cleanup section of the GitHub repository.

Summary

Amazon EMR optimizes the runtime for Spark when used with Iceberg tables, achieving 4.5x faster performance than open source Apache Spark 3.5.6 and Apache Iceberg 1.10.0 with Amazon EMR 7.12 on TPC-DS 3 TB, v2.13. This represents a significant advancement from Amazon EMR 7.5, which delivered 3.6x faster performance and closes the gap to parquet performance on Amazon EMR so customers can use the benefits of Iceberg without a performance penalty.

We encourage you to keep up to date with the latest Amazon EMR releases to fully benefit from ongoing performance improvements.

To stay informed, subscribe to the RSS feed for the AWS Big Data Blog, where you can find updates on the Amazon EMR runtime for Spark and Iceberg, as well as tips on configuration best practices and tuning recommendations.


About the authors

Atul Felix Payapilly is a software development engineer for Amazon EMR at Amazon Web Services.

Akshaya KP is a software development engineer for Amazon EMR at Amazon Web Services.

Hari Kishore Chaparala is a software development engineer for Amazon EMR at Amazon Web Services.

Giovanni Matteo is the Senior Manager for the Amazon EMR Spark and Iceberg group.

Apache Spark encryption performance improvement with Amazon EMR 7.9

Post Syndicated from Sonu Kumar Singh original https://aws.amazon.com/blogs/big-data/apache-spark-encryption-performance-improvement-with-amazon-emr-7-9/

The Amazon EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open source Apache Spark. With Amazon EMR release 7.9.0, the EMR runtime for Apache Spark introduces significant performance improvements for encrypted workloads, supporting Spark version 3.5.5.

For compliance and security requirements, many customers need to enable Apache Spark’s local storage encryption (spark.io.encryption.enabled = true) in addition to Amazon Simple Storage Service (Amazon S3) encryption (such as server-side encryption (SSE) or AWS Key Management Service (AWS KMS)). This feature encrypts shuffle files, cached data, and other intermediate data written to local disk during Spark operations, protecting sensitive data at rest on Amazon EMR cluster instances.

Industries subject to regulations such as the Health Insurance Portability and Accountability Act (HIPAA) for healthcare, Payment Card Industry Data Security Standard (PCI-DSS) for financial services, General Data Protection Regulation (GDPR) for personal data, and Federal Risk and Authorization Management Program (FedRAMP) for government often require encryption of all data at rest, including temporary files on local storage. While Amazon S3 encryption protects data in object storage, Spark’s I/O encryption secures the intermediate shuffle and spill data that Spark writes to local disk during distributed processing—data that never reaches Amazon S3 but might contain sensitive information extracted from source datasets. Generally, encrypted operations require additional computational overhead that can impact overall job performance.

With the built-in encryption optimizations of Amazon EMR 7.9.0, customers might see significant performance improvements in their Apache Spark applications without requiring any application changes. In our performance benchmark tests, derived from TPC-DS performance tests at 3 TB scale, we observed up to 20% faster performance with the EMR 7.9 optimized Spark runtime compared to Spark without these optimizations. Individual results may vary depending on specific workloads and configurations.

In this post, we analyze the results from our benchmark tests comparing the Amazon EMR 7.9 optimized Spark runtime against Spark 3.5.5 without encryption optimizations. We walk through a detailed cost analysis and provide step-by-step instructions to reproduce the benchmark.

Results observed

To evaluate the performance improvements, we used an open source Spark performance test utility derived from the TPC-DS performance test toolkit. We ran the tests on two nine-node (eight core nodes and one primary node) r5d.4xlarge Amazon EMR 7.9.0 clusters, comparing two configurations:

  • Baseline: EMR 7.9.0 cluster with a bootstrap action installing Spark 3.5.5 without encryption optimizations
  • Optimized: EMR 7.9.0 cluster using the EMR Spark 3.5.5 runtime with encryption optimizations

Both tests used data stored in Amazon Simple Storage Service (Amazon S3). All data processing was configured identically except for the Spark runtime version.

To maintain benchmarking consistency and ensure a consistent, equivalent comparison, we disabled Dynamic Resource Allocation (DRA) in both test configurations. This approach eliminates variability from dynamic scaling and so we can measure pure computational performance improvements.

The following table shows the total job runtime for all queries (in seconds) in the 3 TB query dataset between the baseline and Amazon EMR 7.9 optimized configurations:

Configuration Total runtime (seconds) Geometric mean (seconds) Performance improvement
Baseline (Spark 3.5.5 without optimization) 1,485 10.24
EMR 7.9 (with encryption optimization) 1,176 8.15 20% faster

We observed that our TPC-DS tests with the Amazon EMR 7.9 optimized Spark runtime completed about 20% faster based on total runtime and 20% faster based on geometric mean compared to the baseline configuration.

The encryption optimizations in Amazon EMR 7.9 deliver performance benefits through:

  • Improved shuffle and decryption operations reducing overhead during data exchange without compromising security
  • Better memory management for intermediate results

Cost analysis

The performance improvements of the Amazon EMR 7.9 optimized Spark runtime directly translate to lower costs. We realized an approximately 20% cost savings running the benchmark application with encryption optimizations compared to the baseline configuration, because of reduced hours of EMR, Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Elastic Block Store (Amazon EBS) using General Purpose SSD (gp2).

The following table summarizes the cost comparison in the us-east-1 AWS Region:

Configuration Runtime (hours) Estimated cost Total EC2 instances Total vCPU Total memory (GiB) Root device (EBS)
Baseline: Spark 3.5.5 without optimization, 1 primary and 8 core nodes 0.41 $5.28 9 144 1152 64 GiB gp2
Amazon EMR 7.9 with optimization, 1 primary and 8 core nodes 0.33 $4.25 9 144 1152 64 GiB gp2

Cost breakdown

Formulas used:

  • Amazon EMR cost – Number of instances × EMR hourly rate × Runtime hours
  • Amazon EC2 cost – Number of instances × EC2 hourly rate × Runtime hour)
  • Amazon EBS cost(EBS cost per GB per month ÷ hours in a month) × EBS volume size × number of instances × runtime hours

Note: EBS is priced monthly ($0.1 per GB per month), so we divide by 730 hours to convert to an hourly rate. EMR and EC2 are already priced hourly, so no conversion is needed.

Baseline configuration (0.41 hours):

  • Amazon EMR cost – 9 × $0.27 × 0.41 = $1.00
  • Amazon EC2 cost – 9 × $1.152 × 0.41 = $4.25
  • Amazon EBS cost – ($0.1/730 × 64 × 9 × 0.41) = $0.032
  • Total cost – $5.28

EMR 7.9 optimized configuration (0.33 hours):

  • Amazon EMR cost – (9 × $0.27 × 0.33) = $0.80
  • Amazon EC2 cost – (9 × $1.152 × 0.33) = $3.42
  • Amazon EBS cost – ($0.1/730 × 64 × 9 × 0.33) = $0.025
  • Total cost: $4.25

Total cost savings: 20% per benchmark run, which scales linearly with your production workload frequency.

Set up EMR benchmarking

For detailed instructions and scripts, see the companion GitHub repository.

Prerequisites

To set up Amazon EMR benchmarking, start by completing the following prerequisite steps:

  1. Configure your AWS Command Line Interface (AWS CLI) by running aws configure to point to your benchmarking account,
  2. Create an S3 bucket for test data and results.
  3. Copy the TPC-DS 3TB source data from a publicly available dataset to your S3 bucket using the following command:
    aws s3 cp s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned s3://<YOUR-BUCKET-NAME>/BLOG_TPCDS-TEST-3T-partitioned --recursive

    Replace <YOUR-BUCKET-NAME> with the name of the S3 bucket you created in step 2.

  4. Build or download the benchmark application JAR file (spark-benchmark-assembly-3.3.0.jar)
  5. Ensure you have appropriate AWS Identity Access Management (IAM) roles for EMR cluster creation and Amazon S3 access

Deploy the baseline EMR cluster (without optimization)

Step 1: Launch EMR 7.9.0 cluster with bootstrap action

The baseline configuration uses a bootstrap action to install Spark 3.5.5 without encryption optimizations. We have made the bootstrap script publicly available in an S3 bucket for your convenience.

Create the default Amazon EMR roles:

aws emr create-default-roles

Now create the cluster:

aws emr create-cluster \
  --name "EMR-7.9-Baseline-Spark-3.5.5" \
  --release-label emr-7.9.0 \
  --applications Name=Spark \
  --ec2-attributes SubnetId=<YOUR-SUBNET-ID>,InstanceProfile=EMR_EC2_DefaultRole  \
  --service-role EMR_DefaultRole
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5d.4xlarge \
    InstanceGroupType=CORE,InstanceCount=8,InstanceType=r5d.4xlarge \
  --bootstrap-actions \
    Path=s3://spark-ba/install-spark-3-5-5-no-encryption.sh,Name="install spark 3.5.5 without encryption optimization" \
  --use-default-roles \
  --log-uri s3://<YOUR-BUCKET-NAME>/logs/baseline/

Note: The bootstrap script is available in a public S3 bucket at s3://spark-ba/install-spark-3-5-5-no-encryption.sh. This script installs Apache Spark 3.5.5 without the encryption optimizations present in the Amazon EMR runtime.

Step 2: Submit the benchmark job to the baseline cluster

Next submit the Spark job using the following commands:

aws emr add-steps \
  --cluster-id <YOUR-BASELINE-CLUSTER-ID> \  
  --steps 'Type=Spark,Name="EMR-7.9-Baseline-Spark-3.5.5 Step",ActionOnFailure=CONTINUE,Args=["--deploy-mode","client","--conf","spark.io.encryption.enabled=false","--class","com.amazonaws.eks.tpcds.BenchmarkSQL","s3://<YOUR-BUCKET-NAME>/jar/spark-benchmark-assembly-3.3.0.jar","s3:// <YOUR-BUCKET-NAME>/blog/BLOG_TPCDS-TEST-3T-partitioned","s3:// <YOUR-BUCKET-NAME>/blog/BASELINE_TPCDS-TEST-3T-RESULT","/opt/tpcds-kit/tools","parquet","3000","3","false","q1-v2.4,q10-v2.4,q11-v2.4,q12-v2.4,q13-v2.4,q14a-v2.4,q14b-v2.4,q15-v2.4,q16-v2.4,q17-v2.4,q18-v2.4,q19-v2.4,q2-v2.4,q20-v2.4,q21-v2.4,q22-v2.4,q23a-v2.4,q23b-v2.4,q24a-v2.4,q24b-v2.4,q25-v2.4,q26-v2.4,q27-v2.4,q28-v2.4,q29-v2.4,q3-v2.4,q30-v2.4,q31-v2.4,q32-v2.4,q33-v2.4,q34-v2.4,q35-v2.4,q36-v2.4,q37-v2.4,q38-v2.4,q39a-v2.4,q39b-v2.4,q4-v2.4,q40-v2.4,q41-v2.4,q42-v2.4,q43-v2.4,q44-v2.4,q45-v2.4,q46-v2.4,q47-v2.4,q48-v2.4,q49-v2.4,q5-v2.4,q50-v2.4,q51-v2.4,q52-v2.4,q53-v2.4,q54-v2.4,q55-v2.4,q56-v2.4,q57-v2.4,q58-v2.4,q59-v2.4,q6-v2.4,q60-v2.4,q61-v2.4,q62-v2.4,q63-v2.4,q64-v2.4,q65-v2.4,q66-v2.4,q67-v2.4,q68-v2.4,q69-v2.4,q7-v2.4,q70-v2.4,q71-v2.4,q72-v2.4,q73-v2.4,q74-v2.4,q75-v2.4,q76-v2.4,q77-v2.4,q78-v2.4,q79-v2.4,q8-v2.4,q80-v2.4,q81-v2.4,q82-v2.4,q83-v2.4,q84-v2.4,q85-v2.4,q86-v2.4,q87-v2.4,q88-v2.4,q89-v2.4,q9-v2.4,q90-v2.4,q91-v2.4,q92-v2.4,q93-v2.4,q94-v2.4,q95-v2.4,q96-v2.4,q97-v2.4,q98-v2.4,q99-v2.4,ss_max-v2.4","true"]'

Deploy the optimized EMR cluster (with encryption optimization)

Step 1: Launch EMR 7.9.0 cluster with Spark runtime

The optimized configuration uses the EMR 7.9.0 Spark runtime without any bootstrap actions:

aws emr create-cluster \
  --name "EMR-7.9-Optimized-Native-Spark" \
  --release-label emr-7.9.0 \
  --applications Name=Spark \
  --ec2-attributes SubnetId=<YOUR-SUBNET-ID>,InstanceProfile=EMR_EC2_DefaultRole \
  --service-role EMR_DefaultRole
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5d.4xlarge \
    InstanceGroupType=CORE,InstanceCount=8,InstanceType=r5d.4xlarge \
  --use-default-roles \
  --log-uri s3://<YOUR-BUCKET-NAME>/logs/optimized/

Example:

aws emr create-cluster \
--name "EMR-7.9-Optimized-Native-Spark" \
--release-label emr-7.9.0 \
--applications Name=Spark \
--ec2-attributes SubnetId=subnet-08a5f71f92bc8a801 \
--instance-groups \
InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5d.4xlarge \
InstanceGroupType=CORE,InstanceCount=8,InstanceType=r5d.4xlarge \
--bootstrap-actions \
Path=s3://spark-ba/install-spark-3-5-5-no-encryption.sh,Name="install spark 3.5.5 without encryption optimization" \
--use-default-roles \
--log-uri s3://aws-logs-123456789012-us-west-2/elasticmapreduce/

Step 2: Submit the benchmark job to optimized cluster

ext submit the Spark job using the following commands:

aws emr add-steps \
  --cluster-id <YOUR-OPTIMIZED-CLUSTER-ID> \ 
  --steps 'Type=Spark,Name="EMR-7.9-Optimized-Native-Spark Step",ActionOnFailure=CONTINUE,Args=["--deploy-mode","client","--conf","spark.io.encryption.enabled=true","--class","com.amazonaws.eks.tpcds.BenchmarkSQL","s3://<YOUR-BUCKET-NAME>/jar/spark-benchmark-assembly-3.3.0.jar","s3://<YOUR-BUCKET-NAME>/blog/BLOG_TPCDS-TEST-3T-partitioned","s3://<YOUR-BUCKET-NAME>/blog/BASELINE_TPCDS-TEST-3T-RESULT","/opt/tpcds-kit/tools","parquet","3000","3","false","q1-v2.4,q10-v2.4,q11-v2.4,q12-v2.4,q13-v2.4,q14a-v2.4,q14b-v2.4,q15-v2.4,q16-v2.4,q17-v2.4,q18-v2.4,q19-v2.4,q2-v2.4,q20-v2.4,q21-v2.4,q22-v2.4,q23a-v2.4,q23b-v2.4,q24a-v2.4,q24b-v2.4,q25-v2.4,q26-v2.4,q27-v2.4,q28-v2.4,q29-v2.4,q3-v2.4,q30-v2.4,q31-v2.4,q32-v2.4,q33-v2.4,q34-v2.4,q35-v2.4,q36-v2.4,q37-v2.4,q38-v2.4,q39a-v2.4,q39b-v2.4,q4-v2.4,q40-v2.4,q41-v2.4,q42-v2.4,q43-v2.4,q44-v2.4,q45-v2.4,q46-v2.4,q47-v2.4,q48-v2.4,q49-v2.4,q5-v2.4,q50-v2.4,q51-v2.4,q52-v2.4,q53-v2.4,q54-v2.4,q55-v2.4,q56-v2.4,q57-v2.4,q58-v2.4,q59-v2.4,q6-v2.4,q60-v2.4,q61-v2.4,q62-v2.4,q63-v2.4,q64-v2.4,q65-v2.4,q66-v2.4,q67-v2.4,q68-v2.4,q69-v2.4,q7-v2.4,q70-v2.4,q71-v2.4,q72-v2.4,q73-v2.4,q74-v2.4,q75-v2.4,q76-v2.4,q77-v2.4,q78-v2.4,q79-v2.4,q8-v2.4,q80-v2.4,q81-v2.4,q82-v2.4,q83-v2.4,q84-v2.4,q85-v2.4,q86-v2.4,q87-v2.4,q88-v2.4,q89-v2.4,q9-v2.4,q90-v2.4,q91-v2.4,q92-v2.4,q93-v2.4,q94-v2.4,q95-v2.4,q96-v2.4,q97-v2.4,q98-v2.4,q99-v2.4,ss_max-v2.4","true"]'

Benchmark command parameters explained

The Amazon EMR Spark step uses the following parameters:

  • EMR step configuration:
    • Type=Spark: Specifies this is a Spark application step
    • Name=”EMR-7.9-Baseline-Spark-3.5.5″: Human-readable name for the step
    • ActionOnFailure=CONTINUE: Continue with other steps if this one fails
  • Spark submit arguments:
    • –deploy-mode client: Run the driver on the master node (not cluster mode)
    • –class com.amazonaws.eks.tpcds.BenchmarkSQL: Main class for the TPC-DS benchmark
  • Application parameters:
    • JAR file: s3://<YOUR-BUCKET-NAME>/jar/spark-benchmark-assembly-3.3.0.jar
    • Input data: s3://<YOUR-BUCKET-NAME>/blog/BLOG_TPCDS-TEST-3T-partitioned (3 TB TPC-DS dataset)
    • Output location: s3://<YOUR-BUCKET-NAME>/blog/BASELINE_TPCDS-TEST-3T-RESULT (S3 path for results)
    • TPC-DS tools path: /opt/tpcds-kit/tools(local path on EMR nodes)
    • Format: parquet (output format)
    • Scale factor: 3000 (3 TB dataset size)
    • Iterations: 3 (run each query 3 times for averaging)
    • Collect results: false (don’t collect results to driver)
    • Query list: "q1-v2.4,q10-v2.4,...,ss_max-v2.4" (all 104 TPC-DS queries)
    • Final parameter: true (enable detailed logging and metrics)
  • Query coverage:
    • All 104 standard TPC-DS benchmark queries (q1-v2.4 through q99-v2.4)
    • Plus the ss_max-v2.4 query for additional testing
    • Each query runs 3 times to calculate average performance

Summarize the results

  1. Download the test result files from both output S3 locations:
    # Baseline results
    aws s3 cp s3://<YOUR-BUCKET-NAME>/blog/BASELINE_TPCDS-TEST-3T-RESULT/timestamp=xxxx/summary.csv/xxx.csv ./baseline-results.csv
       
    # Optimized results
    aws s3 cp s3://<YOUR-BUCKET-NAME>/blog/OPTIMIZED_TPCDS-TEST-3T-RESULT/timestamp=xxxx/summary.csv/xxx.csv ./optimized-results.csv

  2. The CSV files contain four columns (without headers):
    • Query name
    • Median time (seconds)
    • Minimum time (seconds)
    • Maximum time (seconds)
  3. Calculate performance metrics for comparison:
    • Average time per query: AVERAGE(median, min, max) for each query
    • Total runtime: Sum of all median times
    • Geometric mean: GEOMEAN(average times) across all queries
    • Speedup: Calculate the ratio between baseline and optimized for each query
  4. Create comparison analysis:Speedup = (Baseline Time - Optimized Time) / Baseline Time * 100%

Testing configuration details

The following table summarizes the test environment used for this post:

Parameter Value
EMR release emr-7.9.0 (both configurations)
Baseline Spark version 3.5.5 (installed through bootstrap action)
Baseline bootstrap script s3://spark-ba/install-spark-3-5-5-no-encryption.sh (public)
Optimized spark version Amazon EMR Spark runtime
Cluster size 9 nodes (1 primary and 8 core)
Instance type r5d.4xlarge
vCPUs per node 16
Memory per node 128 GB
Instance storage 600 GB SSD
EBS volume 64 GB gp2 (2 volumes per instance)
Total vCPUs 144 (9 × 16)
Total memory 1152 GB (9 × 128)
Dataset TPC-DS 3TB (Parquet format)
Queries 104 queries (TPC-DS v2.4)
Iterations 3 runs per query
DRA Disabled for consistent benchmarking

Clean up

To avoid incurring future charges, delete the resources you created:

  1. Terminate both EMR clusters:
    aws emr terminate-clusters --cluster-ids <YOUR-BASELINE-CLUSTER-ID> <YOUR-OPTIMIZED-CLUSTER-ID>

  2. Delete S3 test results if no longer needed:
    aws s3 rm s3://<YOUR-BUCKET-NAME>/blog/BASELINE_TPCDS-TEST-3T-RESULT/ --recursive
    aws s3 rm s3://<YOUR-BUCKET-NAME>/blog/OPTIMIZED_TPCDS-TEST-3T-RESULT/ --recursive
    aws s3 rm s3://<YOUR-BUCKET-NAME>/logs/ --recursive

  3. Remove IAM roles if created specifically for testing

Key findings

  • Up to 20% performance improvement using the Amazon EMR 7.9’s Spark runtime with no code changes required
  • 20% cost savings because of reduced runtime
  • Significant gains for shuffle-heavy, join-intensive workloads
  • 100% API compatibility with open source Apache Spark
  • Simple migration from custom Spark builds to EMR runtime
  • Easy benchmarking using publicly available bootstrap scripts

Conclusion

You can run your Apache Spark workloads up to 20% faster and at lower cost without making any changes to your applications by using the Amazon EMR 7.9.0 optimized Spark runtime. This improvement is achieved through numerous optimizations in the EMR Spark runtime, including enhanced encryption handling, improved data serialization, and optimized shuffle operations.

To learn more about Amazon EMR 7.9 and best practices, see the EMR documentation. For configuration guidance and tuning advice, subscribe to the AWS Big Data Blog.

Related resources:

If you’re running Spark workloads on Amazon EMR today, we encourage you to test the EMR 7.9 Spark runtime with your production workloads and measure the improvements specific to your use case.


About the authors

Sonu Kumar Singh

Sonu Kumar Singh

Sonu is a Senior Solutions Architect with more than 13 years of experience, with a specialization in Analytics and Healthcare domain. He has been instrumental in catalyzing transformative shifts in organizations by enabling data-driven decision-making thereby fueling innovation and growth. He enjoys it when something he designed or created brings a positive impact.

Roshin Babu

Roshin Babu

Roshin is a Sr. Specialist Solutions architect at AWS, where he collaborates with the sales team to support public sector clients. His role focuses on developing innovative solutions that solve complex business challenges while driving increased adoption of AWS analytics services. When he’s not working, Roshin is passionate about exploring new destinations, discovering great food, and enjoying soccer both as a player and fan.Polaris Jhandi

Polaris Jhandi

Polaris Jhandi

Polaris is a Cloud Application Architect with AWS Professional Services. He has a background in AI/ML and big data. He is currently working with customers to migrate their legacy mainframe applications to the AWS Cloud.Zheng Yuan

Zheng Yuan

Zheng Yuan

Zheng is a Software Engineer on the Amazon EMR Spark team, where he focuses on improving the performance of the Spark execution engine across various use cases.

Run Apache Spark and Apache Iceberg write jobs 2x faster with Amazon EMR

Post Syndicated from Atul Payapilly original https://aws.amazon.com/blogs/big-data/run-apache-spark-and-apache-iceberg-write-jobs-2x-faster-with-amazon-emr/

Amazon EMR runtime for Apache Spark offers a high-performance runtime environment while maintaining API compatibility with open source Apache Spark and Apache Iceberg table format. Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, Amazon EMR on AWS Outposts and AWS Glue use the optimized runtimes.

In this post, we demonstrate the write performance benefits of using the Amazon EMR 7.12 runtime for Spark and Iceberg compares to open source Spark 3.5.6 with Iceberg 1.10.0 tables on a 3TB merge workload.

Write Benchmark Methodology

Our benchmarks demonstrate that Amazon EMR 7.12 can run 3TB merge workloads over 2 times faster than open source Spark 3.5.6 with Iceberg 1.10.0, delivering significant improvements for data ingestion and ETL pipelines while providing the advanced features of Iceberg including ACID transactions, time travel, and schema evolution.

Benchmark workload

To evaluate the write performance improvements in Amazon EMR 7.12, we chose a merge workload that reflects common data ingestion and ETL patterns. The benchmark consists of 37 basic merge operations on TPC-DS 3TB tables, testing the performance of INSERT, UPDATE, and DELETE operations. The workload is inspired by established benchmarking approaches from the open source community, including Delta Lake’s merge benchmark methodology and the LST-Bench framework. We combined and adapted these approaches to create a comprehensive test of Iceberg write performance on AWS. We also started with an initial focus on copy-on-write performance only.

Workload characteristics

The benchmark executes 37 basic sequential merge queries that modify TPC-DS fact tables. The 37 queries are organized into three categories:

  • Inserts (queries m1-m6): Adding new records to tables with varying data volumes. These queries use source tables with 5-100% new records and zero matches, testing pure insert performance at different scales.
  • Upserts (queries m8-m16): Modifying existing records while inserting new ones. These upsert operations combine different ratios of matched and non-matched records—for example, 1% matches with 10% inserts, or 99% matches with 1% inserts—representing typical scenarios where data is both updated and augmented.
  • Deletes (queries m7, m17-m37): Removing records with varying selectivity. These range from small, targeted deletes affecting 5% of files and rows to large-scale deletions, including partition-level deletes that can be optimized to metadata-only operations.

The queries operate on the table state created by previous operations, simulating real ETL pipelines where subsequent steps depend on earlier transformations. For example, the first six queries insert between 607,000 and 11.9 million records into the web_returns table. Later queries then update and delete from this modified table, testing read-after-write performance. Source tables were generated by sampling the TPC-DS web_returns table with controlled match/non-match ratios for consistent test conditions across the benchmark runs.

The merge operations vary in scale and complexity:

  • Small operations affecting 607,000 records
  • Large operations modifying over 12 million records
  • Selective deletes requiring file rewrites
  • Partition-level deletes optimized to metadata operations

Benchmark configuration

We ran the benchmark on identical hardware for both Amazon EMR 7.12 and open source Spark 3.5.6 with Iceberg 1.10.0:

  • Cluster: 9 r5d.4xlarge instances (1 primary, 8 workers)
  • Compute: 144 total vCPUs, 1,152 GB memory
  • Storage: 2 x 300 GB NVMe SSD per instance
  • Catalog: Hadoop Catalog
  • Data format: Parquet files on Amazon S3
  • Table format: Apache Iceberg (default: copy-on-write mode)

Benchmark results

We compared benchmark results for Amazon EMR 7.12 to open source Spark 3.5.6 and Iceberg 1.10.0. We ran the 37 merge queries in three sequential iterations, and the average runtime across these iterations was taken for comparison. The following table shows the results averaged across three iterations:

Amazon EMR 7.12 (seconds) Open Source Spark 3.5.6 + Iceberg 1.10.0 (seconds) Speedup
443.58 926.63 2.08x

The average runtime for the three iterations on Amazon EMR 7.12 with Iceberg enabled was 443.58 seconds, demonstrating a 2.08x speed increase compared to open source Spark 3.5.6 and Iceberg 1.10.0. The following figure presents the total runtimes in seconds.

The following table summarizes the metrics.

Metric Amazon EMR 7.12 on EC2 Open source Spark 3.5.6 and Iceberg 1.10.0
Average runtime in seconds 443.58 926.63
Geometric mean over queries in seconds 6.40746 18.50945
Cost* $1.58 $2.68

*Detailed cost estimates are discussed later in this post.

The following chart demonstrates the per-query performance improvement of Amazon EMR 7.12 relative to open source Spark 3.5.6 and Iceberg 1.10.0. The extent of the speedup varies from one query to another, with the fastest up to 13.3 times faster for query m31, with Amazon EMR outperforming open source Spark with Iceberg tables. The horizontal axis arranges the TPC-DS 3TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup as a ratio.

Performance optimizations in Amazon EMR

Amazon EMR 7.12 achieves over 2x faster write performance through systematic optimizations across the write execution pipeline. These improvements span multiple areas:

  • Metadata-only delete operations: When deleting entire partitions, EMR can now optimize these operations to metadata-only changes, eliminating the need to rewrite data files. This significantly reduces the time and cost for partition-level delete operations.
  • Bloom filter joins for merge operations: Enhanced join strategies using bloom filters reduce the amount of data that needs to be read and processed during merge operations, particularly benefiting queries with selective predicates.
  • Parallel file write out: Optimized parallelism during the write phase of merge operations improves throughput when writing filtered results back to Amazon S3, reducing overall merge operation time. We balanced the parallelism with read performance for overall optimized performance on the entire workload.

These optimizations work together to deliver consistent performance improvements across diverse write patterns. The result is significantly faster data ingestion and ETL pipeline execution while maintaining Iceberg’s ACID assurances and data consistency of Iceberg.

Cost comparison

Our benchmark provides the total runtime and geometric mean data to assess the performance of Spark and Iceberg in a complex, real-world decision support scenario. For additional insights, we also examine the cost aspect. We calculate cost estimates using formulas that account for EC2 On-Demand instances, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR expenses.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • 4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • 4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

The calculations reveal that the Amazon EMR 7.12 benchmark yields a 1.7x cost efficiency improvement over open source Spark 3.5.6 and Iceberg 1.10.0 in running the benchmark job.

Metric Amazon EMR 7.12 Open source Spark 3.5.6 and Iceberg 1.10.0
Runtime in seconds 443.58 926.63
Number of EC2 instances(Includes primary node) 9 9
Amazon EBS Size 20gb 20gb
Amazon EC2(Total runtime cost) $1.28 $2.67
Amazon EBS cost $0.00 $0.01
Amazon EMR cost $0.30 $0
Total cost $1.58 $2.68
Cost savings Amazon EMR 7.12 is 1.7 times better Baseline

Run open source Spark benchmarks on Iceberg tables

We used separate EC2 clusters, each equipped with nine r5d.4xlarge instances, for testing both open source Spark 3.5.6 and Amazon EMR 7.12 for Iceberg workload. The primary node was equipped with 16 vCPU and 128 GB of memory, and the eight worker nodes together had 128 vCPU and 1024 GB of memory. We conducted tests using the Amazon EMR default settings to showcase the typical user experience and minimally adjusted the settings of Spark and Iceberg to maintain a balanced comparison.

The following table summarizes the Amazon EC2 configurations for the primary node and eight worker nodes of type r5d.4xlarge.

EC2 Instance vCPU Memory (GiB) Instance storage (GB) EBS root volume (GB)
r5d.4xlarge 16 128 2 x 300 NVMe SSD 20 GB

Benchmarking instructions

Follow the steps below to run the benchmark:

  1. For the open source run, create a Spark cluster on Amazon EC2 using Flintrock with the configuration described previously.
  2. Setup the TPC-DS source data with Iceberg in your S3 bucket.
  3. Build the benchmark application jar from the source to run the benchmarking and get the results.

Detailed instructions are provided in the emr-spark-benchmark GitHub repository.

Summarize the results

After the Spark job finishes, retrieve the test result file from the output S3 bucket at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. This can be done either through the Amazon S3 console by navigating to the specified bucket location or by using the Amazon Command Line Interface (AWS CLI). The Spark benchmark application organizes the data by creating a timestamp folder and placing a summary file within a folder labeled summary.csv. The output CSV files contain four columns without headers:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

With the data from three separate test runs with one iteration each time, we can calculate the average and geometric mean of the benchmark runtimes.

Clean up

To help prevent future charges, delete the resources you created by following the instructions provided in the Cleanup section of the GitHub repository.

Summary

Amazon EMR is consistently enhancing the EMR runtime for Spark when used with Iceberg tables, achieving write performance that is over 2 times faster than open source Spark 3.5.6 and Iceberg 1.10.0 with EMR 7.12 on 3TB merge workloads. This represents a significant improvement for data ingestion and ETL pipelines, helping to deliver 1.7x cost reduction while maintaining the ACID assurances of Iceberg. We encourage you to keep up to date with the latest Amazon EMR releases to fully benefit from ongoing performance improvements.

To stay informed, subscribe to the RSS feed for the AWS Big Data Blog, where you can find updates on the EMR runtime for Spark and Iceberg, as well as tips on configuration best practices and tuning recommendations.


About the authors

Atul Felix Payapilly is a software development engineer for Amazon EMR at Amazon Web Services.

Akshaya KP is a software development engineer for Amazon EMR at Amazon Web Services.

Hari Kishore Chaparala is a software development engineer for Amazon EMR at Amazon Web Services.

Giovanni Matteo is the Senior Manager for the Amazon EMR Spark and Iceberg group.

Accelerate data lake operations with Apache Iceberg V3 deletion vectors and row lineage

Post Syndicated from Ron Ortloff original https://aws.amazon.com/blogs/big-data/accelerate-data-lake-operations-with-apache-iceberg-v3-deletion-vectors-and-row-lineage/

Organizations building petabyte-scale data lakes face increasing challenges as their data grows. Batch updates and compliance deletes create a proliferation of positional delete files, slowing downstream data pipelines and driving up storage costs. Tracking data changes for audit trails and incremental processing requires custom, engine-specific implementations that add complexity and maintenance burden. As data volumes scale, these challenges compound, leaving data teams juggling custom solutions and increasing operational costs just to maintain data freshness and compliance.

Apache Iceberg V3 addresses these challenges with two new capabilities: deletion vectors and row lineage. AWS now delivers these capabilities across Apache Spark on Amazon EMR 7.12, AWS Glue, Amazon SageMaker notebooks, Amazon S3 Tables, and the AWS Glue Data Catalog, giving you a complete, integrated V3 experience without custom implementation. This means faster writes, lower storage costs, comprehensive audit trails, and efficient incremental processing, all working seamlessly across your entire data lake architecture.

In this post, we walk you through the new capabilities in Iceberg V3, explain how deletion vectors and row lineage address these challenges, explore real-world use cases across industries, and provide practical guidance on implementing Iceberg V3 features across AWS analytics, catalog, and storage services.

What’s new in Iceberg V3

Iceberg V3 introduces new capabilities and data types. Two key capabilities that address the challenges discussed earlier are deletion vectors and row lineage.

Deletion vectors replace positional delete files with an efficient binary format stored as Puffin files. Instead of creating separate delete files for each delete operation, the deletion vector consolidates these delete references to a single delete vector per data file, rather than a delete reference file per deleted row. During query execution, engines efficiently filter out deleted rows using these compact vectors, maintaining query performance while removing the need to merge multiple delete files.

This avoids write amplification from random batch updates and GDPR compliance deletes, significantly reducing the overhead of maintaining fresh data. High-frequency update workloads can see immediate improvements in write performance and reduced storage costs from fewer small delete files. Additionally, having fewer small delete files reduces table maintenance costs for compaction operations.

Row lineage enables precise change tracking at the row level with full auditability. Row lineage adds metadata fields to each data file that track when rows were created and last modified. The _row_id field uniquely identifies each row, and the _last_updated_sequence_number field tracks the snapshot when the row was last modified. These fields enable efficient change tracking queries without scanning entire tables, and they’re automatically maintained by the Iceberg specification without requiring custom code.

Before row lineage, change tracking in Iceberg provided only the net changes between snapshots, making it difficult to track individual record modifications. Row lineage metadata fields can now be queried to return all incremental changes, giving you full fidelity for auditing data modifications and regulatory compliance. For data transformations, your downstream systems can process changes incrementally, speeding up data pipelines and reducing compute costs for change data capture (CDC) workflows. Row lineage is engine agnostic, interoperable, and built into the Iceberg V3 specification, alleviating the need for custom, engine-specific change tracking implementations.

Real-world use cases

The new Iceberg V3 capabilities address critical challenges across multiple industries:

  • Marketing and advertising services organizations – You can now efficiently handle GDPR right-to-be-forgotten requests and regulatory compliance deletes without the write amplification that previously degraded pipeline performance. Row lineage provides complete audit trails for data modifications, meeting strict regulatory requirements for data governance.
  • Ecommerce platforms processing millions of product updates and inventory changes daily – You can maintain data freshness while reducing storage costs. Deletion vectors enable faster upsert operations, helping teams meet aggressive SLA requirements during peak shopping periods.
  • Healthcare and life sciences companies – You can track patient data modifications with precision for compliance purposes while efficiently processing large-scale genomic datasets. Row lineage provides the detailed change history required for clinical trial audits and regulatory submissions.
  • Media and entertainment providers managing large catalogs of user viewing data – You can efficiently process incremental changes for recommendation engines. Row lineage enables downstream analytics systems to process only changed records, reducing compute costs in incremental processing scenarios.

Get started with Iceberg V3

To take advantage of deletion vectors for optimized writes and row lineage for built-in change tracking in Iceberg V3, set the table property format-version = 3 during table creation. Alternatively, setting this property on an existing Iceberg V2 table atomically upgrades the table without data rewrites. Before creating or upgrading V3 tables, make sure the Iceberg engines in your solution are V3-compatible. Refer to Apache Iceberg V3 on AWS for more details.

Create a new V3 table with Apache Spark on Amazon EMR 7.12

The following code creates a new table named customer_data. Setting the table property format-version = 3 creates a V3 table. If the format-version table property is not explicitly set, a V2 table is created. V2 is currently the Iceberg default table version. Setting write.delete.mode, write.update.mode, and write.merge.mode to merge-on-read configures Spark to write deletion vectors for delete, update, or merge statements performed on the table.

CREATE TABLE customer_data (
customer_id bigint,
name string,
email string,
last_purchase timestamp,
total_spent decimal(10,2)
)
USING iceberg
TBLPROPERTIES (
'format-version' = '3',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
)

Run the following code to insert records into the customer_data table:

INSERT INTO customer_data VALUES
 (1, 'Alejandro Rosalez', '[email protected]', TIMESTAMP '2025-11-24 18:55:27', 42.97)
,(2, 'Akua Mansa', '[email protected]', TIMESTAMP '2025-11-24 17:55:27', 25.02)
,(3, 'Ana Carolina Silva','[email protected]', TIMESTAMP '2025-11-24 16:55:27', 43.67)
,(4, 'Arnav Desai','[email protected]', TIMESTAMP '2025-11-24 15:55:27', 98.32)
,(5, 'Carlos Salazar','[email protected]', TIMESTAMP '2025-11-24 12:55:27', 76.45)

Delete a record where customer_id = 5 to generate a delete file:

DELETE 
  FROM customer_data 
  WHERE customer_id = 5

Updating a record with the following update statement also generates a delete file:

UPDATE customer_data
  SET name = 'Mansa Akua' 
  WHERE customer_id = 2

The last part of this example queries the manifest’s metadata table to verify delete files were produced:

SELECT added_snapshot_id
      ,sum(added_delete_files_count) as added_delete_files_count 
FROM customer_data.manifests 
GROUP BY added_snapshot_id 
ORDER BY added_snapshot_id

This query will result in three records returned, as shown in the following screenshot. The added_delete_files_count for the first snapshot that inserts records should be 0. The next two snapshots for the corresponding delete and update statements should have 1 each for added_delete_files_count value.

Query row lineage for change tracking

Row lineage is automatically enabled on V3 tables. The following example includes row lineage metadata fields and an example of how to query table changes after a row lineage sequence number:

SELECT
customer_id,
name,
email,
_row_id,
_last_updated_sequence_number
FROM customer_data
WHERE _last_updated_sequence_number > 0
ORDER BY _last_updated_sequence_number, _row_id

Running this query after the previous insert, update, and delete statements returns four records, as shown in the following screenshot. The deleted record is removed. The _last_updated_sequence_number is 3 for the update to customer_id = 2.

Upgrade an existing V2 table

You can upgrade your existing V2 tables to V3 with the following command:

ALTER TABLE existing_customer_data
SET TBLPROPERTIES ('format-version' = '3')

When you upgrade a table from V2 to V3, several important operations occur atomically:

  • A new metadata snapshot is created atomically, resulting in no data loss.
  • Existing Parquet data files are reused without modification.
  • Row-lineage fields (_row_id and _last_updated_sequence_number) are added to the table metadata.
  • The next compaction operation will remove old V2 positional delete files. If new deletion vector files are generated before compaction runs, they will merge existing V2 positional delete files.
  • New modifications will automatically use V3’s deletion vector files.
  • The upgrade does not perform a historical backfill of row-lineage change tracking records.

The upgrade process is synchronous and completes in seconds for most tables. If the upgrade fails, an error message is returned immediately, and the table remains in its V2 state.

Getting the most from Iceberg V3

In this section, we share the key things we’ve learned from customers already using these features.

Know your workload pattern

Deletion vectors work best when you’re doing lots of writes, such as high-frequency updates, batch deletes, or CDC workloads making random non-append-only updates. If you’re writing more than you’re reading, deletion vectors will deliver immediate performance gains. To unlock these benefits, set your table to merge-on-read mode for delete, update, and merge operations.

Let AWS handle compaction

Enable automatic compaction through the Data Catalog or use S3 Tables (on by default). You will get hands-free optimization without building custom maintenance jobs. Deletion vectors produce fewer delete files than positional deletes in Iceberg V2. Given a similar pattern and amount of modified records, V3 compaction should be quicker and cost less than V2.

Understand the importance of row lineage when using the V2 changelog

With the Spark changelog procedure in Iceberg V2, if a row gets inserted and then deleted between snapshots, it disappears from your change feed—you never see it. Iceberg V3 row lineage captures both operations because _last_updated_sequence_number updates on each modification. This full fidelity is important for audit trails and regulatory compliance where you need to prove what happened to every record. Performance-wise, the V2 changelog requires scanning and merging delete files to compute changes—that’s compute you’re paying for on every read. V3 row lineage stores metadata fields directly on each row, so filtering by _last_updated_sequence_number is a simple metadata scan.

Test before you upgrade

Iceberg V3 upgrades are atomic and fast, but test in dev first. Make sure all your query engines support Iceberg V3 before upgrading shared tables—mixing V2 and V3 engines causes headaches. After upgrading, keep a few V2 snapshots around temporarily for time-travel queries while you validate performance.

Conclusion

Iceberg V3 support across AWS analytics, catalog, and storage services marks a significant advancement in data lake capabilities. By combining deletion vectors’ write optimization with row lineage’s comprehensive change tracking, you can build more efficient, auditable, and cost-effective data lakes at scale. The seamless interoperability across AWS services makes sure your data lake architecture remains flexible and future-proof.

To learn more about AWS support for Iceberg V3, refer to Using Apache Iceberg on AWS.

To learn more about building modern data lakes with Iceberg on AWS, refer to Analytics on AWS.


About the authors

Ron Ortloff

Ron Ortloff

Ron is a Principal Product Manager at AWS.

Visualize data lineage using Amazon SageMaker Catalog for Amazon EMR, AWS Glue, and Amazon Redshift

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/visualize-data-lineage-using-amazon-sagemaker-catalog-for-amazon-emr-aws-glue-and-amazon-redshift/

Amazon SageMaker offers a comprehensive hub that integrates data, analytics, and AI capabilities, providing a unified experience for users to access and work with their data. Through Amazon SageMaker Unified Studio, a single and unified environment, you can use a wide range of tools and features to support your data and AI development needs, including data processing, SQL analytics, model development, training, inference, and generative AI development. This offering is further enhanced by the integration of Amazon Q and Amazon SageMaker Catalog, which provide an embedded generative AI and governance experience, helping users work efficiently and effectively across the entire data and AI lifecycle, from data preparation to model deployment and monitoring.

With the SageMaker Catalog data lineage feature, you can visually track and understand the flow of your data across different systems and teams, gaining a complete picture of your data assets and how they’re connected. As an OpenLineage-compatible feature, it helps you trace data origins, track transformations, and view cross-organizational data consumption, giving you insights into cataloged assets, subscribers, and external activities. By capturing lineage events from OpenLineage-enabled systems or through APIs, you can gain a deeper understanding of your data’s journey, including activities within SageMaker Catalog and beyond, ultimately driving better data governance, quality, and collaboration across your organization.

Additionally, the SageMaker Catalog data lineage feature versions each event, so you can track changes, visualize historical lineage, and compare transformations over time. This provides valuable insights into data evolution, facilitating troubleshooting, auditing, and data integrity by showing exactly how data assets have evolved, and generates trust in data.

In this post, we discuss the visualization of data lineage in SageMaker Catalog and how capture lineage from different AWS analytics services such as AWS Glue, Amazon Redshift, and Amazon EMR Serverless automatically, and visualize it with SageMaker Unified Studio.

Solution overview

The generation of data lineage in SageMaker Catalog operates through an automated system that captures metadata and relationships between different data artifacts for AWS Glue, Amazon EMR, and Amazon Redshift. When data moves through various AWS services, SageMaker automatically tracks these movements, transformations, and dependencies, creating a detailed map of the data’s journey. This tracking includes information about data sources, transformations, processing steps, and final outputs, providing a complete audit trail of data movement and transformation.

The implementation of data lineage in SageMaker Catalog offers several key benefits:

  • Compliance and audit support – Organizations can demonstrate compliance with regulatory requirements by showing complete data provenance and transformation history
  • Impact analysis – Teams can assess the potential impact of changes to data sources or transformations by understanding dependencies and relationships in the data pipeline
  • Troubleshooting and debugging – When issues arise, the lineage system helps identify the root cause by showing the complete path of data transformation and processing
  • Data quality management – By tracking transformations and dependencies, organizations can better maintain data quality and understand how data quality issues might propagate through their systems

Lineage capture is automated using several tools in SageMaker Unified Studio. To learn more, refer to Data lineage support matrix.

In the following sections, we show you how to configure your resources and implement the solution. For this post, we create the solution resources in the us-west-2 AWS Region using an AWS CloudFormation template.

Prerequisites

Before getting started, make sure you have the following:

Configure SageMaker Unified Studio with AWS CloudFormation

The vpc-analytics-lineage-sus.yaml stack creates a VPC, subnet, security group, IAM roles, NAT gateway, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, S3 buckets, SageMaker Unified Studio domain, and SageMaker Unified Studio project. To create the solution resources, complete the following steps:

  1. Launch the stack vpc-analytics-lineage-sus using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.

    Parameters Sample value
    DatazoneS3Bucket s3://datazone-{account_id}/
    DomainName dz-studio
    EnvironmentName sm-unifiedstudio
    PrivateSubnet1CIDR 10.192.20.0/24
    PrivateSubnet2CIDR 10.192.21.0/24
    PrivateSubnet3CIDR 10.192.22.0/24
    ProjectName sidproject
    PublicSubnet1CIDR 10.192.10.0/24
    PublicSubnet2CIDR 10.192.11.0/24
    PublicSubnet3CIDR 10.192.12.0/24
    UsersList analyst
    VpcCIDR 10.192.0.0/16

The stack creation process can take approximately 20 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

Next, we prepare source data, setup the AWS Glue ETL Job, Amazon EMR Serverless Spark Job and Amazon Redshift Job to generate the lineage and capture lineage from Amazon SageMaker Unified Studio

Prepare data

The following is example data from our CSV files:

attendance.csv

EmployeeID,Date,ShiftStart,ShiftEnd,Absent,OvertimeHours
E1000,2024-01-01,2024-01-01 08:00:00,2024-01-01 16:22:00,False,3
E1001,2024-01-08,2024-01-08 08:00:00,2024-01-08 16:38:00,False,2
E1002,2024-01-23,2024-01-23 08:00:00,2024-01-23 16:24:00,False,3
E1003,2024-01-09,2024-01-09 10:00:00,2024-01-09 18:31:00,False,0
E1004,2024-01-15,2024-01-15 09:00:00,2024-01-15 17:48:00,False,1

employees.csv

EmployeeID,Name,Department,Role,HireDate,Salary,PerformanceRating,Shift,Location
E1000,Employee_0,Quality Control,Operator,2021-08-08,33002.0,1,Night,Plant C
E1001,Employee_1,Maintenance,Supervisor,2015-12-31,69813.76,5,Evening,Plant B
E1002,Employee_2,Production,Technician,2015-06-18,46753.32,1,Evening,Plant A
E1003,Employee_3,Admin,Supervisor,2020-10-13,52853.4,5,Night,Plant A
E1004,Employee_4,Quality Control,Manager,2023-09-21,55645.27,5,Evening,Plant A

Upload the sample data from attendance.csv and employees.csv to the S3 bucket specified in the previous CloudFormation stack (s3://datazone-{account_id}/csv/).

Ingest employee data in Amazon Relational Database Dervice (Amazon RDS) for MySQL table

On the CloudFormation console, open the stack vpc-analytics-lineage-sus and collect the Amazon RDS for MySQL database endpoint to use in the following commands to create a default employeedb database.

  1. Connect to Amazon EC2 instance with mysql package installation
  2. Run the following command to connect to the database
    >MySQL -u admin -h database-1.cuqd06l5efvw.us-west-2.rds.amazonaws.com -p

  3. Run the following command to create an employee table
    Use employeedb;
    
    CREATE TABLE employee (
      EmployeeID longtext,
      Name longtext,
      Department longtext,
      Role longtext,
      HireDate longtext,
      Salary longtext,
      PerformanceRating longtext,
      Shift longtext,
      Location longtext
    );

  4. Running the following command to insert rows.
    INSERT INTO employee (EmployeeID, Name, Department, Role, HireDate, Salary, PerformanceRating, Shift, Location) VALUES ('E1000', 'Employee_0', 'Quality Control', 'Operator', '2021-08-08', 33002.00, 1, 'Night', 'Plant C'), ('E1001', 'Employee_1', 'Maintenance', 'Supervisor', '2015-12-31', 69813.76, 5, 'Evening', 'Plant B'), ('E1002', 'Employee_2', 'Production', 'Technician', '2015-06-18', 46753.32, 1, 'Evening', 'Plant A'), ('E1003', 'Employee_3', 'Admin', 'Supervisor', '2020-10-13', 52853.40, 5, 'Night', 'Plant A'), ('E1004', 'Employee_4', 'Quality Control', 'Manager', '2023-09-21', 55645.27, 5, 'Evening', 'Plant A');

Capture lineage from AWS Glue ETL job and notebook

To demonstrate the lineage, we set up an AWS Glue extract, transform, and load (ETL) job to read the employee data from an Amazon RDS for MySQL table and the employee attendance data from Amazon S3, and join both datasets. Finally, we write the data to Amazon S3 and create the attendance_with_emp1 table in the AWS Glue Data Catalog.

Create and configure AWS Glue job for lineage generation

Complete the following steps to create your AWS Glue ETL job:

  1. On the AWS Glue console, create a new ETL job with AWS Glue version 5.0.
  2. Enable Generate lineage events and provide the domain ID (retrieve from the CloudFormation template output for DataZoneDomainid; it will have the format dzd_xxxxxxxx)
  3. Use the following code snippet in the AWS Glue ETL job script. Provide the S3 bucket (bucketname-{account_id}) used in the preceding CloudFormation stack.
    from pyspark.sql import SparkSession
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    import sys
    import logging
    
    
    spark = SparkSession.builder.appName("lineageglue").enableHiveSupport().getOrCreate()
     
    connection_details = glueContext.extract_jdbc_conf(connection_name="connectionname")
    
    employee_df = spark.read.format("jdbc").option("url", "jdbc:MySQL://dbhost:3306/database_name").option("dbtable", "employee").option("user", connection_details['user']).option("password", connection_details['password']).load()
    
    s3_paths = {
    'absent_data': 's3://bucketname-{account_id}/csv/attendance.csv'
    }
    absent_df = spark.read.csv(s3_paths['absent_data'], header=True, inferSchema=True)
    
    joined_df = employee_df.join(absent_df, on="EmployeeID", how="inner")
    
    joined_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/attendanceparquet/").saveAsTable("gluedbname.tablename")

  4. Choose Run to start the job.
  5. On the Runs tab, confirm the job ran without failure.
  6. After the job has executed successfully, navigate to the SageMaker Unified Studio domain.
  7. Choose Project and under Overview, choose Data Sources.
  8. Select the Data Catalog source (accountid-AwsDataCatalog-glue_db_suffix-default-datasource).
  9. On the Actions dropdown menu, choose Edit.
  10. Under Connection, enable Import data lineage.
  11. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  12. Update the data source and choose Run to create an asset called attendance_with_emp1 in SageMaker Catalog.
  13. Navigate to Assets, choose the attendance_with_emp1 asset, and navigate to the LINEAGE section.

The following lineage diagram shows an AWS Glue job that integrates data from two sources: employee information stored in Amazon RDS for MySQL and employee absence records stored in Amazon S3. The AWS Glue job combines these datasets through a join operation, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog, making the unified data available for further analysis or machine learning purposes.

Create and configure AWS Glue notebook for lineage generation

Complete the following steps to create the AWS Glue notebook:

  1. On the AWS Glue console, choose Author using an interactive code notebook.
  2. Under Options, choose Start fresh and choose Create notebook.
  3. In the notebook, use the following code to generate lineage.

    In the following code, we add the required Spark configuration to generate lineage and then read CSV data from Amazon S3 and write in Parquet format to the Data Catalog table. The Spark configuration includes the following parameters:

    • spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener – Registers the OpenLineage listener to capture Spark job execution events and metadata for lineage tracking
    • spark.openlineage.transport.type=amazon_datazone_api – Specifies Amazon DataZone as the destination service where the lineage data will be sent and stored
    • spark.openlineage.transport.domainId=dzd_xxxxxxx – Defines the unique identifier of your Amazon DataZone domain where the lineage data will be associated
    • spark.glue.accountId={account_id} – Specifies the AWS account ID where the AWS Glue job is running for proper resource identification and access
    • spark.openlineage.facets.custom_environment_variables – Lists the specific environment variables to capture in the lineage data for context about the AWS and AWS Glue environment
    • spark.glue.JOB_NAME=lineagenotebook – Sets a unique identifier name for the AWS Glue job that will appear in lineage tracking and logs

    See the following code:

    %%configure —name project.spark -f
    {
    "—conf":"spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
    --conf spark.openlineage.transport.type=amazon_datazone_api \
    --conf spark.openlineage.transport.domainId=dzd_xxxxxxxx \
    --conf spark.glue.accountId={account_id} \
    --conf spark.openlineage.facets.custom_environment_variables=[AWS_DEFAULT_REGION;GLUE_VERSION;GLUE_COMMAND_CRITERIA;GLUE_PYTHON_VERSION;] \
    --conf spark.glue.JOB_NAME=lineagenotebook"
    }
    
    from pyspark.sql import SparkSession
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    import sys
    import logging
    
    
    spark = SparkSession.builder.appName("lineagegluenotebook").enableHiveSupport().getOrCreate()
    
    s3_paths = {
    'absent_data': 's3://datazone-{account_id}/csv/attendance.csv'
    }
    absent_df = spark.read.csv(s3_paths['absent_data'], header=True, inferSchema=True)
    
    absent_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/attendanceparquet2/").saveAsTable("gluedbname.tablename")

  4. After the notebook has executed successfully, navigate to the SageMaker Unified Studio domain.
  5. Choose Project and under Overview, choose Data Sources.
  6. Choose the Data Catalog source ({account_id}-AwsDataCatalog-glue_db_suffix-default-datasource).
  7. Choose Run to create the asset attendance_with_empnote in SageMaker Catalog.
  8. Navigate to Assets, choose the attendance_with_empnote asset, and navigate to the LINEAGE section.

The following lineage diagram shows an AWS Glue job that reads data from the employee absence records stored in Amazon S3. The AWS Glue job transform CSV data into Parquet format, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog.

Capture lineage from Amazon Redshift

To demonstrate the lineage, we are creating an employee table and an attendance table and join both datasets. Finally, we create a new table called employeewithabsent in Amazon Redshift. Complete the following steps to create and configure lineage for Amazon Redshift tables:

  1. In SageMaker Unified Studio, open your domain.
  2. Under Compute, choose Data warehouse.
  3. Open project.redshift and copy the endpoint name (redshift-serverless-workgroup-xxxxxxx).
  4. On the Amazon Redshift console, open the Query Editor v2, and connect to the Redshift Serverless workgroup with a secret. Use the AWS Secrets Manager option and choose the secret redshift-serverless-namespace-xxxxxxxx.
  5. Use the following code to create tables in Amazon Redshift and load data from Amazon S3 using the COPY command. Make sure the IAM role has GetObject permission on the S3 files attendance.csv and employees.csv.

    Create Redshift table absent

    CREATE TABLE public.absent (
        employeeid character varying(65535),
        date date,
        shiftstart timestamp without time zone ,
        shiftend timestamp without time zone,
        absent boolean,
        overtimehours integer
    );

    Load data into absent table.

    COPY absent
    FROM 's3://datazone-{account_id}/csv/attendance.csv' 
    IAM_ROLE 'arn:aws:iam::accountid:role/RedshiftAdmin'
    csv
    IGNOREHEADER 1;

    Create Redshift table employee

    CREATE TABLE public.employee (
        employeeid character varying(65535),
        name character varying(65535),
        department character varying(65535),
        role character varying(65535),
        hiredate date,
        salary double precision,
        performancerating integer,
        shift character varying(65535),
        location character varying(65535)
    );

    Load data into employee table.

    COPY employee
    FROM 's3://datazone-{account_id}/csv/employees.csv' 
    IAM_ROLE 'arn:aws:iam::account-id:role/RedshiftAdmin'
    csv
    IGNOREHEADER 1;

  6. After the tables are created and the data is loaded, perform the join between the tables and create a new table with a CTAS query:
    CREATE TABLE public.employeewithabsent AS
    SELECT 
      e.*,
      a.absent,
      a.overtimehours
    FROM public.employee e
    INNER JOIN public.absent a
    ON e.EmployeeID = a.EmployeeID;

  7. Navigate to the SageMaker Unified Studio domain.
  8. Choose Project and under Overview, choose Data Sources.
  9. Select the Amazon Redshift source (RedshiftServerless-default-redshift-datasource).
  10. On the Actions dropdown menu, choose Edit.
  11. Under Connection, Enable Import data lineage.
  12. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  13. Update the data source and choose Run to create an asset called employeewithabsent in SageMaker Catalog.
  14. Navigate to Assets, choose the employeewithabsent asset, and navigate to the LINEAGE section.

The following lineage diagram shows joining two redshift tables and creating a new redshift table and registers it as an asset in SageMaker Catalog.

Capture lineage from EMR Serverless job

To demonstrate the lineage, we read employee data from an RDS for MySQL table and an attendance dataset from Amazon Redshift, and join both datasets. Finally, we write the data to Amazon S3 and create the attendance_with_employee table in the Data Catalog. Complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. To create or manage EMR Serverless applications, you need the EMR Studio UI.
    1. If you already have an EMR Studio in the Region where you want to create an application, choose Manage applications to navigate to your EMR Studio, or select the EMR Studio that you want to use.
    2. If you don’t have an EMR Studio in the Region where you want to create an application, choose Get started and then choose Create and launch Studio. EMR Serverless creates an EMR Studio for you so you can create and manage applications.
  3. In the Create studio UI that opens in a new tab, enter the name, type, and release version for your application.
  4. Choose Create application.
  5. Create an EMR Spark serverless application with the following configuration:
    1. For Type, choose Spark.
    2. For Release version, choose emr-7.8.0.
    3. For Architecture, choose x86_64.
    4. For Application setup options, select Use custom settings.
    5. For Interactive endpoint, enable the endpoint for EMR Studio.
    6. For Application configuration, use the following configuration:
      [{
          "Classification": "iceberg-defaults",
          "Properties": {
              "iceberg.enabled": "true"
          }
      }]

  6. Choose Create and Start application.
  7. After application has started, submit the Spark application to generate lineage events. Copy the following script and upload it to the S3 bucket (s3://datazone-{account_id}/script/). Upload the MySQL-connector-java JAR file to the S3 bucket (s3://datazone-{account_id}/jars/) to read the data from MySQL.
    from pyspark.sql import SparkSession
    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    import sys
    import logging
    
    
    spark = SparkSession.builder.appName("lineageglue").enableHiveSupport().getOrCreate()
    
    employee_df = spark.read.format("jdbc").option("driver","com.MySQL.cj.jdbc.Driver").option("url", "jdbc:MySQL://dbhostname:3306/databasename").option("dbtable", "employee").option("user", "admin").option("password", "xxxxxxx").load()
    
    absent_df = spark.read.format("jdbc").option("url", "jdbc:redshift://redshiftserverlessendpoint:5439/dev").option("dbtable", "public.absent").option("user", "admin").option("password", "xxxxxxxxxx").load()
    
    joined_df = employee_df.join(absent_df, on="EmployeeID", how="inner")
    
    joined_df.write.mode("overwrite").format("parquet").option("path", "s3://datazone-{account_id}/emrparquetnew/").saveAsTable("gluedname.tablename")

  8. After you upload the script, use the following command to submit the Spark application. Change the following parameters according to your environment details:
    1. application-id: Provide the Spark application ID you generated.
    2. execution-role-arn: Provide the EMR execution role.
    3. entryPoint: Provide the Spark script S3 path.
    4. domainID: Provide the domain ID (from the CloudFormation template output for DataZoneDomainid: dzd_xxxxxxxx).
    5. accountID: Provide your AWS account ID.
      aws emr-serverless start-job-run --application-id 00frv81tsqe0ok0l --execution-role-arn arn:aws:iam::{account_id}:role/service-role/AmazonEMR-ExecutionRole-1717662744320 --name "Spark-Lineage" --job-driver '{
              "sparkSubmit": {
                  "entryPoint": "s3://datazone-{account_id}/script/emrspark2.py",
                  "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=2 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=/usr/share/aws/datazone-openlineage-spark/lib/DataZoneOpenLineageSpark-1.0.jar,s3://datazone-{account_id}/jars/MySQL-connector-java-8.0.20.jar --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener --conf spark.openlineage.transport.type=amazon_datazone_api --conf spark.openlineage.transport.domainId=dzd_xxxxxxxx --conf spark.glue.accountId={account_id}"
              }
          }'

  9. After the job has executed successfully, navigate to the SageMaker Unified Studio domain.
  10. Choose Project and under Overview, choose Data Sources.
  11. Select the Data Catalog source ({account_id}-AwsDataCatalog-glue_db_xxxxxxxxxx-default-datasource).
  12. On the Actions dropdown menu, choose Edit.
  13. Under Connection, enable Import data lineage.
  14. In the Data Selection section, under Table Selection Criteria, provide a table name or use * to generate lineage.
  15. Update the data source and choose Run to create an asset called attendancewithempnew in SageMaker Catalog.
  16. Navigate to Assets, choose the attendancewithempnew asset, and navigate to the LINEAGE section.

The following lineage diagram shows an AWS Glue job that integrates employee information stored in Amazon RDS for MySQL and employee absence records stored in Amazon Redshift. The AWS Glue job combines these datasets through a join operation, then creates a table in the Data Catalog and registers it as an asset in SageMaker Catalog.

Clean up

To clean up your resources, complete the following steps:

  1. On the AWS Glue console, delete the AWS Glue job.
  2. On the Amazon EMR console, delete the EMR Serverless Spark application and EMR Studio.
  3. On the AWS CloudFormation console, delete the CloudFormation stack vpc-analytics-lineage-sus.

Conclusion

In this post, we showed how data lineage in SageMaker Catalog helps you track and understand the complete lifecycle of your data across various AWS analytics services. This comprehensive tracking system provides visibility into how data flows through different processing stages, transformations, and analytical workflows, making it an essential tool for data governance, compliance, and operational efficiency.

Try out these lineage visualization methods for your own use cases, and share your questions and feedback in the comments section.


About the Authors

Shubham Purwar

Shubham Purwar

Shubham is an AWS Analytics Specialist Solution Architect. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on the AWS platform. With deep expertise in AWS analytics services, he collaborates with customers to uncover their distinct business requirements and create customized solutions that deliver actionable insights and drive business growth. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar

Nitin Kumar

Nitin is a Cloud Engineer (ETL) at Amazon Web Services, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Prashanthi Chinthala

Prashanthi Chinthala

Prashanthi is a Cloud Engineer (DIST) at AWS. She helps customers overcome EMR challenges and develop scalable data processing and analytics pipelines on AWS.

Optimize Amazon EMR runtime for Apache Spark with EMR S3A

Post Syndicated from Giovanni Matteo Fumarola original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-runtime-for-apache-spark-with-emr-s3a/

With the Amazon EMR 7.10 runtime, Amazon EMR has introduced EMR S3A, an improved implementation of the open source S3A file system connector. This enhanced connector is now automatically set as the default S3 file system connector for Amazon EMR deployment options, including Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts, maintaining complete API compatibility with open source Apache Spark.

In the Amazon EMR 7.10 runtime for Apache Spark, the EMR S3A connector exhibits performance comparable to EMRFS for read workloads, as demonstrated by TPC-DS query benchmark. The connector’s most significant performance gains are evident in write operations, with a 7% improvement in static partition overwrites and a 215% improvement for dynamic partition overwrites when compared to EMRFS. In this post, we showcase the enhanced read and write performance advantages of using Amazon EMR 7.10.0 runtime for Apache Spark with EMR S3A as compared to EMRFS and the open source S3A file system connector.

Read workload performance comparison

To evaluate the read performance, we used a test environment based on Amazon EMR runtime version 7.10.0 running Spark 3.5.5 and Hadoop 3.4.1. Our testing infrastructure featured an Amazon Elastic Compute Cloud (Amazon EC2) cluster comprised of nine r5d.4xlarge instances. The primary node has 16 vCPU and 128 GB memory, and the eight core nodes have a total of 128 vCPU and 1024 GB memory.

The performance evaluation was conducted using a comprehensive testing methodology designed to provide accurate and meaningful results. For the source data, we chose the 3 TB scale factor, which contains 17.7 billion records, approximately 924 GB of compressed data partitioned in Parquet file format. The setup instructions and technical details can be found in the GitHub repository. We used Spark’s in-memory data catalog to store metadata for TPC-DS databases and tables.

To produce a fair and accurate comparison between EMR S3A vs. EMRFS and open source S3A implementations, we implemented a three-phase testing approach:

  • Phase 1: Baseline performance:
    • Established a baseline using default Amazon EMR configuration with EMR’s S3A connector
    • Created a reference point for subsequent comparisons
  • Phase 2: EMRFS analysis:
    • Maintained the default file system as EMRFS
    • Preserved other configuration settings
  • Phase 3: Open source S3A testing:
    • Modified only the hadoop-aws.jar file by replacing it with the open source Hadoop S3A 3.4.1 version
    • Maintained identical configurations across other components

This controlled testing environment was crucial for our evaluation for the following reasons:

  • We could isolate the performance impact specifically to the S3A connector implementation
  • It removed potential variables that could skew the results
  • It provided accurate measurements of performance improvements between Amazon’s S3A implementation and the open source alternative

Test execution and results

Throughout the testing process, we maintained consistency in test conditions and configurations, making sure any observed performance differences could be directly attributed to the S3A connector implementation variations. A total of 104 SparkSQL queries were run in 10 iterations sequentially, and an average of each query’s runtime in these 10 iterations was used for comparison. The average of the 10 iterations’ runtime on the Amazon EMR 7.10 runtime for Apache Spark with EMR S3A was 1116.87 seconds, which is 1.08 times faster than open source S3A and comparable with EMRFS. The following figure illustrates the total runtime in seconds.

The following table summarizes the metrics.

Metric OSS S3A EMRFS EMR S3A
Average runtime in seconds 1208.26 1129.64 1116.87
Geometric mean over queries in seconds 7.63 7.09 6.99
Total cost * $6.53 $6.40 $6.15

*Detailed cost estimates are discussed later in this post.

The following chart demonstrates the per-query performance improvement of EMR S3A relative to open source S3A on the Amazon EMR 7.10 runtime for Apache Spark. The extent of the speedup varies from one query to another, with the fastest up to 1.51 times faster for q3, with Amazon EMR S3A outperforming open source S3A. The horizontal axis arranges the TPC-DS 3TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup as a ratio.

Read cost comparison

Our benchmark outputs the total runtime and geometric mean figures to measure the Spark runtime performance. The cost metric can provide us with additional insights. Cost estimates are computed using the following formulas. They factor in Amazon EC2, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR costs, but don’t include Amazon Simple Storage Service (Amazon S3) GET and PUT costs.

  • Amazon EC2 cost (include SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • r5d.4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • r5d.4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

The following table summarizes these costs.

Metric EMRFS EMR S3A OSS S3A
Runtime in hours 0.5 0.48 0.51
Number of EC2 instances 9 9 9
Amazon EBS size 0 gb 0 gb 0 gb
Amazon EC2 cost $5.18 $4.98 $5.29
Amazon EBS cost $0.00 $0.00 $0.00
Amazon EMR cost $1.22 $1.17 $1.24
Total cost $6.40 $6.15 $6.53
Cost savings Baseline EMR S3A is 1.04 times better than EMRFS EMR S3A is 1.06 times better than OSS S3A

Write workload performance comparison

We conducted benchmark tests to assess the write performance of the Amazon EMR 7.10 runtime for Apache Spark.

Static table/partition overwrite

We evaluated the static table/partition overwrite write performance of the different file system by executing the following INSERT OVERWRITE Spark SQL query. The SELECT * FROM range(...) clause generated data at execution time. This produced approximately 15 GB of data across exactly 100 Parquet files in Amazon S3.

SET rows=4e9; -- 4 Billion
SET partitions=100;
INSERT OVERWRITE DIRECTORY 's3://${bucket}/perf-test/${trial_id}'
USING PARQUET SELECT * FROM range(0, ${rows}, 1, ${partitions});

The test environment was configured as follows:

  • EMR cluster with emr-7.10.0 release label
  • Single m5d.2xlarge instance (primary group)
  • Eight m5d.2xlarge instances (core group)
  • S3 bucket in the same AWS Region as the EMR cluster
  • The trial_id property used a UUID generator to avoid conflict between test runs

Results

After running 10 trials for each file system, we captured and summarized query runtimes in the following chart. Whereas EMR S3A averaged only 26.4 seconds, the EMRFS and open source S3A averaged 28.4 seconds and 31.4 seconds—a 1.07 times and 1.19 times improvement, respectively.

Dynamic partition overwrite

We also evaluated the write performance by executing the following INSERT OVERWRITE dynamic partition Spark SQL query, which joins TPC-DS 3TB partitioned Parquet data of the table web_sales and date_dim tables, which inserts approximately 2,100 partitions, where each partition contains one Parquet file with a combined size of approximately 31.2 GB in Amazon S3.

SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;
INSERT OVERWRITE TABLE <TABLE_NAME> PARTITION(wsdt_year,wsdt_month, wsdt_day) 
SELECT ws_order_number,ws_quantity,ws_list_price,ws_sales_price,
ws_net_paid_inc_ship_tax,ws_net_profit,dt.d_year as wsdt_year,dt.d_moy 
as wsdt_month,dt.d_dom as wsdt_day FROM web_sales, date_dim dt 
WHERE ws_sold_date_sk = d_date_sk;

The test environment was configured as follows:

  • EMR cluster with emr-7.10.0 release label
  • Single r5d.4xlarge instance (master group)
  • Five r5d.4xlarge instances (core group)
  • Approximately 2,100 partitions with one Parquet file each
  • Combined size of approximately 31.2 GB in Amazon S3

Results

After running 10 trials for each file system, we captured and summarized query runtimes in the following chart. Whereas EMR S3A averaged only 90.9 seconds, the EMRFS and open source S3A averaged 286.4 seconds and 1,438.5 seconds—a 3.15 times and 15.82 times improvement, respectively.

Summary

Amazon EMR consistently enhances its Apache Spark runtime and S3A connector, delivering continuous performance improvements that help big data customers execute analytics workloads more cost-effectively. Beyond performance gains, the strategic shift to S3A introduces critical advantages, including enhanced standardization, improved cross-platform portability, and robust community-driven support—all while maintaining or surpassing the performance benchmarks established by the previous EMRFS implementation.

We recommend that you stay up-to-date with the latest Amazon EMR release to take advantage of the latest performance and feature benefits. Subscribe to the AWS Big Data Blog’s RSS feed to learn more about the Amazon EMR runtime for Apache Spark, configuration best practices, and tuning advice.


About the authors

Giovanni Matteo Fumarola

Giovanni Matteo Fumarola

Giovanni is the Senior Manager for the Amazon EMR Spark and Iceberg group. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Sushil Kumar Shivashankar

Sushil Kumar Shivashankar

Sushil is the Engineering Manager for the Amazon EMR Hadoop and Flink team at Amazon Web Services. With a focus on big data analytics since 2014, he leads development, optimizations, and growth strategies for Hadoop and Flink business in Amazon EMR.

Narayanan Venkateswaran

Narayanan Venkateswaran

Narayanan is a Senior Software Development Engineer in the Amazon EMR group. He works on developing Hadoop components in Amazon EMR. He has over 20 years of work experience in the industry across several companies, including Sun Microsystems, Microsoft, Amazon, and Oracle. Narayanan also holds a PhD in databases with a focus on horizontal scalability in relational stores.

Syed Shameerur Rahman

Syed Shameerur Rahman

Syed is a Software Development Engineer at Amazon EMR. He is interested in highly scalable, distributed computing. He is an active contributor of open source projects like Apache Hive, Apache Tez, Apache ORC, and Apache Hadoop, and has contributed important features and optimizations. During his free time, he enjoys exploring new places and trying new foods.

Rajarshi Sarkar

Rajarshi Sarkar

Rajarshi is a Software Development Engineer at Amazon EMR. He works on cutting-edge features of Amazon EMR and is also involved in open source projects such as Apache Hive, Iceberg, Trino, and Hadoop. In his spare time, he likes to travel, watch movies, and hang out with friends.

Unlock the power of Apache Iceberg v3 deletion vectors on Amazon EMR

Post Syndicated from Arun Shanmugam original https://aws.amazon.com/blogs/big-data/unlock-the-power-of-apache-iceberg-v3-deletion-vectors-on-amazon-emr/

As modern data architectures expand, Apache Iceberg has become a widely popular open table format, providing ACID transactions, time travel, and schema evolution. In table format v2, Iceberg introduced merge-on-read, improving delete and update handling through positional delete files. These files improve write performance but can slow down reads when not compacted, since Iceberg must merge them during query execution to return the latest snapshot. Iceberg v3 enhances merge performance during reads by replacing positional delete files with deletion vectors for handling row-level deletes in Merge-on-Read (MoR) tables. This change deprecates the use of positional delete files in v3, which marked specific row positions as deleted, in favor of the more efficient deletion vectors.

In this post, we compare and evaluate the performance of the new binary deletion vectors in Iceberg v3 with respect to traditional position delete files of Iceberg v2 using Amazon EMR version 7.10.0 with Apache Spark 3.5.5. We provide insights into the practical impacts of these advanced row-level delete mechanisms on data management efficiency and performance.

Understanding binary deletion vectors and Puffin files

Binary deletion vectors stored in Puffin files use compressed bitmaps to efficiently represent which rows have been deleted within a data file. In contrast, previous Iceberg versions (v2) relied on positional delete files—Parquet files that enumerated rows to delete by file and position. This older approach resulted in many small delete files, which placed a heavy burden on query engines due to numerous file reads and costly in-memory conversions. Puffin files reduce this overhead by compactly encoding deletions, improving query performance and resource utilization.

Iceberg v3 improves this in the following aspects:

  • Reduced I/O – Fewer small delete files lower metadata overhead by introducing deletion vectors—compressed bitmaps that efficiently represent deleted rows. These vectors are stored persistently in Puffin files, a compact binary format optimized for low-latency access.
  • Query performance – Bitmap-based deletion vectors enable faster scan filtering by allowing multiple vectors to be stored in a single Puffin file. This reduces metadata and file count overhead while preserving file-level granularity for efficient reads. The design supports continuous merging of deletion vectors, promoting ongoing compaction that maintains stable query performance and reduces fragmentation over time. It removes the trade-off between partition-level and file-level delete granularity seen in v2, enabling consistently fast reads even in heavy-update scenarios.
  • Storage efficiency – Iceberg v3 uses a compressed binary format instead of verbose Parquet positioning. Engines maintain a single deletion vector per data file at write time, enabling better compaction and consistent query performance.

Solution overview

To explore the performance characteristics of delete operations in Iceberg v2 and v3, we use PySpark to run our comparison tests focusing on delete operation runtime and delete file size. This implementation helps us effectively benchmark and compare the deletion mechanisms between Iceberg v2’s position-delete files using Parquet and v3’s newer Puffin-based deletion vectors.

Our solution demonstrates how to configure Spark with the AWS Glue Data Catalog and Iceberg, create tables, and run delete operations programmatically. We first create Iceberg tables with format versions 2 and 3, insert 10,000 rows, then perform delete operations on a range of record IDs. We also perform table compaction and then measure delete operation runtime and size and count of associated delete files.

In Iceberg v3, deleting rows introduces binary deletion vectors stored in Puffin files (compact binary sidecar files). These allow more efficient query planning and faster read performance by consolidating deletes and avoiding large numbers of small files.

For this test, the Spark job was submitted by SSH’ing into the EMR cluster and using spark-submit directly from the shell, with the required Iceberg JAR file being referenced directly from the Amazon Simple Storage Service (Amazon S3) bucket in the submission command. When running the job, make sure you provide your S3 bucket name. See the following code:

spark-submit --jars s3://< S3-BUCKET-NAME >/iceberg/jars/iceberg-spark-runtime-3.5_2.12-1.9.2.jar v3_deletion_vector_test.py

Prerequisites

To follow along with this post, you must have the following prerequisites:

  • Amazon EMR on Amazon EC2 with version 7.10.0 integrated with the Glue Data Catalog, which includes Spark 3.5.5.
  • The Iceberg 1.9.2 JAR file from the official Iceberg documentation, which includes important deletion vector improvements such as v2 to v3 rewrites and dangling deletion vector detection. Optionally, you can use the default Iceberg 1.8.1-amzn-0 bundled with Amazon EMR 7.10 if these Iceberg 1.9.x improvements are not required.
  • An S3 bucket to store Iceberg data.
  • An AWS Identity and Access management (IAM) role for Amazon EMR configured with the necessary permissions.

The upcoming Amazon EMR 7.11 will ship with Iceberg 1.9.1-amzn-1, which includes deletion vector improvements such as v2 to v3 rewrites and dangling deletion vector detection. This means you no longer need to manually download or upload the Iceberg JAR file, because it will be included and managed natively by Amazon EMR.

Code walkthrough

The following PySpark script demonstrates how to create, write, compact, and delete records in Iceberg tables with two different format versions (v2 and v3) using the Glue Data Catalog as the metastore. The main goal is to compare both write and read performance, along with storage characteristics (delete file format and size) between Iceberg format versions 2 and 3.

The code performs the following functions:

  • Creates a SparkSession configured to use Iceberg with Glue Data Catalog integration.
  • Creates a synthetic dataset simulating user records:
    • Uses a fixed random seed (42) to provide consistent data generation
    • Creates identical datasets for both v2 and v3 tables for fair comparison
  • Defines the function test_read_performance(table_name) to perform the following actions:
    • Measure full table scan performance
    • Measure filtered read performance (with WHERE clause)
    • Track record counts for both operations
  • Defines the function test_iceberg_table(version, test_df) to perform the following actions:
    • Create or use an Iceberg table for the specified format version
    • Append data to the Iceberg table
    • Trigger Iceberg’s data compaction using a system procedure
    • Delete rows with IDs between 1000–1099
    • Collect statistics about inserted data files and delete-related files
    • Measure and record read performance metrics
    • Track operation timing for inserts, deletes, and reads
  • Defines a function to print a comprehensive comparative report including the following information:
    • Delete operation performance
    • Read performance (both full table and filtered)
    • Delete file characteristics (formats, counts, sizes)
    • Performance improvements as percentages
    • Storage efficiency metrics
  • Orchestrate the main execution flow:
    • Create a single dataset to ensure identical data for both versions
    • Clean up existing tables for fresh testing
    • Run tests for Iceberg format version 2 and version 3
    • Output a detailed comparison report
    • Handle exceptions and shut down the Spark session

See the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F
import time
import random
import logging
from pyspark.sql.utils import AnalysisException
# Logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
# Constants
ROWS_COUNT = 10000
DELETE_RANGE_START = 1000
DELETE_RANGE_END = 1099
SAMPLE_NAMES = ["Alice", "Bob", "Charlie", "Diana",
                "Eve", "Frank", "Grace", "Henry", "Ivy", "Jack"]
# Spark Session
spark = (
    SparkSession.builder
    .appName("IcebergWithGlueCatalog")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://<S3-BUCKET-NAME>/blog/glue/")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)
spark.sql("CREATE DATABASE IF NOT EXISTS glue_catalog.blog")
def create_dataset(num_rows=ROWS_COUNT):
    # Set a fixed seed for reproducibility
    random.seed(42)
    
    data = [(i,
             random.choice(SAMPLE_NAMES) + str(i),
             random.randint(18, 80))
            for i in range(1, num_rows + 1)]
    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    df = df.withColumn("created_at", F.current_timestamp())
    return df
def test_read_performance(table_name):
    """Test read performance of the table"""
    start_time = time.time()
    count = spark.sql(f"SELECT COUNT(*) FROM glue_catalog.blog.{table_name}").collect()[0][0]
    read_time = time.time() - start_time
    
    # Test filtered read performance
    start_time = time.time()
    filtered_count = spark.sql(f"""
        SELECT COUNT(*) 
        FROM glue_catalog.blog.{table_name} 
        WHERE age > 30
    """).collect()[0][0]
    filtered_read_time = time.time() - start_time
    
    return read_time, filtered_read_time, count, filtered_count
def test_iceberg_table(version, test_df):
    try:
        table_name = f"iceberg_table_v{version}"
        logger.info(f"\n=== TESTING ICEBERG V{version} ===")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS glue_catalog.blog.{table_name} (
                id int,
                name string,
                age int,
                created_at timestamp
            ) USING iceberg
            TBLPROPERTIES (
                'format-version'='{version}',
                'write.delete.mode'='merge-on-read'
            )
        """)
        start_time = time.time()
        test_df.writeTo(f"glue_catalog.blog.{table_name}").append()
        insert_time = time.time() - start_time
        logger.info("Compaction...")
        spark.sql(
            f"CALL glue_catalog.system.rewrite_data_files('glue_catalog.blog.{table_name}')")
        start_time = time.time()
        spark.sql(f"""
            DELETE FROM glue_catalog.blog.{table_name}
            WHERE id BETWEEN {DELETE_RANGE_START} AND {DELETE_RANGE_END}
        """)
        delete_time = time.time() - start_time
        files_df = spark.sql(
            f"SELECT COUNT(*) as data_files FROM glue_catalog.blog.{table_name}.files")
        delete_files_df = spark.sql(f"""
            SELECT COUNT(*) as delete_files,
                   file_format,
                   SUM(file_size_in_bytes) as total_size
            FROM glue_catalog.blog.{table_name}.delete_files
            GROUP BY file_format
        """)
        data_files = files_df.collect()[0]['data_files']
        delete_stats = delete_files_df.collect()
        # Add read performance testing
        logger.info("\nTesting read performance...")
        read_time, filtered_read_time, total_count, filtered_count = test_read_performance(table_name)
        
        logger.info(f"Insert time: {insert_time:.3f}s")
        logger.info(f"Delete time: {delete_time:.3f}s")
        logger.info(f"Full table read time: {read_time:.3f}s")
        logger.info(f"Filtered read time: {filtered_read_time:.3f}s")
        logger.info(f"Data files: {data_files}")
        logger.info(f"Total records: {total_count}")
        logger.info(f"Filtered records: {filtered_count}")
        if len(delete_stats) > 0:
            stats = delete_stats[0]
            logger.info(f"Delete files: {stats.delete_files}")
            logger.info(f"Delete format: {stats.file_format}")
            logger.info(f"Delete files size: {stats.total_size} bytes")
            return delete_time, stats.total_size, stats.file_format, read_time, filtered_read_time
        else:
            logger.info("No delete files found")
            return delete_time, 0, "N/A", read_time, filtered_read_time
    except AnalysisException as e:
        logger.error(f"SQL Error: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        raise
def print_comparison_results(v2_results, v3_results):
    v2_delete_time, v2_size, v2_format, v2_read_time, v2_filtered_read_time = v2_results
    v3_delete_time, v3_size, v3_format, v3_read_time, v3_filtered_read_time = v3_results
    logger.info("\n=== PERFORMANCE COMPARISON ===")
    logger.info(f"v2 delete time: {v2_delete_time:.3f}s")
    logger.info(f"v3 delete time: {v3_delete_time:.3f}s")
    if v2_delete_time > 0:
        improvement = ((v2_delete_time - v3_delete_time) / v2_delete_time) * 100
        logger.info(f"v3 Delete performance improvement: {improvement:.1f}%")
    logger.info("\n=== READ PERFORMANCE COMPARISON ===")
    logger.info(f"v2 full table read time: {v2_read_time:.3f}s")
    logger.info(f"v3 full table read time: {v3_read_time:.3f}s")
    logger.info(f"v2 filtered read time: {v2_filtered_read_time:.3f}s")
    logger.info(f"v3 filtered read time: {v3_filtered_read_time:.3f}s")
    
    if v2_read_time > 0:
        read_improvement = ((v2_read_time - v3_read_time) / v2_read_time) * 100
        logger.info(f"v3 Read performance improvement: {read_improvement:.1f}%")
    
    if v2_filtered_read_time > 0:
        filtered_improvement = ((v2_filtered_read_time - v3_filtered_read_time) / v2_filtered_read_time) * 100
        logger.info(f"v3 Filtered read performance improvement: {filtered_improvement:.1f}%")
    logger.info("\n=== DELETE FILE COMPARISON ===")
    logger.info(f"v2 delete format: {v2_format}")
    logger.info(f"v2 delete size: {v2_size} bytes")
    logger.info(f"v3 delete format: {v3_format}")
    logger.info(f"v3 delete size: {v3_size} bytes")
    if v2_size > 0:
        size_reduction = ((v2_size - v3_size) / v2_size) * 100
        logger.info(f"v3 size reduction: {size_reduction:.1f}%")
# Main
try:
    # Create dataset once and reuse for both versions
    test_dataset = create_dataset()
    
    # Drop existing tables if they exist
    spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v2")
    spark.sql("DROP TABLE IF EXISTS glue_catalog.blog.iceberg_table_v3")
    
    # Test both versions with the same dataset
    v2_results = test_iceberg_table(2, test_dataset)
    v3_results = test_iceberg_table(3, test_dataset)
    print_comparison_results(v2_results, v3_results)
finally:
    spark.stop()

Results summary

The output generated by the code includes the results summary section that shows several key comparisons, as shown in the following screenshot. For delete operations, Iceberg v3 uses the Puffin file format compared to Parquet in v2, resulting in significant improvements. The delete operation time decreased from 3.126 seconds in v2 to 1.407 seconds in v3, achieving a 55.0% performance improvement. Additionally, the delete file size was reduced from 1801 bytes using Parquet in v2 to 475 bytes using Puffin in v3, representing a 73.6% reduction in storage overhead. Read operations also saw notable improvements, with full table reads 28.5% faster and filtered reads 23% faster in v3. These improvements demonstrate the efficiency gains from v3’s implementation of binary deletion vectors through the Puffin format.

style=

The actual measured performance and storage improvements depend on workload and environment and might differ from the preceding example.

This following screenshot from the S3 bucket demonstrates a Puffin delete file stored alongside data files.

style=

Clean up

After you finish your tests, it’s important to clean up your environment to avoid unnecessary costs:

  1. Drop the test tables you created to remove associated data from your S3 bucket and prevent ongoing storage charges.
  2. Delete any temporary data left in the S3 bucket used for Iceberg data.
  3. Delete the EMR cluster to stop billing for running compute resources.

Cleaning up resources promptly helps maintain cost-efficiency and resource hygiene in your AWS environment.

Considerations

Iceberg features are introduced through a phased process: first in the specification, then in the core library, and finally in engine implementations. Deletion vector support is currently available in the specification and core library, with Spark being the only supported engine. We validated this capability on Amazon EMR 7.10 with Spark 3.5.5.

Conclusion

Iceberg v3 introduces a significant advancement in managing row-level deletes for merge-on-read operations through binary deletion vectors stored in compact Puffin files. Our performance tests, conducted with Iceberg 1.9.2 on Amazon EMR 7.10.0 and EMR Spark 3.5.5, show clear improvements in both delete operation speed and read performance, along with a considerable reduction in delete file storage compared to Iceberg v2’s positional delete Parquet files. For more information about deletion vectors, refer to Iceberg v3 deletion vectors.


About the authors

Arun Shanmugam

Arun Shanmugam

Arun is a Senior Analytics Solutions Architect at AWS, with a focus on building modern data architecture. He has been successfully delivering scalable data analytics solutions for customers across diverse industries. Outside of work, Arun is an avid outdoor enthusiast who actively engages in CrossFit, road biking, and cricket.

Suthan Phillips

Suthan Phillips

Suthan is a Senior Analytics Architect at AWS, where he helps customers design and optimize scalable, high-performance data solutions that drive business insights. He combines architectural guidance on system design and scalability with best practices to provide efficient, secure implementation across data processing and experience layers. Outside of work, Suthan enjoys swimming, hiking, and exploring the Pacific Northwest.

Kinshuk Paharae

Kinshuk Paharae

Kinshuk is head of product for data processing, leading product teams for AWS Glue, Amazon EMR, and Amazon Athena. He has been with AWS for over 5 years.

Linda OConnor

Linda OConnor

Linda is a Seasoned Go-To-Market Leader with close to three decades of experience driving growth strategies in the data and analytics space. At AWS, she currently leads pan analytics initiatives including lakehouse architectures, helping customers transform their existing landscapes through non-disruptive innovation. She previously served as Global Vice President at a German software company for 25 years, where she spearheaded Data Warehousing and Big Data portfolios, orchestrating successful product launches and driving global market expansion.

Automate and orchestrate Amazon EMR jobs using AWS Step Functions and Amazon EventBridge

Post Syndicated from Senthil Kamala Rathinam original https://aws.amazon.com/blogs/big-data/automate-and-orchestrate-amazon-emr-jobs-using-aws-step-functions-and-amazon-eventbridge/

Many enterprises are adopting Apache Spark for scalable data processing tasks such as extract, transform, and load (ETL), batch analytics, and data enrichment. As data pipelines evolve, the need for flexible and cost-efficient execution environments that support automation, governance, and performance at scale also evolve in parallel. Amazon EMR provides a powerful environment to run Spark workloads, and depending on workload characteristics and compliance requirements, teams can choose between fully managed options like Amazon EMR Serverless or more customizable configurations using Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2).

In use cases where infrastructure control, data locality, or strict security postures are essential, such as in financial services, healthcare, or government, running transient EMR on EC2 clusters becomes a preferred choice. However, orchestrating the full lifecycle of these clusters, from provisioning to job submission and eventual teardown, can introduce operational overhead and risk if done manually.

To streamline this process, the AWS Cloud offers built-in orchestration capabilities using AWS Step Functions and Amazon EventBridge. Together, these services help you automate and schedule the entire EMR job lifecycle, reducing manual intervention while optimizing cost and compliance. Step Functions provides the workflow logic to manage cluster creation, Spark job execution, and cluster termination, and EventBridge schedules these workflows based on business or operational needs.

In this post, we discuss how to build a fully automated, scheduled Spark processing pipeline using Amazon EMR on EC2, orchestrated with Step Functions and triggered by EventBridge. We walk through how to deploy this solution using AWS CloudFormation, processes COVID-19 public dataset data in Amazon Simple Storage Service (Amazon S3), and store the aggregated results in Amazon S3. This architecture is ideal for periodic or scheduled batch processing scenarios where infrastructure control, auditability, and cost-efficiency are critical.

Solution overview

This solution uses the publicly available COVID-19 dataset to illustrate how to build a modular, scheduled architecture for scalable and cost-efficient batch processing for time-bound data workloads.The solution follows these steps:

  1. Raw COVID-19 data in CSV format is stored in an S3 input bucket.
  2. A scheduled rule in EventBridge triggers a Step Functions workflow.
  3. The Step Functions workflow provisions a transient Amazon EMR cluster using EC2 instances.
  4. A PySpark job is submitted to the cluster to calculate COVID-19 hospital utilization data to compute monthly state-level averages of inpatient and ICU bed utilization, and COVID-19 patient percentages.
  5. The processed results are written back to an S3 output bucket.
  6. After successful job completion, the EMR cluster is automatically deleted.
  7. Logs are persisted to Amazon S3 for observability and troubleshooting.

By automating this workflow, you alleviate the need to manually manage EMR clusters while gaining cost-efficiency by running compute only when needed. This architecture is ideal for periodic Spark jobs such as ETL pipelines, regulatory reporting, and batch analytics, especially when control, compliance, and customization are required.The following diagram illustrates the architecture for this use case.

The infrastructure is deployed using AWS CloudFormation to provide consistency and repeatability. AWS Identity and Access Management (IAM) roles grant least‑privilege access to Step Functions, Amazon EMR, EC2 instances, and S3 buckets, and optional AWS Key Management Service (AWS KMS) encryption can secure data at rest in Amazon S3 and Amazon CloudWatch Logs. By combining a scheduled trigger, stateful orchestration, and centralized logging, this solution delivers a fully automated, cost‑optimized, and secure way to run transient Spark workloads in production.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Set up resources with AWS CloudFormation

To provision the required resources using a single CloudFormation template, complete the following steps:

  1. Sign in to the AWS Management Console as an admin user.
  2. Clone the sample repository to your local machine or AWS CloudShell and navigate into the project directory.
    git clone https://github.com/aws-samples/sample-emr-transient-cluster-step-functions-eventbridge.git
    cd sample-emr-transient-cluster-step-functions-eventbridge

  3. Set an environment variable for the AWS Region where you plan to deploy the resources. Replace the placeholder with your Region code, for example, us-east-1.
    export AWS_REGION=<YOUR AWS REGION>

  4. Deploy the stack using the following command. Update the stack name if needed. In this example, the stack is created with the name covid19-analysis.
    aws cloudformation deploy \
    --template-file emr_transient_cluster_step_functions_eventbridge.yaml \
    --stack-name covid19-analysis \
    --capabilities CAPABILITY_IAM \
    --region $AWS_REGION 

You can monitor the stack creation progress on the AWS CloudFormation console on the Events tab. The deployment typically completes in under 5 minutes.

After the stack is successfully created, go to the Outputs tab on the AWS CloudFormation console and note the following values for use in later steps:

  • InputBucketName
  • OutputBucketName
  • LogBucketName

Set up the COVID-19 dataset

With your infrastructure in place, complete the following steps to set up the input data:

  1. Download the COVID-19 data CSV file from HealthData.gov to your local machine.
  2. Rename the downloaded file to covid19-dataset.csv.
  3. Upload the renamed file to your S3 input bucket under the raw/ folder path.

Set up the PySpark Script

Complete the following steps to set up the PySpark script:

  1. Open AWS CloudShell from the console.
  2. Confirm that you are working inside the sample-emr-transient-cluster-step-functions-eventbridge directory before running the next command.
  3. Copy the PySpark script needed for this walkthrough into your input bucket:
    aws s3 cp covid19_processor.py s3://<InputBucketName>/scripts/

This script processes COVID-19 hospital utilization data stored as CSV files in your S3 input bucket. When running the job, provide the following command-line arguments:

  • --input – The S3 path to the input CSV files
  • --output – The S3 path to store the processed results

The script reads the raw dataset, standardizes various date formats, and filters out records with invalid or missing dates. It then extracts key utilization metrics such as inpatient bed usage, ICU bed usage, and the percentage of beds occupied by COVID-19 patients and calculates monthly averages grouped by state. The aggregated output is saved as timestamped CSV files in the specified S3 location.

This example demonstrates how you can use PySpark to efficiently clean, transform, and analyze large-scale healthcare data to gain actionable insights on hospital capacity trends during the pandemic.

Configure a schedule in EventBridge

The Step Functions state machine is by default scheduled to run on December 31, 2025, as a one-time execution. You can update the schedule for recurring or one-time execution as needed. Complete the following steps:

  1. On the EventBridge console, choose Schedules under Scheduler in the navigation pane.
  2. Select the schedule named <StackName>-covid19-analysis and choose Edit.
  3. Set your preferred schedule pattern.
    1. If you want to run the schedule one time, select One-time schedule for Occurrence and enter a date and time.
    2. If you want to run this on a recurring basis, select Recurring schedule. Specify the schedule type as either Cron-based schedule or Rate-based schedule as needed.
  4. Choose Next twice and choose Save schedule.

Start the workflow in Step Functions

Based on your EventBridge schedule, the Step Functions workflow will run automatically. For this walkthrough, complete the following steps to trigger it manually:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the state machine that begins with Covid19AnalysisStateMachine-*.
  3. Choose Start execution.
  4. In the Input section, provide the following JSON (provide the log bucket and output bucket names with the appropriate values captured earlier):
    {
      "LogUri": "s3://<LogBucketName>/logs/",
      "OutputS3Location": "s3://<OutputBucketName>/processed/"
    }

  5. Choose Start execution to initiate the workflow.

Monitor the EMR job and workflow execution

After you start the workflow, you can track both the Step Functions state transitions and the EMR job progress in real time on the console.

Monitor the Step Functions state machine

Complete the following steps to monitor the Step Functions state machine:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the state machine that begins with Covid19AnalysisStateMachine-*.
  3. Choose the running execution to view the visual workflow.

    Each state node will update as it progresses—green for success, red for failure.

  4. To explore a step, choose its node and inspect the input, output, and error details in the side pane.

The following screenshot shows an example of a successfully executed workflow.

Monitor the EMR cluster and EMR step

Complete the following steps to monitor the EMR cluster and EMR step status:

  1. While the cluster is active, open the Amazon EMR console and choose Clusters in the navigation pane.
  2. Locate the Covid19Cluster transient EMR cluster.
    Initially, it will be in Starting status.

    On the Steps tab, you can see your Spark submit step listed. As the job progresses, the step status changes from Pending to Running to finally Completed or Failed.

  3. Choose the Applications tab to view the application UIs, in which you can access the Spark History Server and YARN Timeline Server for monitoring and troubleshooting.

Monitor CloudWatch logs

To enable CloudWatch logging and enhanced monitoring for your EMR on EC2 cluster, refer to Amazon EMR on EC2 – Enhanced Monitoring with CloudWatch using custom metrics and logs. This guide explains how to install and configure the CloudWatch agent using a bootstrap action, so you can stream system-level metrics (such as CPU, memory, and disk usage) and application logs from EMR nodes directly to CloudWatch. With this setup, you can gain real-time visibility into cluster health and performance, simplify troubleshooting, and retain critical logs even after the cluster is terminated.

For this walkthrough, check the logs in the S3 log output location.

Confirm cluster deletion

When the Spark step is complete, Step Functions will automatically delete the Amazon EMR cluster. Refresh the Clusters page on the Amazon EMR console. You should see your cluster status change from Terminating to Terminated within a minute.

By following these steps, you gain full end-to-end visibility into your workflow from the moment the Step Functions state machine is triggered to the automatic shutdown of the EMR cluster. You can monitor execution progress, troubleshoot issues, confirm job success, and continuously optimize your transient Spark workloads.

Verify job output in Amazon S3

When the job is complete, complete the following steps to check the processed results in the S3 output bucket:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Open the output S3 bucket you noted earlier.
  3. Open the processed folder.
  4. Navigate into the timestamped subfolder to view the CSV output file.
  5. Download the CSV file to view the processed results, as shown in the following screenshot.

Monitoring and troubleshooting

To monitor the progress of your Spark job running on a transient EMR on EC2 cluster, use the Step Functions console. It provides real-time visibility into each state transition in your workflow, from cluster creation and job submission to cluster deletion. This makes it straightforward to track execution flow and identify where issues might occur.During job execution, you can use the Amazon EMR console to access cluster-level monitoring. This includes YARN application statuses, step-level logs, and overall cluster health. If CloudWatch logging is enabled in your job configuration, driver and executor logs stream in near real time, so you can quickly detect and diagnose errors, resource constraints, or data skew within your Spark application.

After the workflow is complete, regardless of whether it succeeds or fails, you can perform a detailed post-execution analysis by reviewing the logs stored in the S3 bucket specified in the LogUri parameter. This log directory includes standard output and error logs, along with Spark history files, offering insights into execution behavior and performance metrics.

For continued access to the Spark UI during job execution, you can use persistent application UIs on the EMR console. These links remain accessible even after the cluster is stopped, enabling deeper root-cause analysis and performance tuning for future runs.

This visibility into both workflow orchestration and job execution can help teams optimize their Spark workloads, reduce troubleshooting time, and build confidence in their EMR automation pipelines.

Clean up

To avoid incurring ongoing charges, clean up the resources provisioned during this walkthrough:

  1. Empty the S3 buckets:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Select the input, output, and log buckets used in this tutorial.
    3. Choose Empty to remove all objects before deleting the buckets (optional).
  2. Delete the CloudFormation stack:
    1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
    2. Select the stack you created for this solution and choose Delete.
    3. Confirm the deletion to remove associated resources.

Conclusion

In this post, we showed how to build a fully automated and cost-effective Spark processing pipeline using Step Functions, EventBridge, and Amazon EMR on EC2. The workflow provisions a transient EMR cluster, runs a Spark job to process data, and stops the cluster after the job completes. This approach helps reduce costs while giving you full control over the process. This solution is ideal for scheduled data processing tasks such as ETL jobs, log analytics, or batch reporting, especially when you need detailed control over infrastructure, security, and compliance settings.

To get started, deploy the solution in your environment using the CloudFormation stack provided and adjust it to fit your data processing needs. Check out the Step Functions Developer Guide and Amazon EMR Management Guide to explore further.

Share your feedback and ideas in the comments or connect with your AWS Solutions Architect to fine-tune this pattern for your use case.


About the authors

Senthil Kamala Rathinam

Senthil Kamala Rathinam

Senthil is a Solutions Architect at Amazon Web Services, specializing in Data and Analytics for banking customers across North America. With deep expertise in Data and Analytics, AI/ML, and Generative AI, he helps organizations unlock business value through data-driven transformation. Beyond work, Senthil enjoys spending time with his family and playing badminton.

Shashi Makkapati

Shashi Makkapati

Shashi is a Senior Solutions Architect serving banking customers across North America. He specializes in data analytics, AI/ML, and generative AI, focusing on innovative solutions that transform financial organizations. Shashi is passionate about leveraging technology to solve complex business challenges in the banking sector. Outside of work, he enjoys traveling and spending quality time with his family.

Streamline Spark application development on Amazon EMR with the Data Solutions Framework on AWS

Post Syndicated from Vincent Gromakowski original https://aws.amazon.com/blogs/big-data/streamline-spark-application-development-on-amazon-emr-with-the-data-solutions-framework-on-aws/

Today, organizations are heavily using Apache Spark for their big data processing needs. However, managing the entire development lifecycle of Spark applications—from local development to production deployment—can be complex and time-consuming. Managing the entire code base—including application code, infrastructure provisioning, and continuous integration and delivery (CI/CD) pipelines—is sometimes not fully automated and a shared responsibility across multiple teams, which slows down release cycles. This undifferentiated heavy lifting diverts valuable resources away from core business objectives: deriving value from data.

In this post, we explore how to use Amazon EMR, the AWS Cloud Development Kit (AWS CDK), and the Data Solutions Framework (DSF) on AWS to streamline the development process, from setting up a local development environment to deploying serverless Spark infrastructure, and implementing a CI/CD pipeline for automated testing and deployment.

By adopting this approach, developers gain full control over their code and the infrastructure responsible for running it, alleviating the need for cross-team dependency. Developers can customize the infrastructure to meet specific business needs and optimize performance. Additionally, they can customize CI/CD stages to facilitate comprehensive testing, using the self-mutation capability of AWS CDK Pipelines to automatically update and refine the deployment process. This level of control not only accelerates development cycles but also enhances the reliability and efficiency of the entire application lifecycle, so developers can focus more on innovation and less on manual infrastructure management.

Solution overview

The solution consists of the following key components:

  • The local development environment to develop and test your Spark code locally
  • The infrastructure as code (IaC) that will run your Spark application in AWS environments
  • The CI/CD pipeline running end-to-end tests and deploying into the different AWS environments

In the following sections, we discuss how to set up these components.

Prerequisites

To set up this solution, you must have an AWS account with appropriate permissions, Docker and the AWS CDK CLI.

Set up the local development environment

Developing Spark applications locally can be a challenging task due to the need for a consistent and efficient environment that mirrors your production setup. With Amazon EMR, Docker, and the Amazon EMR toolkit extension for Visual Studio Code, you can quickly set up a local development environment for Spark applications, developing and testing Spark code locally, and seamlessly port it to the cloud.

The Amazon EMR toolkit for VS Code includes an “EMR: Create Local Spark Environment” command that generates a development container. This container is based on an Amazon EMR on Amazon EKS image corresponding to the Amazon EMR version you select. You can develop Spark and PySpark code locally, with full compatibility with your remote Amazon EMR environment. Additionally, the toolkit provides helpers to make it straightforward to connect to the AWS Cloud, including an Amazon EMR explorer, an AWS Glue Data Catalog explorer, and commands to run Amazon EMR Serverless jobs from VS Code.

To set up your local environment, complete the following steps:

  1. Install VS Code and the Amazon EMR Toolkit for VS Code.
  2. Install and launch Docker.
  3. Create a local Amazon EMR environment in your working directory using the command EMR: Create Local Spark Environment.

Amazon EMR Toolkit bootstrap

  1. Choose PySpark, Amazon EMR 7.5, and the AWS Region you want to use, and choose an authentication mechanism.

Amazon EMR toolkit local environment

  1. Log in to Amazon ECR with your AWS credentials using the following command so you can download the Amazon EMR image:
aws ecr get-login-password --region us-east-1 \
    | docker login \
    --username AWS \
    --password-stdin \
    12345678910.dkr.ecr.us-east-1.amazonaws.com
  1. Now you can launch your dev container using the VS Code command Dev Containers: Rebuild and Reopen in container.

The container will install the latest operating system packages and run a local Spark history server on port 18080.

local Spark history server

The container provides spark-shell, spark-sql, and pyspark from the terminal and a Jupyter Python kernel for connecting a Jupyter notebook to execute interactive Spark code.

local Jupyter notebooks

Using the Amazon EMR Toolkit, you can develop your Spark application and test it locally using Pytest—for example, to validate the business logic. You can also connect to other AWS accounts where you have your development environment.

Build the AWS CDK application with DSF on AWS

After you validate the business logic into your local Spark application, you can implement the infrastructure responsible for running your application. DSF provides AWS CDK L3 Constructs that simplify the creation of Spark-based data pipelines on EMR Serverless or Amazon EMR on EKS.

DSF provides the capability to package your local PySpark application, including the Python dependencies, into artifacts that can consumed by EMR Serverless jobs. The PySparkApplicationPackage is a construct that uses a Dockerfile to perform the packaging of dependencies into a Python virtual environment archive and then upload the archive and the PySpark entrypoint file into a secured Amazon Simple Storage Service (Amazon S3) bucket. The following diagram illustrates this architecture.

PySparkApplicationPackage L3 construct

See the following example code:

spark_app = dsf.processing.PySparkApplicationPackage(
    self,
    "SparkApp",
    entrypoint_path="./../spark/src/agg_trip_distance.py",
    application_name="TaxiAggregation",
    # Path of the Dockerfile used to package the dependencies as a Python venv
    dependencies_folder='./../spark',
    # Path of the venv archive in the docker image
    venv_archive_path="/venv-package/pyspark-env.tar.gz",
    removal_policy=RemovalPolicy.DESTROY)

You just need to provide the paths for the following:

  • The PySpark entrypoint. This is the main Python script of your Spark application.
  • The Dockerfile containing the logic for packaging a virtual environment into an archive.
  • The path of the resulting archive in the container file system.

DSF provides helpers to connect the application package to the EMR Serverless job. The PySparkApplicationPackage construct exposes properties that can directly be used into the SparkEmrServerlessJob construct parameters. This construct simplifies the configuration of a batch job using an AWS Step Functions state machine. The following diagram illustrates this architecture.

EmrServerlessJob L3 construct

The following code is an example of an EMR Serverless job:

spark_job = dsf.processing.SparkEmrServerlessJob(
    self,
    "SparkProcessingJob",
    dsf.processing.SparkEmrServerlessJobProps(
        name=f"taxi-agg-job-{Names.unique_resource_name(self)}",
        # ID of the previously created EMR Serverless runtime
        application_id=spark_runtime.application.attr_application_id,
        # The IAM role used by the EMR Job with permissions required by the application
        execution_role=processing_exec_role,
        spark_submit_entry_point=spark_app.entrypoint_uri,
        # Add the Spark parameters from the PySpark package to configure the dependencies (using venv)
        spark_submit_parameters=spark_app.spark_venv_conf + spark_params,
        removal_policy=RemovalPolicy.DESTROY,
        schedule=schedule))

Note the two parameters of SparkEmrServerlessJob that are provided by PySparkApplicationPackage:

  • entrypoint_uri, which is the S3 URI of the entrypoint file
  • spark_venv_conf, which contains the Spark submit parameters for using the Python virtual environment

DSF also provides a SparkEmrServerlessRuntime to simplify the creation of the EMR Serverless application responsible for running the job.

Deploy the Spark application using CI/CD

The final step is to implement a CI/CD pipeline that can test your Spark code and promote from dev/test/stage and then to production. DSF provides a L3 Construct that simplifies the creation of the CI/CD pipeline for your Spark applications. DSF’s implementation of the Spark CI/CD pipeline construct uses the AWS CDK built-in pipeline functionality. One of the key capabilities when using an AWS CDK pipeline is its self-mutating capability. It can update itself whenever you change its definition, avoiding the traditional chicken-and-egg problem of pipeline updates and helping developers fully control their CI/CD pipeline.

When the pipeline runs, it follows a carefully orchestrated sequence. First, it retrieves your code from your repository and synthesizes it into AWS CloudFormation templates. Before doing anything else, it examines these templates to see if you’ve made any changes to the pipeline’s own structure. If the pipeline detects that its definition has changed, it will pause its normal operation and update itself first. After the pipeline has updated itself, it will continue with its regular stages, such as deploying your application.

DSF provides an opinionated implementation of CDK Pipelines for Spark applications, where the PySpark code is automatically unit tested using Pytest and where the configuration is simplified. You only need to configure four components:

  • The CI/CD stages (testing, staging, production, and so on). This includes the AWS account ID and Region where these environments reside in.
  • The AWS CDK stack that is deployed in each environment.
  • (Optional) The integration test script that you want to run against the deployed stack.
  • The SparkEmrCICDPipeline AWS CDK construct.

The following diagram illustrates how everything works together.

SparkCICDPipeline L3 construct

Let’s dive into each of these components.

Define cross-account deployment and CI/CD stages

With the SparkEmrCICDPipeline construct, you can deploy your Spark application stack across different AWS accounts. For example, you can have a separate account for your CI/CD processes and different accounts for your staging and production environments.To set this up, first bootstrap the various AWS accounts (staging, production, and so on):

cdk bootstrap --profile <ENVIRONMENT_ACCOUNT_PROFILE> \ 
    aws://<ENVIRONMENT_ACCOUNT_ID&gt;/&lt;REGION> \ 
    --trust <CICD_ACCOUNT_ID> \ 
    --cloudformation-execution-policies "POLICY_ARN"

This step sets up the necessary resources in the environment accounts and creates a trust relationship between those accounts and the CI/CD account where the pipeline will run.Next, choose between two options to define the environments (both options require the relevant configuration in the cdk.context.json file.The first option is to use pre-defined environments, which is defined as follows:

{ 
    "staging": { 
        "account": "<STAGING_ACCOUNT_ID>", 
        "region": "<REGION>" 
    }, 
    "prod": { 
        "account": "<PROD_ACCOUNT_ID>", 
        "region": "<REGION>" 
    } 
}

Alternatively, you can use user-defined environments, which is defined as follows:

{
   "environments":[
      {
         "stageName":"<STAGE_NAME_1>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      },
      {
         "stageName":"<STAGE_NAME_2>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      },
      {
         "stageName":"<STAGE_NAME_3>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      }
   ]
}

Customize the stack to be deployed

Now that the environments have been bootstrapped and configured, let’s look at the actual stack that contains the resources that will be deployed in the various environments. Two classes must be implemented:

  • A class that extends the stack – This is where the resources that are going to be deployed in each of the environments are defined. This can be a normal AWS CDK stack, but it can be deployed in another AWS account depending on the environment configuration defined in the previous section.
  • A class that extends ApplicationStackFactory – This is DSF specific, and makes it possible to configure and then return the stack that is created.

The following code shows a full example:

class MyApplicationStack(cdk.Stack): 
    def __init__(self, scope, *, stage): 
        super().__init__(scope, "MyApplicationStack") 
        bucket = Bucket(self, "TestBucket",
                        auto_delete_objects=True, 
                        removal_policy=cdk.RemovalPolicy.DESTROY) 
        cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) 
        
class MyStackFactory(dsf.utils.ApplicationStackFactory): 
    def create_stack(self, scope, stage): 
        return MyApplicationStack(scope, stage=stage)

ApplicationStackFactory supports customization of the stack before returning the initialized object to be deployed by the CI/CD pipeline. You can customize your stack behavior by passing the current stage to your stack. For example, you can skip scheduling the Spark application in the integration tests stage because the integration tests trigger it manually as part of the CI/CD pipeline. For the production stage, the scheduling facilitates automatic execution of the Spark application.

Write the integration test script

The integration test script is a bash script that is triggered after the main application stack has been deployed. Inputs to the bash script can come from the AWS CloudFormation outputs of the main application stack. These outputs are mapped into environment variables that the bash script can access directly.

In the Spark CI/CD example, the application stack uses the SparkEMRServerlessJob CDK construct. This construct uses a Step Functions state machine to manage the execution and monitoring of the Spark job. The following is an example integration test bash script that we use to test that the deployed stack can run the associated Spark job successfully:

#!/bin/bash 
EXECUTION_ARN=$(aws stepfunctions start-execution --state-machine-arn $STEP_FUNCTION_ARN | jq -r '.executionArn')

while true 
do 
    STATUS=$(aws stepfunctions describe-execution --execution-arn $EXECUTION_ARN | jq -r '.status') 
    if [ $STATUS = "SUCCEEDED" ]; then 
        exit 0 
    elif [ $STATUS = "FAILED" ] || [ $STATUS = "TIMED_OUT" ] || [ $STATUS = "ABORTED" ]; then 
        exit 1 
    else 
        sleep 10
        continue 
    fi
done

The integration test scripts are executed within an AWS CodeBuild project. As part of the IntegrationTestStack, we’ve included a custom resource that periodically checks the status of the integration test script as it runs. Failure of the CodeBuild execution causes the parent pipeline (residing in the pipeline account) to fail. This helps teams only promote changes that pass all the required testing.

Bring all the components together

When you have your components ready, you can use the SparkEmrCICDPipeline to bring them together. See the following example code:

dsf.processing.SparkEmrCICDPipeline(
    self,
    "SparkCICDPipeline",
    spark_application_name="SparkTest",
    # The Spark image to use in the CICD unit tests
    spark_image=dsf.processing.SparkImage.EMR_7_5,
    # The factory class to dynamically pass the Application Stack
    application_stack_factory=SparkApplicationStackFactory(),
    # Path of the CDK python application to be used by the CICD build and deploy phases
    cdk_application_path="infra",
    # Path of the Spark application to be built and unit tested in the CICD
    spark_application_path="spark",
    # Path of the bash script responsible to run integration tests 
    integ_test_script='./infra/resources/integ-test.sh',
    # Environment variables used by the integration test script, value is the CFN output name
    integ_test_env={
        "STEP_FUNCTION_ARN": "ProcessingStateMachineArn"
    },
    # Additional permissions to give to the CICD to run the integration tests
    integ_test_permissions=[
        PolicyStatement(
            actions=["states:StartExecution", "states:DescribeExecution"
            ],
            resources=["*"]
        )
    ],
    source= CodePipelineSource.connection("your/repo", "branch",
        connection_arn="arn:aws:codeconnections:us-east-1:222222222222:connection/7d2469ff-514a-4e4f-9003-5ca4a43cdc41"
    ),
    removal_policy=RemovalPolicy.DESTROY,
)

The following elements of the code are worth highlighting:

  • With the integ_test_env parameter, you can define the environment variable mapping with the output of your application stack that’s defined in the application_stack_factory parameter
  • The integ_test_permissions parameter specifies the AWS Identity and Access Management (IAM) permissions that are attached to the CodeBuild project where the integration test script runs in
  • CDK Pipelines needs an AWS code connection Amazon Resource Name (ARN) to connect to your Git repository when you host your code

Now you can deploy the stack containing the CI/CD pipeline. This is a one-time operation because the CI/CD pipeline will dynamically be updated based on code changes that impact the CI/CD pipeline itself:

cd infra 
cdk deploy CICDPipeline

Then you can commit and push the code into the source code repository defined in the source parameter. This step triggers the pipeline and deploys the application in the configured environments. You can check the pipeline definition and status on the AWS CodePipeline console.

AWS CodePipeline

You can find the full example on the Data Solutions Framework GitHub repository.

Clean up

Follow the readme guide to delete the resources created by the solution.

Conclusion

By using Amazon EMR, the AWS CDK, DSF on AWS, and the Amazon EMR toolkit, developers can now streamline their Spark application development process. The solution described in this post helps developers gain full control over their code and infrastructure, making it possible to set up local development environments, implement automated CI/CD pipelines, and deploy serverless Spark infrastructure across multiple environments.

DSF supports other patterns, such as streaming governance and data sharing and Amazon Redshift data warehousing. The DSF roadmap is publicly available, and we look forward to your feature requests, contributions, and feedback. You can get started using DSF by following our Quick start guide.

 


About the authors

Jan Michael Go Tan

Jan Michael Go Tan

Jan is a Principal Solutions Architect for Amazon Web Services. He helps customers design scalable and innovative solutions with the AWS Cloud.

Vincent Gromakowski

Vincent Gromakowski

Vincent is an Analytics Specialist Solutions Architect at AWS where he enjoys solving customers’ analytics, NoSQL, and streaming challenges. He has a strong expertise on distributed data processing engines and resource orchestration platform.

Lotfi Mouhib

Lotfi Mouhib

Lotfi is a Principal Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Deploy Apache YuniKorn batch scheduler for Amazon EMR on EKS

Post Syndicated from Suvojit Dasgupta original https://aws.amazon.com/blogs/big-data/deploy-apache-yunikorn-batch-scheduler-for-amazon-emr-on-eks/

As organizations successfully grow their Apache Spark workloads on Amazon EMR on EKS, they may seek to optimize resource scheduling to further enhance cluster utilization, minimize job queuing, and maximize performance. Although Kubernetes’ default scheduler, kube-scheduler, works well for most containerized applications, it lacks feature sets capable of managing complex big data workloads with specific requirements such as gang scheduling, resource quotas, job priorities, multi-tenancy, and hierarchical queue management. This limitation can result in inefficient resource utilization, longer job completion times, and increased operational costs for organizations running large-scale data processing workloads.

Apache YuniKorn addresses these limitations by providing a custom resource scheduler specifically designed for big data and machine learning (ML) workloads running on Kubernetes. Unlike kube-scheduler, YuniKorn offers features such as gang scheduling, making sure all containers of a Spark application start together, resource fairness amongst multiple tenants, priority and preemption capabilities, and queue management with hierarchical resource allocation. For data engineering and platform teams managing large-scale Spark workloads on Amazon EMR on EKS, YuniKorn can improve resource utilization rates, reduce job completion times, and provide improved resource allocation for multi-tenant clusters. This is particularly valuable for organizations running mixed workloads with varying resource requirements, strict SLA requirements, or complex resource sharing policies across different teams and applications.

This post explores Kubernetes scheduling fundamentals, examines the limitations of the default kube-scheduler for batch workloads, and demonstrates how YuniKorn addresses these challenges. We discuss how to deploy YuniKorn as a custom scheduler for Amazon EMR on EKS, its integration with job submissions, how to configure queues and placement rules, and how to establish resource quotas. We also show these features in action through practical Spark job examples.

Understanding Kubernetes scheduling and the need for YuniKorn

In this section, we dive into the details of Kubernetes scheduling and the need for YuniKorn.

How Kubernetes scheduling works

Kubernetes scheduling is the process of assigning pods to nodes within a cluster while considering resource requirements, scheduling constraints, and isolation constraints. The scheduler evaluates each pod individually against all schedulable worker nodes, considering multiple factors, including resource requirements such as CPU, memory and I/O requests, node affinity preferences for specific node characteristics, inter-pod affinity and anti-affinity rules that determine whether the pods should be distributed across multiple worker nodes or require colocation, taints and tolerations that dictate scheduling constraints, and Quality of Service classifications that influence scheduling priority.

The scheduling process operates through a two-phase approach. During the filtering phase, the scheduler identifies all worker nodes that could potentially host the pod by eliminating those that don’t meet the basic requirements. The scoring phase then ranks all feasible worker nodes using scoring algorithms to determine the optimal placement, ultimately selecting the highest-scoring node for pod assignment.

Default implementation of kube-scheduler

kube-scheduler serves as the Kubernetes default scheduler. This scheduler operates on a pod-by-pod basis, treating each scheduling decision as an independent operation without consideration for the broader application context.When kube-scheduler processes scheduling requests, it follows a continuous workflow. The API server is monitored for newly created pods awaiting node assignment, applies filtering logic to eliminate unsuitable worker nodes, executes its scoring algorithm to rank the remaining candidates, binds the selected pod to the optimal node, and repeats the process with the next unscheduled pod in the queue.This individual pod scheduling approach works well for microservices and web applications where each pod has fewer interdependencies. However, this design creates significant challenges when applied to distributed big data frameworks like Spark that require coordinated scheduling of multiple interdependent pods.

Challenges using kube-scheduler for batch jobs

Batch processing workloads, particularly those built on Spark, present different scheduling requirements that expose limitations in kube-scheduler algorithm. Such applications consist of multiple pods that must operate as a cohesive unit, yet kube-scheduler lacks the application-level awareness necessary to handle coordinated scheduling requirements.

Gang scheduling challenges

The most significant challenge emerges from the need for gang scheduling, where all components of a distributed application must be scheduled simultaneously. A typical Spark application requires a driver pod and multiple executor pods running in parallel to function correctly. Without YuniKorn, kube-scheduler first schedules the driver pod without knowing the total amount of resources that the driver and executors will need together. When the driver pod starts running, it attempts to spin up the required executor pods but might fail to find sufficient resources in the cluster. This sequential approach can result in the driver being scheduled successfully while some or all executor pods remain in a pending state due to insufficient cluster capacity.This partial scheduling creates a problematic scenario where the application consumes cluster resources but can’t execute meaningful work. The partially scheduled application will hold onto allocated resources indefinitely while waiting for the missing components, preventing other applications from utilizing those resources and resulting in a deadlock situation.

Resource fragmentation issues

Resource fragmentation represents another critical issue that emerges from individual pod scheduling. When multiple batch applications compete for cluster resources, the lack of coordinated scheduling leads to scenarios where sufficient total resources exist for a given application, but they become fragmented across multiple incomplete applications. This fragmentation prevents efficient resource utilization and can leave applications in perpetual pending states.

The absence of hierarchical queue management further compounds these challenges. kube-scheduler provides limited support for hierarchical resource allocation, making it difficult to implement fair sharing policies across different tenants. Organizations can’t easily establish resource quotas that guarantee minimum allocations while setting maximum limits, nor can they implement preemption policies that allow higher-priority jobs to reclaim resources from lower-priority workloads.

The Need for YuniKorn

YuniKorn addresses these batch scheduling limitations through a set of features designed for distributed computing workloads. Unlike the pod-centric approach of kube-scheduler, YuniKorn operates with application-level awareness, understanding the relationships between different components of distributed applications and making scheduling decisions accordingly. The features are as follows:

  • Gang scheduling for atomic application deployment – Gang scheduling represents YuniKorn’s advantage for batch workloads. This capability makes sure pods belonging to an application are scheduled atomically—either all components receive node assignments, or none are scheduled until sufficient resources become available. YuniKorn’s all-or-nothing approach to scheduling minimizes resource deadlocks and partial application failures that impact kube-scheduler based deployments, resulting in more predictable job execution and higher completion rates.
  • Hierarchical queue management and resource organization – YuniKorn’s queue management system provides the hierarchical resource organization that enterprise batch processing environments require. Organizations can establish multi-level queue structures that mirror their organizational hierarchy, implementing resource quotas at each level to facilitate fair resource distribution. The scheduler supports guaranteed resource allocations that provide minimum resource commitments and maximum limits that prevent a single queue from monopolizing cluster resources.
  • Dynamic resource preemption based on priority – The preemption capabilities built into YuniKorn enable dynamic resource reallocation based on job priorities and queue policies. When higher-priority applications require resources currently allocated to lower-priority workloads, YuniKorn can gracefully stop lower-priority pods and reallocate their resources, making sure critical jobs receive the resources they need without manual intervention.
  • Intelligent resource pooling and fair share distribution – Resource pooling and fair share scheduling further enhance YuniKorn’s effectiveness for batch workloads. Rather than treating each scheduling decision in isolation, YuniKorn considers the broader resource allocation landscape, implementing fair-share algorithms that facilitate equitable resource distribution across different applications and users while maximizing overall cluster utilization.

These features add to the existing capabilities of Amazon EMR on EKS by establishing an enhanced environment in which the unique requirements of distributed computing workloads are satisfied.

Solution overview

Consider HomeMax, a fictitious company operating a shared Amazon EMR on EKS cluster where three teams regularly submit Spark jobs with distinct characteristics and priorities:

  • Analytics team – Runs time-sensitive customer analysis jobs requiring immediate processing for business decisions
  • Marketing team – Executes large overnight batch jobs for campaign optimization with predictable resource patterns
  • Data science team – Runs experimental workloads with varying resource needs throughout the day for model development and research

Without proper resource scheduling, these teams face common challenges: resource contention, job failures due to partial scheduling, and inability to guarantee SLAs for critical workloads.For our YuniKorn demonstration, we configured an Amazon EMR on EKS cluster with the following specifications:

  • Amazon EKS cluster: Four worker nodes using m5.2xlarge Amazon Elastic Compute Cloud (Amazon EC2) instances
  • Per-node resources: 8 vCPUs, 32 GiB memory
  • Total cluster capacity: 32 vCPU cores and 128 GiB memory
  • Available for Spark: Approximately 30 vCPUs and approximately 120 GiB memory (after system overhead)
  • Kubernetes version: 1.30+ (required for YuniKorn 1.6.x compatibility)

The following code shows the node group configuration:

# EKS Node Group specification
NodeGroup:
  InstanceTypes:
    - m5.2xlarge
  ScalingConfig:
    MinSize: 4
    DesiredSize: 4
    MaxSize: 4
  DiskSize: 20
  AmiType: AL2023_x86_64_STANDARD

We intentionally use a fixed-capacity cluster to provide a controlled environment that showcases YuniKorn’s scheduling capabilities with consistent, predictable resources. This approach makes resource contention scenarios more apparent and demonstrates how YuniKorn resolves them.

Amazon EMR on EKS offers robust scaling capabilities through Karpenter. The principles demonstrated in this fixed environment apply equally to dynamic environments, where YuniKorn’s capabilities complement the scaling features of Amazon EMR on EKS to optimize resource utilization during peak demand periods or when scaling limits are reached.

The following diagram shows the high-level architecture of the YuniKorn scheduler running on Amazon EMR on EKS. This solution also includes a secure bastion host not shown in the architecture diagram that provides access to the EKS cluster via AWS Systems Manager (SSM) Session Manager. The bastion host is deployed in a private subnet with all necessary tools pre-installed with proper permissions for seamless cluster interaction.

In the following sections, we explore YuniKorn’s queue architecture optimized for this use case. We examine various demonstration scenarios, including gang scheduling, queue-based resource management, priority-based preemption, and fair share distribution. We walk through the process of deploying an Amazon EMR on EKS cluster, implementing the YuniKorn scheduler, configuring the specified queues, and submitting Spark jobs to showcase these scenarios.

YuniKorn integration on Amazon EMR on EKS

The integration involves three key components working together: the Amazon EMR on EKS virtual cluster configuration, YuniKorn’s admission webhook system, and job-level queue annotations.

Namespace and virtual cluster foundation

The integration begins with a dedicated Kubernetes namespace where your Amazon EMR on EKS jobs will run. In our demonstration, we use the emr namespace, created as a standard Kubernetes namespace:

apiVersion: v1
kind: Namespace
metadata:
  name: emr

The Amazon EMR on EKS virtual cluster is configured to deploy all jobs within this specific namespace. When creating the virtual cluster, you specify the namespace in the container provider configuration:

aws emr-containers create-virtual-cluster \
    --name "emr-on-eks-cluster-v" \
    --container-provider "{
        \"id\": \"my-eks-cluster\",
        \"type\": \"EKS\",
        \"info\": {
            \"eksInfo\": {
                \"namespace\": \"emr\"
            }
        }
    }"

This configuration makes sure all jobs submitted to this virtual cluster will be deployed in the emr namespace, establishing the foundation for YuniKorn integration.

The YuniKorn interception mechanism

When YuniKorn is installed using Helm, it automatically registers a MutatingAdmissionWebhook with the Kubernetes API server. This webhook acts as an interceptor that monitors pod creation events in your designated namespace. The webhook registration tells Kubernetes to call YuniKorn whenever pods are created in the emr namespace:

# YuniKorn registers this webhook configuration
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingAdmissionWebhook
rules:
- operations: ["CREATE"]
  resources: ["pods"]
  namespaces: ["emr"]  # Intercepts pods in EMR namespace

This webhook is triggered by any pod creation in the emr namespace, not specifically by YuniKorn annotations. However, the webhook’s logic only modifies pods that contain YuniKorn queue annotations, leaving other pods unchanged.

End-to-end job flow

When you submit a Spark job through the Spark Operator, the following sequence occurs:

  1. Your Spark job includes YuniKorn queue annotations on both driver and executor pods:
driver:
  annotations:
    yunikorn.apache.org/queue: "root.analytics-queue"
executor:
  annotations:
    yunikorn.apache.org/queue: "root.analytics-queue"
  1. The Spark Operator processes your SparkApplication and creates individual Kubernetes pods for the driver and executors. These pods inherit the YuniKorn annotations from your job template.
  2. When the Spark Operator attempts to create pods in the emr namespace, Kubernetes calls YuniKorn’s admission webhook. The webhook examines each pod and performs the following actions:
    1. Detects pods with yunikorn.apache.org/queue annotations.
    2. Adds schedulerName: yunikorn to those pods.
    3. Leaves pods without YuniKorn annotations unchanged.

This interception means you don’t need to manually specify schedulerName: yunikorn in your Spark jobs—YuniKorn claims the pods transparently based on the presence of queue annotations.

  1. The YuniKorn scheduler receives the scheduling requests and applies the queue placement rules configured in the YuniKorn ConfigMap:
placementrules:
  - name: provided    # Uses the annotation value
    create: false.    # Doesn’t create the queue if not present
  - name: fixed       # Fallback to root.default queue
    value: root.default

The provided rule reads the yunikorn.apache.org/queue annotation and places the job in the specified queue (for example, root.analytics-queue). YuniKorn then applies gang scheduling logic, holding all pods until sufficient resources are available for the entire application, preventing the partial scheduling issues that come with kube-scheduler.

  1. After YuniKorn determines that all pods can be scheduled according to the queue’s resource guarantees and limits, it schedules all driver and executor pods. The Spark job begins execution with the guaranteed resource allocation defined in the queue configuration.

The combination of namespace-based virtual cluster configuration, admission webhook interception, and annotation-driven queue placement creates an integration that transforms Amazon EMR on EKS job scheduling without disrupting existing workflows.

YuniKorn queue architecture

To demonstrate the various YuniKorn features described in the next section, we configured three job-specific queues and a default queue representing our enterprise teams with carefully balanced resource allocations:

# Analytics Queue - Time-sensitive workloads
analytics-queue:
  guaranteed: 10 vCPUs, 38GB memory (30% of cluster)
  max: 24 vCPUs, 96GB memory (80% burst capacity)
  priority: 100 (highest)
  policy: FIFO (predictable scheduling)
# Marketing Queue - Large batch jobs
marketing-queue:
  guaranteed: 8 vCPUs, 32GB memory (25% of cluster)
  max: 24 vCPUs, 96GB memory (80% burst capacity)
  priority: 75 (medium)
  policy: Fair Share (balanced resource distribution)
# Data Science Queue - Experimental workloads
datascience-queue:
  guaranteed: 6 vCPUs, 26GB memory (20% of cluster)
  max: 24 vCPUs, 96GB memory (80% burst capacity)
  priority: 50 (lower)
  policy: Fair Share (experimental workload balancing)
# Default Queue - Fallback for unmatched jobs
default:
  guaranteed: 6 vCPUs, 26GB memory (20% of cluster)
  max: 24 vCPUs, 96GB memory (80% burst capacity)
  priority: 25 (lowest)
  policy: FIFO (predictable job submission)

Demonstration scenarios

This section outlines key YuniKorn scheduling capabilities and their corresponding Spark job submissions. These scenarios demonstrate guaranteed resource allocation and burst capacity usage. Guaranteed resources represent minimum allocations that queues can always access, but jobs might exceed these allocations when additional cluster capacity is available. The marketing-job specifically demonstrates burst capacity usage beyond its guaranteed allocation.

  • Gang scheduling – In this scenario, we submit analytics-job.py (analytics-queue, 9 total cores) and marketing-job.py (marketing-queue, 17 total cores) simultaneously. YuniKorn makes sure all pods for each job are scheduled atomically, preventing partial resource allocation that could cause job failures in our resource-constrained cluster.
  • Queue-based resource management – We run all three jobs concurrently to observe guaranteed resource allocation. YuniKorn distributes remaining capacity proportionally based on queue weights and maximum limits.
    • analytics-job.py (analytics-queue) receives guaranteed 10 vCPUs and 38 GB memory.
    • marketing-job.py (marketing-queue) receives guaranteed 8 vCPUs and 32 GB memory.
    • datascience-job.py (datascience-queue) receives guaranteed 6 vCPUs and 26 GB memory.
  • Priority-based preemption – We start datascience-job.py (datascience-queue, priority 25) and marketing-job.py (marketing-queue, priority 50) consuming cluster resources, then submit high-priority analytics-job.py (analytics-queue, priority 100). YuniKorn preempts lower-priority jobs to make sure the time-sensitive analytics workload gets its guaranteed resources, maintaining SLA compliance.
  • Fair share distribution – We submit multiple jobs to each queue when all queues have available capacity. YuniKorn applies configured fair share policies within queues—the analytics queue uses First In, First Out (FIFO) method for predictable scheduling, and the marketing and data science queues use fair sharing method for balanced resource distribution.

Source code

You can find the codebase in the AWS Samples GitHub repository.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Set up the solution infrastructure

Complete the following steps to set up the infrastructure:

  1. Clone the repository to your local machine and set the two environment variables. Replace <AWS_REGION> with the AWS Region where you want to deploy these resources.
git clone https://github.com/aws-samples/sample-emr-eks-yunikorn-scheduler.git
cd sample-emr-eks-yunikorn-scheduler
export REPO_DIR=$(pwd)
export AWS_REGION=<AWS_REGION>
  1. Execute the following script to create the infrastructure:
cd $REPO_DIR/infrastructure
./setup-infra.sh
  1. To verify successful infrastructure deployment, open the AWS CloudFormation console, choose your stack, and check the Events, Resources, and Outputs tabs for completion status, details, and list of resources created.

Deploy YuniKorn on Amazon EMR on EKS

Run the following script to deploy the Yunikorn helm chart and update the configmap with the queues and placement rules:

cd $REPO_DIR/yunikorn/
./setup-yunikorn.sh

Establish EKS cluster connectivity

Complete the following steps to establish secure connectivity to your private EKS cluster:

  1. Execute the following script in a new terminal window. This script establishes port forwarding through the bastion host to make your private EKS cluster accessible from your local machine. Keep this terminal window open and running throughout your work session. The script maintains the connection to your EKS cluster.
export REPO_DIR=$(pwd)
export AWS_REGION=<AWS_REGION>
cd $REPO_DIR/port-forward
./eks-connect.sh --start
  1. Test kubectl connectivity in the main terminal window to verify that you can successfully communicate with the EKS cluster. You should see the EKS worker nodes listed, confirming that the port forwarding is working correctly.

kubectl get nodes

Verify successful YuniKorn deployment

Complete the following steps to verify a successful deployment:

  1. List all Kubernetes objects in the yunikorn namespace:

kubectl get all -n yunikorn

You will see details like the following screenshot.

  1. Check the YuniKorn scheduler logs for configuration loading and look for queue configuration messages:
kubectl logs -n yunikorn deployment/yunikorn-scheduler --tail=50
kubectl logs -n yunikorn deployment/yunikorn-scheduler | grep -i queue
  1. Access the YuniKorn web UI by navigating to http://127.0.0.1:9889 in your browser. Port 9889 is the default port for the YuniKorn web UI.
# macOS
open http://127.0.0.1:9889
# Linux
xdg-open http://127.0.0.1:9889
# Windows
start http://127.0.0.1:9889

The following screenshots show the YuniKorn web UI with queues but no running applications.

Run Spark jobs with YuniKorn on Amazon EMR on EKS

Complete the following steps to run Spark jobs with YuniKorn on Amazon EMR on EKS:

  1. Execute the following script to set up the Spark jobs environment. The script uploads PySpark scripts to Amazon Simple Storage Service (Amazon S3) bucket locations and creates ready-to-use YAML files from templates.
cd $REPO_DIR/spark-jobs
./setup-spark-jobs.sh
  1. Submit analytics, marketing, and data science Spark jobs using the following commands. YuniKorn will place the jobs in their respective queues and allocate resources to execution. Refer to Using YuniKorn as a custom scheduler for Apache Spark on Amazon EMR on EKS for supported job submission methods with YuniKorn as a custom scheduler.
kubectl apply -f spark-operator/analytics-job.yaml
kubectl apply -f spark-operator/marketing-job.yaml
kubectl apply -f spark-operator/datascience-job.yaml
  1. Review the previous section describing different demonstration scenarios and submit the Spark jobs using various combinations to see YuniKorn scheduler’s capabilities in action. We encourage you to adjust the cores, instances, and memory parameters and explore the scheduler’s behavior by executing the jobs. We also encourage you to modify the queues’ guaranteed and max capacities in the file yunikorn/queue-config-provided.yaml, apply the changes, and submit jobs to further understand Yunikorn scheduler behavior under various circumstances.

Clean up

To avoid incurring future charges, complete the following steps to delete the resources you created:

  1. Stop the port forwarding sessions:
cd $REPO_DIR/port-forwarding
./eks-connect.sh --stop
  1. Remove all created AWS resources:
cd $REPO_DIR
./cleanup.sh

Conclusion

YuniKorn addresses the scheduling limitations of default kube-scheduler while running Spark workloads on Amazon EMR on EKS through gang scheduling, intelligent queue management, and priority-based resource allocation. This post showed how YuniKorn’s queue system enables better resource utilization, prevents job failure due to poor allocation of resources, and supports multi-tenant environments.

To get started with YuniKorn on Amazon EMR on EKS, explore the Apache YuniKorn documentation for implementation guides, review Amazon EMR on EKS best practices for optimization strategies, and engage with the YuniKorn community for ongoing support.


About the authors

Suvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for diverse customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

Peter Manastyrny is a Senior Product Manager at AWS Analytics. He leads Amazon EMR on EKS, a product that makes it straightforward and efficient to run open-source data analytics frameworks such as Spark on Amazon EKS.

Matt Poland is a Senior Cloud Infrastructure Architect at Amazon Web Services. He is passionate about solving complex problems and delivering well-structured solutions for diverse customers. His expertise spans across a range of cloud technologies, providing scalable and reliable infrastructure tailored to each project’s unique challenges.

Gregory Fina is a Principal Startup Solutions Architect for Generative AI at Amazon Web Services, where he empowers startups to accelerate innovation through cloud adoption. He specializes in application modernization, with a strong focus on serverless architectures, containers, and scalable data storage solutions. He is passionate about using generative AI tools to orchestrate and optimize large-scale Kubernetes deployments, as well as advancing GitOps and DevOps practices for high-velocity teams. Outside of his customer-facing role, Greg actively contributes to open source projects, especially those related to Backstage.

How Ancestry optimizes a 100-billion-row Iceberg table

Post Syndicated from Thomas Cardenas original https://aws.amazon.com/blogs/big-data/how-ancestry-optimizes-a-100-billion-row-iceberg-table/

This is a guest post by Thomas Cardenas, Staff Software Engineer at Ancestry, in partnership with AWS.

Ancestry, the global leader in family history and consumer genomics, uses family trees, historical records, and DNA to help people on their journeys of personal discovery. Ancestry has the largest collection of family history records, consisting of 40 billion records. They serve more than 3 million subscribers and have over 23 million people in their growing DNA network. Their customers can use this data to discover their family story.

Ancestry is proud to connect users with their families past and present. They help people learn more about their own identity by learning about their ancestors. Users build a family tree through which we surface relevant records, historical documents, photos, and stories that might contain details about their ancestors. These artifacts are surfaced through Hints. The Hints dataset is one of the most interesting datasets at Ancestry. It’s used to alert users that potential new information is available. The dataset has multiple shards, and there are currently 100 billion rows being used by machine learning models and analysts. Not only is the dataset large, it also changes rapidly.

In this post, we share the best practices that Ancestry used to implement an Apache Iceberg-based hints table capable of handling 100 billion rows with 7 million hourly changes. The optimizations covered here resulted in cost reductions of 75%.

Overview of solution

Ancestry’s Enterprise Data Management (EDM) team faced a critical challenge—how to provide a unified, performant data ecosystem that could serve diverse analytical workloads across financial, marketing, and product analytics teams. The ecosystem needed to support everything from data scientists training recommendation models to geneticists developing population studies—all requiring access to the same Hints data.

The ecosystem around Hints data had been developed organically, without a well-defined architecture. Teams independently accessed Hints data through direct service calls, Kafka topic subscriptions, or warehouse queries, creating significant data duplication and unnecessary system load. To reduce cost and improve performance, EDM implemented a centralized Apache Iceberg data lake on Amazon Simple Storage Service (Amazon S3), with Amazon EMR providing the processing power. This architecture, shown in the following image, creates a single source of truth for the Hints dataset while using Iceberg’s ACID transactions, schema evolution, and partition evolution capabilities to handle scale and update frequency.

End-to-end AWS analytics architecture showcasing data movement from Fargate through MSK, EMR, to S3 data lake with Glue Catalog

Hints table management architecture

Managing datasets exceeding one billion rows presents unique challenges, and Ancestry faced this challenge with the trees collection of 20–100 billion rows across multiple tables. At this scale, dataset updates require careful execution to control costs and prevent memory issues. To solve these challenges, EDM chose Amazon EMR on Amazon EC2 running Spark to write Iceberg tables on Amazon S3 for storage. With large and steady Amazon EMR workloads, running the clusters on Amazon EC2, as opposed to Serverless, proved cost effective. EDM has scheduled an Apache Spark job to run every hour on their Amazon EMR on EC2. This job uses the merge operation to update the Iceberg table with recently changed rows. Performing updates like this on such a large dataset can easily lead to runaway costs and out-of-memory errors.

Key optimization techniques

The engineers needed to enable fast, row-level updates without impacting query performance or incurring substantial cost. To achieve this, Ancestry used a combination of partitioning strategies, table configurations, Iceberg procedures, and incremental updates. The following is covered in detail:

  • Partitioning
  • Sorting
  • Merge-on-read
  • Compaction
  • Snapshot management
  • Storage-partitioned joins

Partitioning strategy

Developing an effective partitioning strategy was crucial for the 100-billion-row Hints table. Iceberg supports various partition transforms including column value, temporal functions (year, month, day, hour), and numerical transforms (bucket, truncate). Following AWS best practices, Ancestry carefully analyzed query patterns to identify a partitioning approach that would support these queries while balancing these two competing considerations:

  • Too few partitions would force queries to scan excessive data, degrading performance and increasing costs.
  • Too many partitions would create small files and excessive metadata, causing management overhead and slower query planning. It’s generally best to avoid parquet files smaller than 100 MB.

Through query pattern analysis, Ancestry discovered that most analytical queries filtered on hint status (particularly pending status) and hint type. This insight led us to implement a two-level partitioning strategy-first on status and then on type, which dramatically reduced the amount of data scanned during typical queries.

Sorting

To further optimize query performance, Ancestry implemented strategic data organization within partitions using Iceberg’s sort orders. While Iceberg doesn’t maintain perfect ordering, even approximate sorting significantly improves data locality and compression ratios.

For the Hints table with 100 billion rows, Ancestry faced a unique challenge: the primary identifiers (PersonId and HintId) are high-cardinality numeric columns that would be prohibitively expensive to sort completely. The solution uses Iceberg’s truncate transform function to support sorting on just a portion of the number, effectively creating another partition by grouping a collection of IDs together. For example, we can specify truncate(100_000_000, hintId) to create groups of 100 million hint IDs, greatly improving the performance of queries that specify that column.

Merge on read

With 7 million changes to the Hints table occurring hourly, optimizing write performance became critical to the architecture. In addition to making sure queries performed well, Ancestry also needed to make sure our frequent updates would perform well in both time and cost. It was quickly discovered that the default copy-on-write (CoW) strategy, which copies an entire file when any part of it changes, was too slow and expensive for their use case. Ancestry was able to get the performance we needed by instead specifying the merge-on-read (MoR) update strategy, which maintains new information in diff files that are reconciled on read. The large updates that happen every hour led us to choose faster updates at the cost of slower reads.

File compaction

The frequent updates mean files are constantly needing to be re-written to maintain performance. Iceberg provides the rewrite_data_files procedure for compaction, but default configurations proved insufficient for our scale. Leaving the default configuration in place, the rewrite operation wrote to five partitions at a time and didn’t meet our performance objective. We found that increasing the concurrent writes improved performance. We used the following set of parameters, setting a relatively high max-concurrent-file-group-rewrites value of 100 to more efficiently deal with our thousands of partitions. The default of rewriting only one file at a time couldn’t keep up with the frequency of our updates.

CALL datalake.system.rewrite_data_files(
  table => ‘database.table’, 
  strategy => ‘binpack’, 
  options => map (
    'max-concurrent-file-group-rewrites','100',
    'partial-progress.enabled','true',
    'rewrite-all','true'
  )
)

Key optimizations in Ancestry’s approach include:

  • High concurrency: We increased max-concurrent-file-group-rewrites from the default 5 to 100, enabling parallel processing of our thousands of partitions. This increased compute costs but was necessary to help ensure that the jobs finished.
  • Resilience at scale: We enabled partial-progress to create compaction checkpoints, essential when operating at our scale where failures are particularly costly.
  • Comprehensive delta elimination: Setting rewrite-all to true helps ensure that both data files and delete files are compacted, preventing the accumulation of delete files. By default, the delete files created as part of this strategy aren’t re-written and would continue to accumulate, slowing queries.

We arrived at these optimizations through successive trials and evaluations. For example, with our very large dataset, we discovered that we could use a WHERE clause to limit re-writes to a single partition. Based on the partitions, we see varied execution times and resource utilization. For some partitions, we needed to reduce concurrency to avoid running into out of memory errors.

Snapshot management

Iceberg tables maintain snapshots to preserve the history of the table, allowing you to time travel through the changes. As these snapshots accrue, they add to storage costs and degrade performance. This is why maintaining an Iceberg table requires you to periodically call the expire_snapshots procedure. We found we needed to enable concurrency for snapshot management so that it would complete in a timely manner:

CALL datalake.system.expire_snapshots(
        table => '`database`.table', 
        retain_last => 1, 
        max_concurrent_deletes => 20)

Consider how to balance performance, cost, and the need to keep historical records depending on your use case. When you do so, note that there is a table-level setting for maximum snapshot age which can override the retain_last parameter and retain only the active snapshot.

Reducing shuffle with Storage-Partitioned Joins

We use Storage-Partitioned Joins (SPJ) in Iceberg tables to minimize expensive shuffles during data processing. SPJ is an advanced Iceberg feature (available in Spark 3.3 or later with Iceberg 1.2 or later) that uses the physical storage layout of tables to eliminate shuffle operations entirely. For our Hints update pipeline, this optimization was transformational.

SPJ is especially useful during MERGE INTO operations, where datasets have identical partitioning. Proper configuration helps ensure effective use of SPJ to optimize joins.

SPJ has a few requirements such as both tables must be Iceberg partitioned the same way and joined on the partition key. Then Iceberg will know that it doesn’t have to shuffle the data when the tables are loaded. This even works when there are a different number of partitions on either side.

Updates to the Hints database are first staged in the Hint Changes database where data is transformed from the original Kafka data format into how it will look in the target (Hints) table. This is a temporary Iceberg table where we are able to perform audits using Write-Audit-Publish (WAP) pattern. In addition to using the WAP pattern we are able to use the SPJ functionality.

Technical workflow showing AWS data processing pipeline with following sequence: Amazon MSK starting point Parallel paths to: Hint changes in S3 (Apache Iceberg) Hint backups in S3 (Apache Iceberg) Stage hourly updates via EMR Cluster Staging table in S3 (Apache Iceberg) EMR hourly table maintenance jobs Final hints table in S3 (Apache Iceberg)

The Hints data pipeline

Reducing full-table scans

Another strategy to reduce shuffle is minimizing the data involved in joins by dynamically pushing down filters. In production, these filters vary between batches, so a multi-step operation is often necessary for setting up merges. The following example code first limits its scope by setting minimum and maximum values for the ID, then performs an update or delete to the target table depending on whether a target value exists.

val stats: Dataset[Row] = session.read.table("catalog.database.source")
  .agg(
    min(col("id")).as("min_value"),
    max(col("id")).as("max_value")
)

val statRow: Row = stats.head
val minId: String = statRow.getInt(0)
val maxId: String = statRow.getInt(1)

session.sql(s"""
  MERGE INTO catalog.database.target t
    USING (SELECT * FROM catalog.database.source) s
  ON (t.id BETWEEN $minId AND $maxId)
    AND (t.id = s.id)
  WHEN MATCHED
    THEN UPDATE SET *
  WHEN NOT MATCHED
    THEN INSERT *
""")

This technique reduces cost in several ways: the bounded merge reduces the number of affected rows, it allows for predicate pushdown optimization, which filters at the storage layer, and it reduces shuffle operations when compared with a join.

Additional insights

Apart from the Hints table, we have implemented over 1,000 Iceberg tables in our data ecosystem. The following are some key insights that we observed:

  • Updating a table using MERGE is typically the most expensive action, so this is where we spent the most time optimizing. It was still our best option.
  • Using complex data types can help co-locate similar data in the table.
  • Monitor costs of each pipeline because while following good practice you can stumble across things you miss that are causing costs to increase.

Conclusion

Organizations can use Apache Iceberg tables on Amazon S3 with Amazon EMR to manage massive datasets with frequent updates. Many customers will be able to achieve excellent performance with a low maintenance burden by using the AWS Glue table optimizer for automatic, asynchronous compaction. Some customers, like Ancestry, will require custom optimizations of their maintenance procedures to meet their cost and performance goals. These customers should start with a careful assessment of query patterns to develop a partitioning strategy to minimize the amount of data that needs to be read and processed. Update frequency and latency requirements will dictate other choices, like whether merge-on-read or copy-on-write is the better strategy.

If your organization faces similar challenges with high volumes of data requiring frequent updates, you can use a combination of Apache Iceberg’s advanced features with AWS services like Amazon EMR Serverless, Amazon S3, and AWS Glue to build a truly modern data lake that delivers the scale, performance, and cost-efficiency you need.

Further reading


About the authors

Thomas Cardenas

Thomas Cardenas

Thomas is a Staff Software Engineer at Ancestry. He focuses on building data lake infrastructure and improving data quality for financial reporting and analytics. He loves building the technical foundations that help millions of people discover their family history.

Robert Fisher

Robert Fisher

Robert is an AWS Sr. Solutions Architect. He has over twenty years experience designing software solutions and leading software engineering teams. He is passionate about helping customers use technology to achieve their business objectives.

Harsh Vardan

Harsh Vardan

Harsh is an AWS Solutions Architect, specializing in big data and analytics. He has a decade of experience working in the field of data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Improve Amazon EMR HBase availability and tail latency using generational ZGC

Post Syndicated from Vishal Chaudhary original https://aws.amazon.com/blogs/big-data/improve-amazon-emr-hbase-availability-and-tail-latency-using-generational-zgc/

At Amazon EMR, we constantly listen to our customers’ challenges with running large-scale Amazon EMR HBase deployments. One consistent pain point that kept emerging is unpredictable application behavior due to garbage collection (GC) pauses on HBase. Customers running critical workloads on HBase were experiencing occasional latency spikes due to varying GC pauses, particularly impacting when they occurred during peak business hours.

To reduce this unpredictable impact to business-critical applications running on HBase, we turn to Oracle’s Z Garbage Collector (ZGC), specifically it’s generational support introduced in JDK 21. Generational ZGC delivers consistent sub-millisecond pause times that dramatically reduce tail latency.

In this post, we examine how unpredictable GC pauses affect business-critical workloads, benefits of enabling generational ZGC in HBase. We also cover additional GC tuning techniques to improve the application throughput and reduce tail latency. Amazon EMR 7.10.0 introduces new configuration parameters that allow you to seamlessly configure and tune the garbage collector for HBase RegionServers.

By incorporating generational collection into ZGC’s ultra-low pause architecture, it efficiently handles both short-lived and long-lived objects, making it exceptionally well-suited to HBase’s workload characteristics:

  • Handling mixed object lifetimes – HBase operations create a mix of short-lived objects (such as temporary buffers for read/write operations) and long-lived objects (such as cached data blocks and metadata). Generational ZGC can efficiently manage both, reducing overall GC frequency and impact.
  • Adapting to workload patterns – As workload patterns change throughout the day — for instance, from write-heavy ingestion to read-heavy analytics — generational ZGC adapts its collection strategy, maintaining optimal performance.
  • Scaling with heap size – As data volumes grow and HBase clusters require larger heaps, generational ZGC maintains it’s sub-millisecond pause times, providing consistent performance even as you scale up.

Understanding the impact of GC pauses on HBase

When running HBase RegionServers, the JVM heap can accumulate a large number of objects, both short-lived (temporary objects created during operations) and long-lived (cached data, metadata). Traditional garbage collectors like Garbage-First Garbage Collector (G1 GC) need to pause application threads during certain phases of garbage collection, particularly during “stop-the-world” (STW) events. GC pauses can have several impacts on HBase :

  • Latency spikes – GC pauses introduce latency spikes, often impacting tail latencies (p99.9 and p99.99) of the application which can lead to timeout for client requests and inconsistent response times..
  • Application availability – All application threads are halted during STW events and it negatively impacts overall application availability.
  • RegionServer failures – If GC pauses exceed the configured ZooKeeper session timeout, they might lead to RegionServer failures.

HBase RegionServer reports whenever there is an unusually long GC pause time using the JvmPauseMonitor. The following log entry shows an example of GC pauses reported by HBase RegionServer. During YCSB benchmarking, G1 GC exhibited 75 such pauses over a 7-hour period, whereas generational ZGC showed no long pauses under identical workload and testing conditions.

INFO  [JvmPauseMonitor] util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 2839ms
INFO  [JvmPauseMonitor] util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 3021ms

G1 GC pauses are proportional to the pressure on the heap and the object allocation patterns. As a result, the pauses might get worse if the heap is under too much load, whereas generational ZGC maintains it’s pause times goals even under high pressure.

Pause time and availability (uptime) comparison: Generational ZGC vs. G1GC in Amazon EMR HBase

Our testing revealed significant differences in GC pause time between the generational ZGC and G1 GC for HBase on Amazon EMR 7.10. We used 1 m5.4xlarge (primary), 5 m5.4xlarge (core) nodes cluster settings and ran multiple iterations of 1-billion rows YCSB workloads to compare the GC pauses and uptime percentage. Based on our test cluster, we observed a GC pause time improvement from over 1 minute, 24 seconds, to under 1 seconds for over an hour-long execution, improving the application uptime from 98.08% to 99.99%.

We conducted extensive performance testing comparing G1 GC and generational ZGC on HBase clusters running on Amazon EMR, using the default heap settings automatically configured based on Amazon Elastic Compute Cloud (Amazon EC2) instance type. The following image shows the comparison in both GC pause time and uptime percentage at a peak load of 3,00,000 requests per second (data sampled over 1 hour).

Side-by-side comparison of Java garbage collectors showing Generational ZGC's superior pause time and uptime metrics versus G1GC

The following figures show the breakdown of the 1-hour runtime in 10-minute intervals. The left vertical axis measures the uptime, the right vertical axis measures the GC pause time, and the horizontal axis shows the interval. The generational ZGC maintained consistent uptime and pause time in milliseconds, and G1 GC demonstrated inconsistent and decreased uptime, pause times in seconds.

G1GC performance chart with dual y-axes: uptime percentage bars declining from 99.72% to 99.31%, and pause time trend peaking at 14.6s

Generational ZGC performance visualization with consistent uptime above 99.98% and fluctuating pause times peaking at 93ms

Tail latency comparison: Generational ZGC vs. G1GC in Amazon EMR HBase

One of the most compelling advantages of generational ZGC over G1 GC is its predictable garbage collection behavior and the impact on application tail latency. G1 GC’s collection triggers are non-deterministic, meaning pause times can vary significantly and occur at unpredictable intervals. These unexpected pauses, though generally manageable, can create latency spikes that particularly affect the slowest percentile of operations. In contrast, generational ZGC maintains consistent, sub-millisecond pause times throughout its operation. This predictability proves crucial for applications requiring stable performance, especially at the highest percentiles of latency (99.9th and 99.99th percentiles). Our YCSB benchmark testing reveals the real-world impact of these different approaches. The following graph illustrates tail latency distribution between G1 GC and generational ZGC over a 2-hour sampling period :

Dual violin plot visualization comparing garbage collector latency distributions, demonstrating Generational ZGC's superior performance with lower mean latencies and tighter distribution

Improvements to BucketCache

BucketCache is an off-heap cache in HBase that is used to cache the frequently accessed data blocks and minimize disk I/O. Bucket cache and heap memory works in conjunction and might increase the contention on the heap depending on the workload. Generational ZGC maintains it’s pause time goals even with a terabyte-sized bucket cache. We benchmarked multiple HBase clusters with varying bucket cache sizes and 32 GB RegionServer heap. The following figures show the peak pause times observed over a 1-hour sampling period, comparing G1 GC and generational ZGC performance.

128GB Bucket Cache performance metrics displaying Generational ZGC's superior pause times and uptime compared to G1GC implementation

Side-by-side performance metrics showing Generational ZGC's 1.1s pause time and 99.97% uptime versus G1GC's longer pauses and lower uptime

Enabling this feature and additional fine-tuning parameters

To enable this feature, follow the configurations mentioned in the Performance Considerations. In the following sections, we discuss additional fine-tuning parameters to tailor the configuration for your specific use case.

Fixed JVM heap 

Batch processing jobs and short-lived applications benefit from dynamic allocation’s ability to adapt to varying input sizes and processing demands when multiple applications co-exist on the same cluster and run with resource constraints. The memory footprint can expand during peak processing and contract when the workload diminishes. However, for production HBase deployments without any co-existing applications in the same fixed heap allocation offers stable, reliable performance.

Dynamic heap allocation is when the JVM flexibly grows and shrinks its memory usage between minimum (-Xms) and maximum (-Xmx) limits based on application needs, returning unused memory to the operating system. However, this flexibility comes at the cost of performance overhead and memory fragmentation. Dynamic allocation seemed flexible, but it created constant disruptions. The JVM was always negotiating with the operating system for memory, leading to performance overhead and fragmentation. On the other hand, fixed heap allocation pre-allocates a constant amount of memory for the JVM at startup and maintains it throughout runtime, providing better performance by reducing memory negotiation overhead with the operating system. To enable this feature, use the following configuration: :

[
    {
        "Classification": "hbase",
        "Properties": {
            "hbase.regionserver.fixed.heap.enabled": "true"
        }
    }
]

Enable pre-touch

Applications with large heaps can experience more significant pauses when the JVM needs to allocate and fault in new memory pages. Pre-touch (-XX:+AlwaysPreTouch) instructs the JVM to physically touch and commit all memory pages during heap initialization, rather than waiting until they’re first accessed during runtime. This early commitment reduces the latency of on-demand page faults and memory mappings that occur when pages are first accessed, resulting in more predictable performance especially during heavy load situations. By pre-touching memory pages at startup, you trade a slightly longer JVM startup time for more consistent runtime performance. To enable pre-touch for your HBase cluster, use the following configuration :

[
    {
        "Classification": "hbase-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "JAVA_HOME": "/usr/lib/jvm/jre-21",
                    "HBASE_REGIONSERVER_GC_OPTS": "\"-XX:+UseZGC -XX:+ZGenerational -XX:+AlwaysPreTouch\""
                }
            }
        ]
    }
]

Increasing memory mappings for large heaps

Depending on the workload and scale, you might need to increase the Java heap size to accommodate large data in memory. When using the generational ZGC with a large heap setup, it’s critical to also increase the operating system’s memory mapping limit (vm.max_map_count).

When a ZGC-enabled application starts, the JVM proactively checks the system’s vm.max_map_count value. If the limit is too low to support the configured heap, it will issue the following warning :

[warning] The system limit on number of memory mappings per process might be too low for the given
[warning] max Java heap size (131072M). Please adjust /proc/sys/vm/max_map_count to allow for at
[warning] least 235929 mappings (current limit is 65530). Continuing execution with the current
[warning] limit could lead to a premature OutOfMemoryError being thrown, due to failure to map memory.

To increase the memory mappings, use the following configuration and adjust the count value in the command based on the heap size of the application.

echo "vm.max_map_count = 262144" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p

sudo systemctl restart hbase-regionserver

Conclusion

The introduction of generational ZGC and fixed heap allocation for HBase on Amazon EMR marks a significant leap forward in the predictable performance and tail latency reduction. By addressing the long-standing challenges of GC pauses and memory management, these features unlock new levels of efficiency and stability for Amazon EMR HBase deployments. Although the performance improvements vary depending on workload characteristics, you can expect to see significant enhancements in your Amazon EMR HBase clusters’ responsiveness and stability. As data volumes continue to grow and low-latency requirements become increasingly stringent, features like generational ZGC and fixed heap allocation become indispensable. We encourage HBase users on Amazon EMR to enable these features and experience the benefits firsthand. As always, we recommend testing in a staging environment that mirrors your production workload to fully understand the impact and optimize configurations for your specific use case.

Stay tuned for more innovations as we continue to push the boundaries of what’s possible with HBase on Amazon EMR.


About the authors

Vishal Chaudhary is a Software Development Engineer at Amazon EMR. His expertise is in Amazon EMR, HBase and Hive Query Engine. His dedication towards solving distributed system problems is helping Amazon EMR to achieve higher performance improvements.

Ramesh Kandasamy is an Engineering Manager at Amazon EMR. He is a long tenured Amazonian dedicated to solve distributed systems problems.

Achieve low-latency data processing with Amazon EMR on AWS Local Zones

Post Syndicated from Gagan Brahmi original https://aws.amazon.com/blogs/big-data/achieve-low-latency-data-processing-with-amazon-emr-on-aws-local-zones/

Enterprises today require both single-digit millisecond latency data processing and data residency compliance for their applications. By deploying Amazon EMR on AWS Local Zones, organizations can achieve single-digit millisecond latency data processing for applications while maintaining data residency compliance. This post demonstrates how to use AWS Local Zones to deploy EMR clusters closer to your users, enabling millisecond-level response times. We use a Secure Web Gateway as an example and implement Amazon EMR with Apache Flink on AWS Local Zones to process network traffic with ultra-low latency. We also go through the process of creating an EMR cluster on AWS Local Zones, highlighting performance optimizations and architecture considerations specific to edge deployments. This approach uses AWS Local Zones to bring Amazon EMR’s data processing capabilities closer to your users and data sources – ideal for security applications or any other latency-sensitive workloads.

Solution overview

The following diagram illustrates the solution architecture.

Sample Architecture for Secure Web Gateway on AWS LocalZones

The solution consists of several key components:

  • AWS Local Zones deployment – Positioned close to corporate offices to minimize latency
  • Network traffic interception – Using AWS Transit Gateway and virtual private cloud (VPC) endpoints
  • Request queuing and rules streaming – Using Apache Kafka on Amazon Elastic Kubernetes Service (Amazon EKS) to queue the incoming and outgoing network requests as well as stream rules as they are updated by the security administrator
  • EMR cluster – Running Flink for real-time stream processing and functions to combine rules
  • Policy management system – For defining and updating security rules
  • Logging – Using Amazon Simple Storage Service (Amazon S3) for visibility, compliance, and data analytics

In this scenario, the Secure Web Gateway is designed to inspect and make decisions on network traffic within single-digit milliseconds. The workflow consists of the following steps:

  1. The corporate office uses AWS Direct Connect to connect to AWS Local Zones.
  2. The security administrator defines the rules from a rules interface running on Kubernetes pods on Amazon EKS. As the rules are added or modified, they are sent to the swg_rules Kafka topic running on Amazon EKS. These rules are stored and processed by Flink running on the EMR cluster.
  3. A corporate user requests for a software as a service (SaaS) application from the corporate office. The request is routed through Direct Connect to the Local Zone.
  4. The Secure Web Gateway proxy service running on Kubernetes pods on Amazon EKS receives the access request, which is sent to the swg_requests Kafka topic.
  5. Flink running on EMR evaluates and consumes the messages from the swg_requests Kafka topic and determines the routing decision, which is sent back to the swg_decisions Kafka topic.
  6. The Secure Web Gateway proxy service consumes the swg_decisions topic and routes the traffic to the SaaS application, if the access request is allowed. If the request is denied, the proxy responds back to the users with the reason or violations details, if any.

Due to the real-time nature of the solution, the security administrator can add, modify, or remove the rules through the swg_rules topic as Flink constantly consumes and evaluates this topic.In the following sections, we discuss the key components of the solution in more detail.

AWS Local Zones: The foundation

AWS Local Zones provide low-latency extensions of AWS Regions positioned near large population and industry centers. For our Secure Web Gateway use case, deploying in a Local Zones offers several advantages:

  • Proximity to corporate offices – Reducing round-trip latency for traffic inspection. AWS Local Zones is designed to provide applications with low latency aiming for single-digit millisecond performance.
  • AWS-native security controls – Using AWS security features.
  • Consistent connectivity – Reliable connection between corporate networks and AWS resources.

The Local Zone hosts our EMR cluster and networking components, making sure traffic inspection through the Secure Web Gateway happens with single-digit millisecond latency. For scenarios where traffic inspection doesn’t require single-digit millisecond latency, deploying hosting the solution on EMR cluster in a Region should work fine.

Amazon EMR with Apache Flink: The decision engine

The core intelligence of our Secure Web Gateway solution is powered by Amazon EMR running Flink for real-time stream processing. With Amazon EMR running on Flink, we take advantage of the optimized real-time stream processing capability offered by Flink. EMR running in AWS Local Zones helps users perform complex data processing closer to their data centers or corporate locations without worrying any potential latency introduced for moving the data to other Regions. In this particular solution, we use Flink’s stateful processing, which allows for maintaining the session context across multiple network requests/packets. The solution also provides a dynamic rules engine that is combined with the real-time stream of requests for network access.

Architectural component choice considerations

Amazon EMR offers several deployment options for different kinds of workloads and use cases, including Amazon EMR on EKS. AWS also provides Amazon Managed Service for Apache Flink, a fully managed service that simplifies the process of building and managing Flink applications. As of this writing, both the EMR on EKS deployment option and Amazon Managed Service for Apache Flink are not available in AWS Local Zones.

Prerequisites

Before proceeding with this deployment, ensure you have:

  • AWS account with AWS IAM permissions for Amazon VPC, EMR, and Local Zones management
  • Basic familiarity with the AWS Management Console

Deploy Amazon EMR on a Local Zone

To deploy Amazon EMR on a Local Zone, you first need to enable the Local Zone for the AWS account. For instructions, refer to Step 1 and Step 2 in Getting started with AWS Local Zones.

After you have enabled a Local Zone and created a Local Zone subnet, create your EMR cluster. For instructions, refer to Step 1: Configure data resources and launch an Amazon EMR cluster. You can follow the instructions provided for the AWS Management Console. Make sure you select the appropriate Amazon EMR release version (5.28.0 or later for Local Zone support). Select the applications you need, which in this case is Hadoop and Flink.

A crucial step to launching an EMR cluster in a Local Zone is selecting the Local Zone network configuration. Choose the VPC that contains your Local Zone subnet, and choose the subnet that you created in the Local Zone.

Review all other configurations and settings for your cluster and make any final adjustments as needed, then choose Create cluster to launch your EMR cluster in the Local Zone.

Performance and scaling considerations

The Local Zone EMR deployment can be scaled based on traffic patterns. You can manually scale the EMR cluster horizontally by adding more worker nodes during peak traffics to provide low-latency performance, after you have increased the number of users that access the Secure Web Gateway. Alternatively, you can set up a scheduled action to scale the EMR cluster at predetermined times based on known workload patterns. You can also perform vertical scaling by using Amazon Elastic Compute Cloud (Amazon EC2) instance types with more compute capacity. Consider using the manual resize option for EMR clusters to modify the cluster size based on workload requirements.

Another important performance consideration is to optimize Flink checkpointing for fault tolerance. To learn more, see Optimizing job restart times for task recovery and scaling operations.

Security considerations

Although this architecture prioritizes low-latency performance, implementing proper security controls is essential for production deployments. The solution handles sensitive corporate network traffic that requires protection through encryption, access controls, and monitoring. For comprehensive security guidance specific to EMR deployments, refer to Security in Amazon EMR. Consider the following key areas:

  • Data protection – Enable encryption at rest and in transit using Amazon EMR security configurations, including Amazon S3 encryption and TLS certificates for inter-node communication
  • Access control – Implement AWS Identity and Access Management (IAM) roles with least privilege for Amazon EMR service roles, EC2 instance profiles, and runtime roles to isolate job access
  • Network security – Deploy EMR clusters in private subnets with security groups following least privilege, and enable the Amazon EMR block public access feature

Benefits of Amazon EMR

Using Amazon EMR on AWS Local Zones in this architecture offers several key benefits:

  • Low latency – Providing the compute in AWS Local Zones close to corporate offices helps you achieve low-latency processing.
  • Real-time inspection – Flink’s streaming capabilities unlocks the ability to process real-time inspection for network requests.
  • Complex policy application – With Flink on Amazon EMR, you can build a complex policy application that, for instance, can detect sophisticated access patterns across multiple events and time windows that would be impossible with traditional rule-based systems.
  • Scalability – Amazon EMR provides the flexibility to automatically scale the cluster with a custom policy. Moreover, Amazon EMR release 6.15.0 and higher supports Flink autoscaler, which automatically scales the individual Flink job vertexes based on the job metrics.
  • Compliance – Logging all the events to a durable storage like Amazon S3 helps users improve their security and audit posture.

Clean up

To avoid incurring unnecessary charges, clean up the resources you created during this walkthrough. Follow these steps in order:

Step 1: Terminate the EMR cluster

  • Open the Amazon EMR console
  • Select your EMR cluster from the list
  • Choose Terminate
  • Confirm the termination when prompted
  • Wait for the cluster status to change to “TERMINATED”

Step 2: Clean up VPC resources

  • In the Amazon VPC console, delete the Local Zone subnet you created
  • If you created a custom VPC specifically for this demo, delete any associated:
    • Route tables
    • Internet gateways
    • Security groups (other than default)
    • The VPC itself

Step 3: Disable the Local Zone (optional)

  • In the EC2 console, go to Zones under “Settings”
  • Find your enabled Local Zone
  • Choose Manage and disable the zone if you no longer need it for other workloads

Step 4: Review additional resources Check for and clean up any other resources you may have created:

  • S3 buckets used for logging or EMR storage
  • CloudWatch log groups
  • Any custom IAM roles or policies created specifically for this architecture

Conclusion

This implementation of Amazon EMR on AWS Local Zones demonstrates how enterprises can bring powerful data processing capabilities to the edge while maintaining single-digit millisecond latency. By showcasing a Secure Web Gateway application, we have illustrated just one of many possible use cases where performance-sensitive workloads can benefit from this architecture.As the edge computing landscape evolves, we anticipate organizations will increasingly use this pattern for additional use cases, including:

  • Real-time fraud detection for financial transactions requiring immediate decision-making
  • Connected vehicle applications where processing telemetry data with minimal latency is critical
  • Internet of Things (IoT) sensor analytics that require immediate insights from operational technology environments
  • Augmented reality experiences where processing must happen close to end-users

We encourage you to evaluate your latency-sensitive workloads and consider how AWS Local Zones with Amazon EMR might help you implement architectures previously perceived highly challenging. Start small with a proof of concept like the one outlined here, measure the performance gains, and expand to production use cases with confidence. Implementing a Secure Web Gateway in AWS Local Zones with Amazon EMR and Flink offers enterprises a powerful solution for securing corporate traffic. By using the proximity of Local Zones and the real-time processing capabilities of Flink, organizations can implement sophisticated security policies without the latency penalties traditionally associated with traffic inspection.


About the authors

Gagan Brahmi is a Specialist Senior Solutions Architect at Amazon Web Services (AWS), specializing in Data Analytics and AI/ML solutions. With over 20 years in information technology, he helps customers architect scalable, high-performance analytics platforms using distributed data processing, real-time streaming technologies, and machine learning services on AWS. When not designing cloud solutions, Gagan enjoys exploring new places with his family.

Arun Shanmugam is a Senior Analytics Solutions Architect at AWS, with a focus on building modern data architecture. He has been successfully delivering scalable data analytics solutions for customers across diverse industries. Outside of work, Arun is an avid outdoor enthusiast who actively engages in CrossFit, road biking, and cricket.

George Oakes is a Senior Hybrid Solutions Architect at AWS, with a focus on edge, on-premise, and low latency architectures. He has been successfully delivering scalable hybrid AWS solutions for customers across diverse industries. Outside of work, George is an avid outdoor enthusiast who enjoys hiking and visiting parks and UNESCO sites around.

Enhance Amazon EMR observability with automated incident mitigation using Amazon Bedrock and Amazon Managed Grafana

Post Syndicated from Yu-Ting Su original https://aws.amazon.com/blogs/big-data/enhance-amazon-emr-observability-with-automated-incident-mitigation-using-amazon-bedrock-and-amazon-managed-grafana/

Maintaining high availability and quick incident response for Amazon EMR clusters is important in data analytics environments. In this post, we show you how to build an automated observability system that combines Amazon Managed Grafana with Amazon Bedrock to detect and remediate EMR cluster issues. We demonstrate how to integrate real-time monitoring with AI-powered remediation suggestions, combining Amazon Managed Grafana for visualization, Amazon Bedrock for intelligent response recommendations, and AWS Systems Manager for automated remediation actions on Amazon Web Services (AWS).

Solution overview

This solution helps you improve EMR cluster observability through a comprehensive four-layer architecture—comprising monitoring, notification, remediation, and knowledge management—to provide the following features:

  • Real-time monitoring of EMR clusters using Amazon Managed Service for Prometheus and Amazon Managed Grafana
  • Automated first-aid remediation through Systems Manager
  • AI-powered incident response suggestions using Amazon Bedrock
  • Integration with the AWS Premium Support knowledge base
  • Historical incident data archival and analysis

The implementation of this architecture delivers the following key benefit:

  • Reduced Mean time to resolution (MTTR)
  • Proactive incident prevention
  • Automated first-response actions
  • Knowledge base enrichment through machine learning

The following diagram illustrates the solution architecture.

End-to-end AWS monitoring solution diagram integrating Knowledge Center, Support, CloudWatch metrics with EventBridge rules and Lambda processing

The architecture comprises the following core components:

  • Monitoring layer – The monitoring layer uses Amazon Managed Service for Prometheus and Amazon CloudWatch to capture real-time metrics from EMR clusters. Amazon Managed Grafana serves as the visualization layer, offering comprehensive dashboards for Apache YARN, HDFS, Apache HBase, and Apache Hudi performance monitoring. Advanced alerting mechanisms trigger notifications based on predefined query results.
  • Notification layer – To provide timely and reliable alert delivery, the notification layer uses Amazon Simple Notification Service (Amazon SNS) for distribution and Amazon Simple Queue Service (Amazon SQS) for message queuing. This architecture prevents message delays and provides a robust trigger mechanism for AWS Lambda functions.
  • Remediation layer – The remediation layer enables automatic issue resolution through:
    • Lambda functions for orchestration
    • Systems Manager for script execution
    • Amazon Bedrock (amazon.nova-lite-v1:0) for generating intelligent response recommendations
  • Knowledge management layer – To maintain an up-to-date knowledge base, the solution:

We provide an AWS CloudFormation template to deploy the solution resources.

Prerequisites

Before starting this walkthrough, make sure you have access to the following AWS resources and configurations:

  • An AWS account
  • Access to the US East (N. Virginia) AWS Region
    • Add access to Amazon Bedrock foundation models (amazon.nova-lite-v1:0)

  • Amazon EMR version 6.15.0 (used in this demo)
  • Archived technical or troubleshooting articles
  • AWS IAM Identity Center enabled with at least one role that can become a Grafana administrator
  • (Optional) AWS Premium Support with a business support plan or higher for enhanced troubleshooting capabilities

Throughout this walkthrough, we provide detailed instructions to set up and configure these prerequisites if you haven’t already done so.

Configure resources using AWS CloudFormation

Complete the following steps to configure your resources:

  1. Launch the CloudFormation stack:

launch stack

  1. Provide emrobservability as the stack name.
  2. Select a virtual private cloud (VPC) and assign a public subnet.
  3. For EMRClusterName, enter a name for your cluster (default: emrObservability).
  4. Enter an existing Amazon S3 location as the Apache HBase root directory location (for example, s3://mybucket/my/hbase/rootdir/).
  5. For MasterInstanceType and CoreInstanceType, enter your instance types (default: m5.xlarge for both).
  6. For CoreInstanceCount, enter your instance count (default: 2).
  7. For SSHIPRange, use CheckIp and enter your IP (for example, 10.1.10/32).
  8. Choose the release label (default: 6.15.0).
  9. For KeyName, enter a key name to SSH to Amazon Elastic Compute Cloud (Amazon EC2) instances.
  10. For LatestAmiId, enter your AMI (default: /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2).
  11. For KBS3Bucket, enter a name for your S3 bucket (for example, mykbbucket).
  12. For SubscriptionEndpoint, enter an email address to receive notifications and responses (for example, [email protected]).

Accept subscription confirmation

Accept the subscription confirmation sent to the email address you specified in the CloudFormation stack parameters. The following screenshot shows an example of the email you receive.

AWS email confirmation for SNS topic subscription to QA Lambda function responses with opt-out instructions

Prepare the knowledge base

Complete the following steps to populate the S3 bucket with archived technical articles and cases:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function CustomFunctionCopyKCArticlesToS3Bucket.

AWS Lambda console displaying Functions page with CustomFunctionCopyKCArticlesToS3Bucket function details

  1. Manually invoke the function by choosing Test on the Test tab.

AWS Lambda Test tab interface with event configuration options

  1. Verify successful execution by checking the CloudWatch logs.

AWS Lambda successful function execution result with null output

  1. Repeat the process for the Lambda function CustomFunctionCopyCasesToS3Bucket.

Lambda function interface displaying CustomFunctionCopyCasesToS3Bucket configuration with CloudFormation ID and description panel

AWS Lambda test interface showing Test event configuration options and action buttons

AWS Lambda function execution success message with null response and SHA-256 code

  1. Confirm the S3 bucket has been populated with archived technical articles and cases.

Amazon S3 bucket interface showing two folders with action buttons and search functionality

Sync data to the Amazon Bedrock knowledge base

Complete the following steps to sync the data to your knowledge base:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function KBDataSourceSync.

AWS Lambda console displaying filtered functions with CloudFormation tags, Python runtime versions, and modification timestamps

  1. Manually invoke the function by choosing Test on the Test tab.

This task might take 10–15 minutes to complete.

AWS Lambda console test configuration panel with CloudWatch integration and event creation controls

  1. Verify successful execution by checking the CloudWatch logs.

Lambda function execution results showing successful completion status and details

Configure your Amazon Managed Grafana workspace

Complete the following steps to configure your Amazon Managed Grafana workspace:

  1. On the Amazon Managed Grafana console, choose Workspaces in the navigation pane.
  2. Open your workspace.
  3. Choose Assign new user or group.

Amazon Grafana workspace showing IAM configuration notice and user assignment button

  1. Select your IAM Identity Center role and choose Assign users and groups.

Amazon Grafana IAM Identity Center user assignment panel with search and selection controls

  1. On the Admin dropdown menu, choose Make admin.

Amazon Grafana user list showing assigned viewer with admin action options

  1. Enable Grafana alerting, then choose Save changes.

Amazon Grafana alerting configuration panel showing disabled status with navigation tabs and edit button

Amazon Grafana configuration panel showing enabled alerting and plugin management settings

  1. Wait 10 minutes for the workspace to become active.
  2. When it’s active, sign in to the Grafana workspace. (For more information, refer to Connect to your workspace.)

Configure data sources

Add and configure the following data sources:

  1. For Service, choose CloudWatch, then select your Region and add CloudWatch as a data source.

  1. Choose Amazon Managed Service for Prometheus as a second data source and select your Region.

  1. Validate CloudWatch connectivity:
    1. Run test queries (for example, Namespace: AWS/EC2, Metric name: CPUUtilization, Statistic: Maximum).
      Amazon Managed Gragana interface showing CPU utilization query setup for EC2 instance.
    2. Verify CloudWatch metric retrieval.
      Line graph showing CPU utilization over time with peak at 40%.
  1. Validate Amazon Managed Service for Prometheus connectivity:
    1. Run test queries (for example, Metric: hadoop_hbase_numregionservers, Label filters: cluster_id = <Amazon EMR cluster ID>).
      Amazon Managed Grafana query interface showing Hadoop HBase metric configuration.
    2. Verify Prometheus metric retrieval.
      Amazon Managed Grafana monitoring dashboard showing a graph with HBase Region Server amount from 0 to 2

Confirm SNS notification channels

Complete the following steps to confirm your SNS notification is set up:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Locate and note the ARNs for -LambdaFunctionTopic and -QALambdaFunctionTopic.

AWS SNS Topics list showing 4 topics with names, types, and ARNs

AWS SNS Topics console showing filtered search results for "LambdaFunctionTopic"

AWS SNS Topics console showing filtered search results for "QALambdaFunctionTopic"

  1. Choose Contact points under Alerting.

  1. Create the first contact point:
    1. For Name, enter SNS_SSM.
    2. For Integration, choose AWS SNS.
    3. For Topic, enter the ARN for LambdaFunctionTopic.
    4. For Auth Provider, choose Workspace IAM role.
    5. For Alert Message format, choose JSON.

  1. Create the second contact point:
    1. For Name, enter SNS_QA.
    2. For Integration, choose AWS SNS.
    3. For Topic, enter the ARN for QALambdaFunctionTopic.
    4. For Auth Provider, choose Workspace IAM role.
    5. For Alert Message format, choose JSON.

Create alert rules

Complete the following steps to set up two critical alert rules:

  1. Choose Alert rules under Alerting.

  1. Set up alerting if the Apache HBase region server status is abnormal:
    1. For Alert name, enter HBase region server down.
    2. For Data source, choose Amazon Managed Service for Prometheus.
    3. For Metric, choose hadoop_hbase_numregionservers.
      Alert rule configuration interface for HBase region server monitoring
    4. For Threshold, configure to alert if the region server count is less than 2 for 3 minutes.
      Amazon Managed Grafana alert rule configuration interface with expressions setup
    5. For Evaluation interval, set to 1 minute.
      New evaluation group creation modal showing P0_RegionServer name input and 1m interval settingHBase alert configuration panel showing P0_RegionServer group and 3m pending period
    6. For Contact point, choose SNS_SSM.
      Amazon Managed Grafana alert configuration interface showing labels and notifications setup with AWS SNS integration
  1. Create a second alert for if Amazon EC2 CPU utilization is abnormal:
    1. For Alert name, enter EC2 CPU utilization too high.
    2. For Data source, choose Amazon CloudWatch.
    3. For Namespace, choose AWS/EC2.
    4. For Metric name, choose CPUUtilization
    5. For Statistic, choose Maximum.
      Amazon CloudWatch query interface for setting up EC2 CPU utilization alert conditions
    6. For Threshold, configure to alert if CPU utilization is more than 95% for 3 minutes.
      Amazon Managed Grafana alert interface with Reduce and Threshold expressions for alert condition management
    7. For Evaluation interval, configure to 1 minute.
      New evaluation group configuration modal showing CPU utilization monitoring setup with 1-minute interval
      AWS Managed Grafana alert rule configuration screen showing evaluation behavior settings
    8. For Contact point, choose SNS_QA.Amazon Managed Grafana alert configuration showing customizable labels, contact point selection for SNS_QA integration
  1. On the alert rule creation page, scroll to 5. Add annotations and for Summary, add a clear description of the alert, for example, CPU utilization on EC2 instance is too high.

Alert configuration summary field with "CPU utilization on EC2 instance is too high" warning message

Apache HBase region server incident test

To confirm the system is working as expected, complete the following Apache HBase region server incident test:

  1. SSH into an EMR core instance.
  2. Stop the Apache HBase region server using systemctl:
 # Stop HBase region server service 
 sudo systemctl stop hbase-regionserver.service 

  1. Verify the service status:
 # Check the current state of HBase region server service 
 sudo systemctl status hbase-regionserver.service
  1. Observe Amazon Managed Grafana alert progression:
    1. Monitor alert status changes.
      Alert dashboard showing HBase region server alert status in pending state
      Alert dashboard showing HBase region server alert in firing state
    2. Verify SNS message generation.
    3. Confirm SQS message queuing.
    4. Track the Lambda function triggered for remediation.

Terminal output showing HBase RegionServer service status and daemon processes

HBase monitoring interface displaying region server status with health indicators and action buttons

CPU utilization stress test

Complete the following CPU utilization stress test:

  1. SSH into the EMR primary instance.
  2. Install stress testing tools:
 sudo amazon-linux-extras install epel -y
 sudo yum install stress -y 

  1. Verify the installation:
 stress --version 

  1. Generate high CPU load using the stress command and the following command structure:
 sudo stress [options] 

For our Amazon EMR test, use the following command:

 # For m5.xlarge instances (4 vCPUs) sudo stress --cpu 4 

-c 4 in the command creates 4 CPU-bound processes (one for each vCPU).The following are instance type vCPUs for your reference:

  • m5.xlarge: 4 vCPUs
  • m5.2xlarge: 8 vCPUs
  • m5.4xlarge: 16 vCPUs
  1. Monitor system response:
    1. Observe Amazon Managed Grafana alert status changes.
      Amazon Managed Grafana dashboard header showing rules status
    2. Verify Amazon Bedrock recommendation generation.
    3. Check SNS email notification delivery.
      AWS SNS notification email showing troubleshooting steps for high CPU usageCode snippet showing CPU usage troubleshooting steps in red text

Best practices and considerations

Monitoring infrastructure requires precise alert prioritization and threshold configuration. Alert aggregation techniques prevent notification overload by consolidating event streams and reducing redundant alerts. Operational teams must maintain dashboards through consistent updates and metric integration, providing real-time visibility into system performance and health.

Security implementations focus on least-privilege AWS Identity and Access Management (IAM) roles, restricting access to critical resources and minimizing potential breach vectors. Data protection strategies involve encryption protocols for information at rest and in transit, using AES-256 standards. Automated security audit processes scan automation scripts, identifying potential vulnerabilities through code analysis and runtime inspection.

Performance optimization in serverless architectures uses Lambda extensions to cache knowledge base content, reducing latency and improving response times. Retry mechanisms for API calls implement exponential backoff strategies, mitigating transient network exceptions and enhancing system resilience. Execution time monitoring of Lambda functions enables detection of anomalies through statistical analysis, providing insights into potential system-wide incidents or performance degradations.

Clean up

To avoid incurring future charges, delete the resources by deleting the parent stack on the AWS CloudFormation console.

Conclusion

This solution provides a robust framework for automated EMR cluster monitoring and incident response. By combining real-time monitoring with AI-powered remediation suggestions and automated execution, organizations can significantly reduce MTTR for common Amazon EMR issues while building a knowledge base for future incident response.

Try out this solution for your own use case, and leave your feedback in the comments section.


About the authors

Author Yu-ting Su, Sr. Hadoop System Engineer, AWS Support Engineering. Yu-Ting is a Sr. Hadoop Systems Engineer at Amazon Web Services (AWS). Her expertise is in Amazon EMR and Amazon OpenSearch Service. She’s passionate about distributing computation and helping people to bring their ideas to life.

Use Databricks Unity Catalog Open APIs for Spark workloads on Amazon EMR

Post Syndicated from Venkat Viswanathan original https://aws.amazon.com/blogs/big-data/use-databricks-unity-catalog-open-apis-for-spark-workloads-on-amazon-emr/

This post was written with John Spencer, Sreeram Thoom, and Dipankar Kushari from Databricks.

Organizations need seamless access to data across multiple platforms and business units. A common scenario involves one team using Amazon EMR for data processing while needing to access data that another team manages in Databricks Unity Catalog. Traditionally, this would require data duplication or complex manual setups.

Although both Amazon EMR and Databricks Unity Catalog are powerful tools on their own, integrating them effectively is crucial for maintaining strong data governance, security, and operational efficiency. In this post, we demonstrate how to achieve this integration using Amazon EMR Serverless, though the approach works well with other Amazon EMR deployment options and Unity Catalog OSS.

EMR Serverless makes running big data analytics frameworks straightforward by offering a serverless option that automatically provisions and manages the infrastructure required to run big data applications. Teams can run Apache Spark and other workloads without the complexity of cluster management, while providing cost-effective scaling based on actual workload demands and seamless integration with AWS services and security controls.

Databricks Unity Catalog serves as a unified governance solution for data and AI assets, providing centralized access control and auditing capabilities. It enables fine-grained permissions across workspaces and cloud platforms, while supporting comprehensive metadata management and data discovery across the organization, and can complement governance tools like AWS Lake Formation.

To enable Amazon EMR to process data maintained in Unity Catalog, the data team traditionally copies data products across the platforms to a location accessible by Amazon EMR. The practice of data duplication not only leads to increased storage costs, but also severely impacts data quality and makes it challenging to effectively enforce same governance policies across different systems, track data lineage, enforce data retention policies, and maintain consistent access controls across the organization.

Now using Unity Catalog’s Open REST APIs, Amazon EMR customers can read from and write to Databricks Unity Catalog and Unity Catalog OSS tables using Spark, enabling cross-platform interoperability while maintaining governance and access controls across Amazon EMR and Unity Catalog.

Solution overview

In this post, we will provide an overview of EMR Spark workload integration with Databricks Unity Catalog and walk through the end-to-end process of reading from and writing to Databricks Unity Catalog tables using Amazon EMR and Spark. We show you how to configure EMR Serverless to interact with Databricks Unity Catalog, run an interactive Spark workload to access the data, and run an analysis to derive insights.

The following diagram illustrates the solution architecture.

Prerequisites

You must have the following prerequisites:

In the following sections, we walk through the process of reading and writing to Unity Catalog with EMR Serverless.

Enable Unity Catalog for external access

Log in to your workspace as a Databricks admin and complete the following steps to configure external access to read Databricks objects:

  1. Enable external data access for your metastore. For instructions, see Enable external data access on the metastore.
  2. Set up a principal that will be configured with Amazon EMR for data access.
  3. Grant the principal the privilege to configure the integration of the EXTERNAL USE SCHEMA privilege on the schema containing the objects. For instructions, see Grant a principal EXTERNAL USE SCHEMA.
  4. For this post, we generate a Databricks personal access token (PAT) for the principal and note it down. For instructions, refer to Authorizing access to Databricks resources and Databricks personal access token authentication.

For a production deployment, store the PAT in AWS Secrets Manager. You can use it in a later step to read and write to Unity Catalog with Amazon EMR.

Configure EMR Spark to access Unity Catalog

In this walkthrough, we run PySpark interactive queries through notebooks using EMR Studio. Complete the following steps:

  1. Open the AWS Management Console with administrator permission.
  2. Create an EMR Studio to run interactive workloads. To create a workspace, you need to specify the S3 bucket created in the prerequisites and the minimum service role for EMR Serverless. For instructions, see Set up an EMR Studio.
  3. For this post, we create two EMR Serverless applications. For instructions, see Creating an EMR Serverless application from the EMR Studio console.
    1. For Iceberg tables, create an EMR Serverless application called dbx-demo-application-iceberg with version 7.8.0 or higher. Make sure to deselect Use AWS Glue Data Catalog as Metastore under Additional Configurations, Metastore configuration. Add the following Spark configuration (see Configure applications). Provide the name of the catalog in Unity Catalog that contains your tables and the URL of the Databricks workspace.
      {
          "runtimeConfiguration": [
              {
                  "classification": "spark-defaults",
                  "properties": {
                      "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
                      "spark.jars.packages": "io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
                      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                      "spark.sql.defaultCatalog": "<uc-catalog-name>",
                      "spark.sql.catalog.<uc-catalog-name>": "org.apache.iceberg.spark.SparkCatalog",
                      "spark.sql.catalog.<uc-catalog-name>.uri": "https://<workspace-url>/api/2.1/unity-catalog/iceberg-rest",
                      "spark.sql.catalog.<uc-catalog-name>.type": "rest",
                      "spark.sql.catalog.<uc-catalog-name>.warehouse": "<uc-catalog-name>"
                  }
              }
          ]
      }

    2. For Delta tables, create an EMR Serverless application called dbx-demo-application and version 7.8.0 or higher. Make sure to deselect Use AWS Glue Data Catalog as Metastore under Additional Configurations, Metastore configuration. Add the following Spark configuration (see Configure applications). Provide the name of the catalog in Unity Catalog that contains your tables and the URL of the Databricks workspace.
      {
          "runtimeConfiguration": [
              {
                  "classification": "spark-defaults",
                  "properties": {
                      "spark.jars": "/usr/share/aws/delta/lib/delta-spark.jar,/usr/share/aws/delta/lib/delta- storage.jar",
                      "spark.jars.packages": "io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
                      "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
                      "spark.sql.defaultCatalog": "<uc-catalog-name>",
                      "spark.sql.catalog.spark_catalog": "io.unitycatalog.spark.UCSingleCatalog",
                      "spark.sql.catalog.<uc-catalog-name>": "io.unitycatalog.spark.UCSingleCatalog",
                      "spark.sql.catalog.<uc-catalog-name>.uri": "https://<workspace-url>/api/2.1/unity-catalog"
                  }
              }
          ]
      }

  4. To set up your interactive workload with a runtime role, see Run interactive workloads with EMR Serverless through EMR Studio.

Read and write to Unity Catalog with Amazon EMR

Launch the workspace created in the previous step. Download the notebooks create-delta-table and create-iceberg-table and upload them to the EMR Studio workspace.

The create-delta-table.ipynb notebook configures the metastore properties to work with Delta tables. The create-iceberg-table.ipynb notebook configures the metastore properties to work with Iceberg tables.

Add the generated token to the session.

For a production deployment, store the PAT in Secrets Manager.

For Iceberg tables, connect to the EMR Serverless application dbx-demo-application-iceberg with the runtime role created in earlier steps under compute and run the notebook (create-iceberg-table). Select PySpark as the kernel and execute each cell in the notebook by choosing the run icon. Refer to Submit a job run or interactive workload for further details about how to run an interactive notebook.

We use the following code to create an external Iceberg table in the catalog:

CREATE SCHEMA IF NOT EXISTS customerschema;
USE SCHEMA customerschema;
CREATE TABLE IF NOT EXISTS iceberg_customer (id string, name string, country string) USING iceberg; 
insert into iceberg customer values('1','Alice','US');

For Delta tables, connect to the EMR Serverless application dbx-demo-application with the runtime role created in earlier steps and run the notebook (create-delta-table). Select PySpark as the kernel and execute each cell in the notebook by choosing the run icon. Refer to Submit a job run or interactive workload for further details about how to run an interactive notebook.

We use the following code to create an external Delta table in the catalog:

CREATE SCHEMA IF NOT EXISTS customerschema;
USE SCHEMA customerschema;
CREATE TABLE IF NOT EXISTS delta_customer (id int, name string, country string) USING delta LOCATION ‘s3://<bucket_name>/emr-dbx/external/customerschema/delta_customer’;
insert into delta_customer values(1,'Bob','US'); 

Verify in Databricks for both Iceberg and Delta tables

Now you can run queries in Databricks Unity Catalog to show the records inserted into the Iceberg and Delta tables from EMR Serverless:

  1. Log in to your Databricks workspace.
  2. Choose SQL Editor in the navigation pane.
  3. Run queries for both Iceberg and Delta tables.
  4. Verify the results show the same as what you saw in the Jupyter notebook in EMR Studio.

The following screenshot shows an example of querying the Iceberg table.

The following screenshot shows an example of querying the Delta table.

Clean up

Clean up the resources used in this post to avoid additional charges:

  1. Delete the IAM roles for this post.
  2. Delete the EMR applications and EMR Studio setup created for this post.
  3. Delete the resources created in Unity Catalog.
  4. Empty and then delete the S3 bucket.

Summary

In this post, we demonstrated the powerful interoperability between Amazon EMR and Databricks Unity Catalog by walking through how to enable external access to Unity Catalog, configure EMR Spark to connect seamlessly with Unity Catalog, and perform DML and DDL operations on Unity Catalog tables using EMR Serverless.

To learn more about using EMR Serverless, see Getting started with Amazon EMR Serverless. To learn more about using tools like EMR Spark with Unity Catalog, see Unity Catalog integrations.


About the authors

Venkatavaradhan (Venkat) Viswanathan is a Global Partner Solutions Architect at Amazon Web Services. Venkat is a Technology Strategy Leader in Data, AI, ML, generative AI, and Advanced Analytics. Venkat is a Global SME for Databricks and helps AWS customers design, build, secure, and optimize Databricks workloads on AWS.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She works with the product team and customers to build robust features and solutions for their analytical data platform. She enjoys building data mesh solutions and sharing them with the community.

Ramkumar Nottath is a Principal Solutions Architect at AWS focusing on Analytics services. He enjoys working with various customers to help them build scalable, reliable big data and analytics solutions. His interests extend to various technologies such as analytics, data warehousing, streaming, data governance, and machine learning. He loves spending time with his family and friends.

John Spencer is a Product Manager at Databricks, dedicated to making Unity Catalog work seamlessly with customers’ ecosystems of tools and platforms so they can easily access, govern, and use their data.

Sreeram Thoom is a Specialist Solutions Architect at Databricks helping customers design secure, scalable applications on the Data Lakehouse.

Dipankar Kushari is a specialist solutions architect at Databricks helping customer architect and build secured applications on Data Lakehouse.

RocksDB 101: Optimizing stateful streaming in Apache Spark with Amazon EMR and AWS Glue

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/rocksdb-101-optimizing-stateful-streaming-in-apache-spark-with-amazon-emr-and-aws-glue/

Real-time streaming data processing is a strategic imperative that directly impacts business competitiveness. Organizations face mounting pressure to process massive data streams instantaneously—from detecting fraudulent transactions and delivering personalized customer experiences to optimizing complex supply chains and responding to market dynamics milliseconds ahead of competitors.

Apache Spark Structured Streaming addresses these critical business challenges through its stateful processing capabilities, enabling applications to maintain and update intermediate results across multiple data streams or time windows. RocksDB was introduced in Apache Spark 3.2, offering a more efficient alternative to the default HDFS-based in-memory store. RocksDB excels in stateful streaming in scenarios that require handling large quantities of state data. It delivers optimal performance benefits, particularly in reducing Java virtual machine (JVM) memory pressure and garbage collection (GC) overhead.

This post explores RocksDB’s key features and demonstrates its implementation using Spark on Amazon EMR and AWS Glue, providing you with the knowledge you need to scale your real-time data processing capabilities.

RocksDB state store overview

Spark Structured Streaming processes fall into two categories:

  • Stateful: Requires tracking intermediate results across micro-batches (for example, when running aggregations and de-duplication).
  • Stateless: Processes each batch independently.

A state store is required by stateful applications that track intermediate query results. This is essential for computations that depend on continuous events and change results based on each batch of input, or on aggregate data over time, including late arriving data. By default, Spark offers a state store that keeps states in JVM memory, which is performant and sufficient for most general streaming cases. However, if you have a large number of stateful operations in a streaming application—such as, streaming aggregation, streaming dropDuplicates, stream-stream joins, and so on—the default in-memory state store might face out-of-memory (OOM) issues because of a large JVM memory footprint or frequent GC pauses, resulting in degraded performance.

Advantages of RocksDB over in-memory state store

RocksDB addresses the challenges of an in-memory state store through off-heap memory management and efficient checkpointing.

  • Off-heap memory management: RocksDB stores state data in OS-managed off-heap memory, reducing GC pressure. While off-heap memory still consumes machine memory, it doesn’t occupy space in the JVM. Instead, its core memory structures, such as block cache or memTables, allocate directly from the operating system, bypassing the JVM heap. This approach makes RocksDB an optimal choice for memory-intensive applications.
  • Efficient checkpointing: RocksDB automatically saves state changes to checkpoint locations, such as Amazon Simple Storage Service (Amazon S3) paths or local directories, helping to ensure full fault tolerance. When interacting with S3, RocksDB is designed to improve checkpointing efficiency; it does this through incremental updates and compaction to reduce the amount of data transferred to S3 during checkpoints, and by persisting fewer large state files compared to the many small files of the default state store, reducing S3 API calls and latency.

Implementation considerations

RocksDB operates as a native C++ library embedded within the Spark executor, using off-heap memory. While it doesn’t fall under JVM GC control, it still affects overall executor memory usage from the YARN or OS perspective. RocksDB’s off-heap memory usage might exceed YARN container limits without triggering container termination, potentially leading to OOM issues. You should consider the following approaches to manage Spark’s memory:

Adjust the Spark executor memory size

Increase spark.executor.memoryOverheadorspark.executor.memoryOverheadFactor to leave more room for off-heap usage. The following example sets half (4 GB) of spark.executor.memory (8 GB) as the memory overhead size.

# Total executor memory = 8GB (heap) + 4GB (overhead) = 12GB
spark-submit \
. . . . . . . .
--conf spark.executor.memory=8g \         # JVM Heap
--conf spark.executor.memoryOverhead=4g \ # Off-heap allocation (RocksDB + other native)
. . . . . . . .

For Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), enabling YARN memory control with the following strict container memory enforcement through polling method preempts containers to avoid node-wide OOM failures:

yarn.nodemanager.resource.memory.enforced = false
yarn.nodemanager.elastic-memory-control.enabled = false
yarn.nodemanager.pmem-check-enabled = true 
or 
yarn.nodemanager.vmem-check-enabled = true

Off-heap memory control

Use RocksDB-specific settings to configure memory usage. More details can be found in the Best practices and considerations section.

Get started with RocksDB on Amazon EMR and AWS Glue

To turn on the state store RocksDB in Spark, configure your application with the following setting:

spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

In the following sections, we explore creating a sample Spark Structured Streaming job with RocksDB enabled running on Amazon EMR and AWS Glue respectively.

RocksDB on Amazon EMR

Amazon EMR versions 6.6.0 and later support RocksDB, including Amazon EMR on EC2, Amazon EMR serverless and Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). In this case, we use Amazon EMR on EC2 as an example.

Use the following steps to run a sample streaming job with RocksDB enabled.

  1. Upload the following sample script to s3://<YOUR_S3_BUCKET>/script/sample_script.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, expr
import random

# List of words
words = ["apple", "banana", "orange", "grape", "melon", 
         "peach", "berry", "mango", "kiwi", "lemon"]

# Create random strings from words
def generate_random_string():
    return " ".join(random.choices(words, k=5)) 
    
    
# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("StreamingWordCount") \
    .config("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()


# Register UDF
spark.udf.register("random_string", generate_random_string)

# Create streaming data
raw_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load() \
    .withColumn("words", expr("random_string()"))

# Execute word counts
wordCounts = raw_stream.select(explode(split(raw_stream.words, " ")).alias("word")).groupby("word").count()

# Output the results
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
  1. On the AWS Management Console for Amazon EMR, choose Create Cluster
  2. For Name and applications – required, select the latest Amazon EMR release.
  3. For Steps, choose Add. For Type, select Spark application.
  4. For Name, enter GettingStartedWithRocksDB and s3://<YOUR_S3_BUCKET>/script/sample_script.py as the Application location.
  5. Choose Save step.
  6. For other settings, choose the appropriate settings based on your use case.
  7. Choose Create cluster to start the streaming application via Amazon EMR step.

RocksDB on AWS Glue

AWS Glue 4.0 and later versions support RocksDB. Use the following steps to run the sample job with RocksDB enabled on AWS Glue.

  1. On the AWS Glue console, in the navigation pane, choose ETL jobs.
  2. Choose Script editor and Create script.
  3. For the job name, enter GettingStartedWithRocksDB.
  4. Copy the script from the previous example and paste it on the Script tab.
  5. On Job details tab, for Type, select Spark Streaming.
  6. Choose Save, and then choose Run to start the streaming job on AWS Glue.

Walkthrough details

Let’s dive deep into the script to understand how to run a simple stateful Spark application with RocksDB using the following example pySpark code.

  1. First, set up RocksDB as your state store by configuring the provider class:
spark = SparkSession \
    .builder \
    .appName("StreamingWordCount") \
    .config("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()
  1. To simulate streaming data, create a data stream using the rate source type. It generates one record per second, containing five random fruit names from a pre-defined list.
# List of words
words = ["apple", "banana", "orange", "grape", "melon", 
         "peach", "berry", "mango", "kiwi", "lemon"]

# Create random strings from words
def generate_random_string():
    return " ".join(random.choices(words, k=5))
# Register UDF
spark.udf.register("random_string", generate_random_string)

# Create streaming data
raw_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load() \
    .withColumn("words", expr("random_string()"))
  1. Create a word counting operation on the incoming stream. This is a stateful operation because it maintains running counts between processing intervals, that is, previous counts must be stored to calculate the next new totals.
# Split raw_stream into words and counts them
wordCounts = raw_stream.select(explode(split(raw_stream.words, " ")).alias("word")).groupby("word").count()
  1. Finally, output the word count totals to the console:
# Output the results
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Input data

In the same sample code, test data (raw_stream) is generated at a rate of one-row-per-second, as shown in the following example:

+-----------------------+-----+--------------------------------+
|timestamp              |value|words                           |
+-----------------------+-----+--------------------------------+
|2025-04-18 07:05:57.204|125  |berry peach orange banana banana|
+-----------------------+-----+--------------------------------+

Output result

The streaming job produces the following results in the output logs. It demonstrates how Spark Structured Streaming maintains and updates the state across multiple micro-batches:

  • Batch 0: Starts with an empty state
  • Batch 1: Processes multiple input records, resulting in initial counts for every one of the 10 fruits (for example, banana appears 8 times)
  • Batch 2: Running totals based on new occurrences from the next set of records are added to the counts (for example,  banana increases from 8 to 15, indicating 7 new occurrences).

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|banana|    8|
|orange|    4|
| apple|    3|
| berry|    5|
| lemon|    7|
|  kiwi|    6|
| melon|    8|
| peach|    8|
| mango|    7|
| grape|    9|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|banana|   15|
|orange|    8|
| apple|    7|
| berry|   11|
| lemon|   12|
|  kiwi|   11|
| melon|   16|
| peach|   15|
| mango|   12|
| grape|   13|
+------+-----+

State store logs

RocksDB generates detailed logs during the job run, like the following:

INFO    2025-04-18T07:52:28,378 83933   org.apache.spark.sql.execution.streaming.MicroBatchExecution    [stream execution thread for [id = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx, runId = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx]] 60  Streaming query made progress: {
  "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "runId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "name": null,
  "timestamp": "2025-04-18T07:52:27.642Z",
  "batchId": 39,
  "numInputRows": 1,
  "inputRowsPerSecond": 100.0,
  "processedRowsPerSecond": 1.3623978201634879,
  "durationMs": {
    "addBatch": 648,
    "commitOffsets": 39,
    "getBatch": 0,
    "latestOffset": 0,
    "queryPlanning": 10,
    "triggerExecution": 734,
    "walCommit": 35
  },
  "stateOperators": [
    {
      "operatorName": "stateStoreSave",
      "numRowsTotal": 10,
      "numRowsUpdated": 4,
      "allUpdatesTimeMs": 18,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 3629,
      "memoryUsedBytes": 174179,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 36,
      "numStateStoreInstances": 36,
      "customMetrics": {
        "rocksdbBytesCopied": 5009,
        "rocksdbCommitCheckpointLatency": 533,
        "rocksdbCommitCompactLatency": 0,
        "rocksdbCommitFileSyncLatencyMs": 2991,
        "rocksdbCommitFlushLatency": 44,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 0,
        "rocksdbFilesCopied": 4,
        "rocksdbFilesReused": 24,
        "rocksdbGetCount": 8,
        "rocksdbGetLatency": 0,
        "rocksdbPinnedBlocksMemoryUsage": 3168,
        "rocksdbPutCount": 4,
        "rocksdbPutLatency": 0,
        "rocksdbReadBlockCacheHitCount": 8,
        "rocksdbReadBlockCacheMissCount": 0,
        "rocksdbSstFileSize": 35035,
        "rocksdbTotalBytesRead": 136,
        "rocksdbTotalBytesReadByCompaction": 0,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWritten": 228,
        "rocksdbTotalBytesWrittenByCompaction": 0,
        "rocksdbTotalBytesWrittenByFlush": 5653,
        "rocksdbTotalCompactionLatencyMs": 0,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 266452
      }
    }
  ],
  "sources": [
    {
      "description": "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
      "startOffset": 63,
      "endOffset": 64,
      "latestOffset": 64,
      "numInputRows": 1,
      "inputRowsPerSecond": 100.0,
      "processedRowsPerSecond": 1.3623978201634879
    }
  ],
  "sink": {
    "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@2cf39784",
    "numOutputRows": 10
  }
}

In Amazon EMR on EC2, these logs are available on the node where the YARN ApplicationMaster container is running. They can be found at/var/log/hadoop-yarn/containers/<Application ID>/<container_id>/stderr.

As for AWS Glue, you can find the RocksDB metrics in Amazon CloudWatch, under the log group /aws-glue/jobs/error.

RocksDB metrics

The metrics from the preceding logs provide insights on RocksDB status. The followings are some example metrics you might find useful when investigating streaming job issues:

  • rocksdbCommitCheckpointLatency: Time spent writing checkpoints to local storage
  • rocksdbCommitCompactLatency: Duration of checkpoint compaction operations during checkpoint commits
  • rocksdbSstFileSize: Current size of SST files in RocksDB.

Deep dive into RocksDB key concepts

To better understand the state metrics shown in the logs, we deep dive into RocksDB’s key concepts: MemTable, sorted string table (SST) file, and checkpoints. Additionally, we provide some tips for best practices and fine-tuning.

High level architecture

RocksDB is a local, non-distributed persistent key-value store embedded in Spark executors. It enables scalable state management for streaming workloads, backed by Spark’s checkpointing for fault tolerance. As shown in the preceding figure, RocksDB stores data in memory and also on disk. RocksDB’s ability to spill data over to disk is what allows Spark Structured Streaming to handle state data that exceeds the available memory.

Memory:

  • Write buffers (MemTables): Designated memory to buffer writes before flushing onto disk
  • Block cache (read buffer): Reduces query time by caching results from disk

Disk:

  • SST files: Sorted String Table saved as SST file format for fast access

MemTable: Stored off-heap

MemTable, shown in the preceding figure, is an in-memory store where data is first written off-heap, before being flushed to disk as an SST file. RocksDB caches the latest two batches of data (hot data) in MemTable to reduce streaming process latency. By default, RocksDB only has two MemTables—one is active and the other is read-only. If you have sufficient memory, the configuration spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber can be increased to have more than two MemTables. Among these MemTables, there is always one active table, and the rest are read-only MemTables used as write buffers.

SST files: Stored on Spark executor’s local disk

SST files are block-based tables stored on the Spark executor’s local disk. When the in-memory state data can no longer fit into a MemTable (defined by a Spark configuration writeBufferSizeMB), the active table is marked as immutable, saving it as the SST file format, which switches it to a read-only MemTable while asynchronously flushing it to local disks. While flushing, the immutable MemTable can still be read, so that the most recent state data is available with minimal read latency.

Reading from RocksDB follows the sequence demonstrated by the preceding diagram:

  1. Read from the active MemTable.
  2. If not found, iterate through read-only MemTables in the order of newest to oldest.
  3. If not found, read from BlockCache (read buffer).
  4. If misses, load index (one index per SST) from disk into BlockCache. Look up key from index and if hits, load data block onto BlockCache and return result.

SST files are stored on executors’ local directories under the path of spark.local.dir (default: /tmp) or yarn.nodemanager.local-dirs:

  • Amazon EMR on EC2 – ${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache/<yarn_app_id>/<spark_app_id>/
  • Amazon EMR Serverless, Amazon EMR on EKS, AWS Glue${spark.local.dir}/<spark_app_id>/

Additionally, by using application logs, you can track the MemTable flush and SST file upload status under the file path:

  • Amazon EMR on EC2/var/log/hadoop-yarn/containers/<application_id>/<container_id>/stderr
  • Amazon EMR on EKS –/var/log/spark/user/<spark_app_name>-<spark_executor_ID>/stderr

The following is an example command to check the SST file status in an executor log from Amazon EMR on EKS:

cat /var/log/spark/user/<spark_app_name>-<spark_executor_ID>/stderr/current | grep old

or

kubectl logs <spark_executor_pod_name> --namespace emr -c spark-kubernetes-executor | grep old

The following screenshot is an example of the output of either command.

You can use the following examples to check if the MemTable records were deleted and flushed out to SST:

cat /var/log/spark/user/<spark_app_name>-<spark_executor_ID>/stderr/current | grep deletes

or

kubectl logs <spark_executor_pod_name> --namespace emr -c spark-kubernetes-executor | grep deletes

The following screenshot is an example of the output of either command.

Checkpoints: Stored on the executor’s local disk or in an S3 bucket

To handle fault tolerance and fail over from the last committed point, RocksDB supports checkpoints. The checkpoint files are usually stored on the executor’s disk or in an S3 bucket, including snapshot and delta or changelog data files.

Starting with Amazon EMR 7.0 and AWS Glue5.0, RocksDB state store provides a new feature called changelog checkpointing to enhance checkpoint performance. when the changelog is enabled (disabled by default) using the setting spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled, RocksDB writes smaller change logs to the storage location (the local disk by default) instead of frequently persisting large snapshot data. Note that snapshots are still created but less frequently, as shown in the following screenshot.

Here’s an example of a checkpoint location path when overridden to an S3 bucket: s3://<S3BUCKET>/<checkpointDir>/state/0/spark_parition_ID/state_version_ID.zip

Best practices and considerations

This section outlines key strategies for fine-tuning RocksDB performance and avoiding common pitfalls.

1. Memory management for RocksDB

To prevent OOM errors on Spark executors, you can configure RocksDB’s memory usage at either the node level or instance level:

  • Node level (recommended): Enforce a global off-heap memory limit per executor. In this context, each executor is treated as a RocksDB node. If an executor processes N partitions of a stateful operator, it will have N number of RocksDB instances on a single executor.
  • Instance-level: Fine-tune individual RocksDB instances.

Node-level memory control per executor

Starting with Amazon EMR 7.0 and AWS Glue 5.0 (Spark 3.5), a critical Spark configuration, boundedMemoryUsage, was introduced (through SPARK-43311) to enforce a global memory cap at a single executor level that is shared by multiple RocksDB instances. This prevents RocksDB from consuming unbounded off-heap memory, which could lead to OOM errors or executor termination by resource managers such as YARN or Kubernetes.

The following example shows the node-level configuration:

 # Bound total memory usage per executor 
 "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true"
 # Set a static total memory size per executor
 "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": "500"
 # For read-heavy workloads, split memory allocation between write buffers (30%) and block cache (70%) 
 "spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio": "0.3"

A single RocksDB instance level control

For granular memory management, you can configure individual RocksDB instances using the following settings:

# Control MemTable (write buffer) size and count
"spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB": "64"
"spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber": "4"
  • writeBufferSizeMB (default: 64, suggested: 64 – 128): Controls the maximum size of a single MemTable in RocksDB, affecting memory usage and write throughput. This setting is available in Spark3.5 – [SPARK-42819] and later. It determines the size of the memory buffer before state data is flushed to disk. Larger buffer sizes can improve write performance by reducing SST flush frequency but will increase the executor’s memory usage. Adjusting this parameter is crucial for optimizing memory usage and write throughput.
  • maxWriteBufferNumber (default: 2, suggested: 3 – 4): Sets the total number of active and immutable MemTables.

For read-heavy workloads, prioritize the following block cache tuning over write buffers to reduce disk I/O. You can configure SST block size and caching as follows:

"spark.sql.streaming.stateStore.rocksdb.blockSizeKB": "64"
"spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB": "128"
  •  blockSizeKB (default: 4, suggested: 64–128): When an active MemTable is full, it becomes a read-only memTable. From there, new writes continue to accumulate in a new table. The read-only MemTable is flushed into SST files on the disk. The data in SST files is approximately chunked into fixed-sized blocks (default is 4 KB). Each block, in turn, keeps multiple data entries. When writing data to SST files, you can compress or encode data efficiently within a block, which often results in a smaller data size compared with its raw format.

For workloads with a small state size (such as less than 10 GB), the default block size is usually sufficient. For a large state (such as more than 50 GB), increasing the block size can improve compression efficiency and sequential read performance but increase CPU overhead.

  • blockCacheSizeMB (default: 8, suggested: 64–512, large state: more than 1024): When retrieving data from SST files, RocksDB provides a cache layer (block cache) to improve the read performance. It first locates the data block where the target record might reside, then caches the block to memory, and finally searches that record within the cached block. To avoid frequent reads of the same block, the block cache can be used to keep the loaded blocks in memory.

2. Clean up state data at checkpoint

To help ensure that your state file sizes and storage costs remain under control when checkpoint performance becomes a concern, use the following Spark configurations to adjust cleanup frequency, retention limits, and checkpoint file types:

# clean up RocksDB state every 30 seconds
"spark.sql.streaming.stateStore.maintenanceInterval":"30s"
# retain only the last 50 state versions  
"spark.sql.streaming.minBatchesToRetain":"50"
# use changelog instead of snapshots
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled":"true"
  • maintenanceInterval (default: 60 seconds): Retaining a state for a long period of time can help reduce maintenance cost and background IO. However, longer intervals increase file listing time, because state stores often scan every retained file.
  • minBatchesToRetain (default: 100, suggested: 10–50): Limits the number of state versions retained at checkpoint locations. Reducing this number results in fewer files being persisted and reduces storage usage.
  • changelogCheckpointing (default: false, suggested: true): Traditionally, RocksDB snapshots and uploads incremental SST files to checkpoint. To avoid this cost, changelog checkpointing was introduced in Amazon EMR7.0+ and AWS Glue 5.0, which write only state changes since the last checkpoint.

To track an SST file’s retention status, you can search RocksDBFileManager entries in the executor logs. Consider the following logs in Amazon EMR on EKS as an example. The output (shown in the screenshot) shows that four SST files under version 102 were uploaded to an S3 checkpoint location, and that an old changelog state file with version 97 was cleaned up.

cat /var/log/spark/user/<spark_app_name>-<spark_executor_ID>/stderr/ current | grep RocksDBFileManager

or

kubectl logs <spark_executor_pod_name> -n emr -c spark-kubernetes-executor | grep RocksDBFileManager

3. Optimize local disk usage

RocksDB consumes local disk space when generating SST files at each Spark executor. While disk usage doesn’t scale linearly, RocksDB can accumulate storage over time based on state data size. When running streaming jobs, if local available disk space gets insufficient, No space left on device errors can occur.

To optimize disk usage by RocksDB, adjust the following Spark configurations:

# compact state files during commit (default:false)
"spark.sql.streaming.stateStore.rocksdb.compactOnCommit": "true"
# number of delta SST files before becomes a consolidated snapshot file(default:10)
"spark.sql.streaming.stateStore.minDeltasForSnapshot": "5" 

Infrastructure adjustments can further mitigate the disk issue:

For Amazon EMR:

For AWS Glue:

  • Use AWS Glue G.2X or larger worker types to avoid the limited disk capacity of G.1X workers.
  • Schedule regular maintenance windows at optimal timing to free up disk space based on workload needs.

Conclusion

In this post, we explored RocksDB as the new state store implementation in Apache Spark Structured Streaming, available on Amazon EMR and AWS Glue. RocksDB offers advantages over the default HDFS-backed in-memory state store, particularly for applications dealing with large-scale stateful operations. RocksDB helps prevent JVM memory pressure and garbage collection issues common with the default state store.

The implementation is straightforward, requiring minimal configuration changes, though you should pay careful attention to memory and disk space management for optimal performance. While RocksDB is not guaranteed to reduce job latency, it provides a robust solution for handling large-scale stateful operations in Spark Structured Streaming applications.

We encourage you to evaluate RocksDB for your use cases, particularly if you’re experiencing memory pressure issues with the default state store or need to handle large amounts of state data in your streaming applications.


About the authors

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

Dai Ozaki is a Cloud Support Engineer on the AWS Big Data Support team. He is passionate about helping customers build data lakes using ETL workloads. In his spare time, he enjoys playing table tennis.

Noritaka Sekiyama is a Principal Big Data Architect with Amazon Web Services (AWS) Analytics services. He’s responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Amir Shenavandeh is a Sr Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Xi Yang is a Senior Hadoop System Engineer and Amazon EMR subject matter expert at Amazon Web Services. He is passionate about helping customers resolve challenging issues in the Big Data area.

Using AWS Glue Data Catalog views with Apache Spark in EMR Serverless and Glue 5.0

Post Syndicated from Aarthi Srinivasan original https://aws.amazon.com/blogs/big-data/using-aws-glue-data-catalog-views-with-apache-spark-in-emr-serverless-and-glue-5-0/

The AWS Glue Data Catalog has expanded its Data Catalog views feature, and now supports Apache Spark environments in addition to Amazon Athena and Amazon Redshift. This enhancement, launched in March 2025, now makes it possible to create, share, and query multi-engine SQL views across Amazon EMR Serverless, Amazon EMR on Amazon EKS, and AWS Glue 5.0 Spark, as well as Athena and Amazon Redshift Spectrum. The multi-dialect views empower data teams to create SQL views one time and query them through supported engines—whether it’s Athena for ad-hoc analytics, Amazon Redshift for data warehousing, or Spark for large-scale data processing. This cross-engine compatibility means data engineers can focus on building data products rather than managing multiple view definitions or complex permission schemes. Using AWS Lake Formation permissions, organizations can share these views within the same AWS account, across different AWS accounts, and with AWS IAM Identity Center users and groups, without granting direct access to the underlying tables. Features of Lake Formation such as fine-grained access control (FGAC) using Lake Formation-tag based access control (LF-TBAC) can be applied to Data Catalog views, enabling scalable sharing and access control across organizations.

In an earlier blog post, we demonstrated the creation of Data Catalog views using Athena, adding a SQL dialect for Amazon Redshift, and querying the view using Athena and Amazon Redshift. In this post, we guide you through the process of creating a Data Catalog view using EMR Serverless, adding the SQL dialect to the view for Athena, sharing it with another account using LF-Tags, and then querying the view in the recipient account using a separate EMR Serverless workspace and AWS Glue 5.0 Spark job and Athena. This demonstration showcases the versatility and cross-account capabilities of Data Catalog views and access through various AWS analytics services.

Benefits of Data Catalog views

The following are key benefits of Data Catalog views for business solutions:

  • Targeted data sharing and access control – Data Catalog views, combined with the sharing capabilities of Lake Formation, enable organizations to provide specific data subsets to different teams or departments without duplicating data. For example, a retail company can create views that show sales data to regional managers while restricting access to sensitive customer information. By applying LF-TBAC to these views, companies can efficiently manage data access across large, complex organizational structures, maintaining compliance with data governance policies while promoting data-driven decision-making.
  • Multi-service analytics integration – The ability to create a view in one analytics service and query it across Athena, Amazon Redshift, EMR Serverless, and AWS Glue 5.0 Spark breaks down data silos and promotes a unified analytics approach. This feature allows businesses to use the strengths of different services for various analytics needs. For instance, a financial institution could create a view of transaction data and use Athena for ad-hoc queries, Amazon Redshift for complex aggregations, and EMR Serverless for large-scale data processing—all without moving or duplicating the data. This flexibility accelerates insights and improves resource utilization across the analytics stack.
  • Centralized auditing and compliance – With views stored in the central Data Catalog, businesses can maintain a comprehensive audit trail of data access across connected accounts using AWS CloudTrail logs. This centralization is crucial for industries with strict regulatory requirements, such as healthcare or finance. Compliance officers can seamlessly monitor and report on data access patterns, detect unusual activities, and demonstrate adherence to data protection regulations like GDPR or HIPAA. This centralized approach simplifies compliance processes and reduces the risk of regulatory violations.

These capabilities of Data Catalog views provide powerful solutions for businesses to enhance data governance, improve analytics efficiency, and maintain robust compliance measures across their data ecosystem.

Solution overview

An example company has multiple datasets containing details of their customers’ purchase details mixed with personally identifiable information (PII) data. They categorize their datasets based on sensitivity of the information. The data steward wants to share a subset of their preferred customers data for further analysis downstream by their data engineering team.

To demonstrate this use case, we use sample Apache Iceberg tables customer and customer_address. We create a Data Catalog view from these two tables to filter by preferred customers. We then use LF-Tags to share restricted columns of this view to the downstream engineering team. The solution is represented in the following diagram.

arch diagram

Prerequisites

To implement this solution, you need two AWS accounts with an AWS Identity and Access Management (IAM) admin role. We use the role to run the provided AWS CloudFormation templates and also use the same IAM roles added as Lake Formation administrator.

Set up infrastructure in the producer account

We provide a CloudFormation template that deploys the following resources and completes the data lake setup:

  • Two Amazon Simple Storage Service (Amazon S3) buckets: one for scripts, logs, and query results, and one for the data lake storage.
  • Lake Formation administrator and catalog settings. The IAM admin role that you provide is registered as Lake Formation administrator. Cross-account sharing version is set to 4. Default permissions for newly created databases and tables is set to use Lake Formation permissions only.
    data catalog settings
  • An IAM role with read, write, and delete permissions on the data lake bucket objects. The data lake bucket is registered with Lake Formation using this IAM role.
    data lake locations
  • An AWS Glue database for the data lake.
  • Lake Formation tags. These tags are attached to the database.
    lf-tags
  • CSV and Iceberg format tables in the AWS Glue database. The CSV tables are pointing to s3://redshift-downloads/TPC-DS/2.13/10GB/ and the Iceberg tables are stored in the user account’s data lake bucket.
  • An Athena workgroup.
  • An IAM role and an AWS Lambda function to run Athena queries. Athena queries are run in the Athena workgroup to insert data from CSV tables to Iceberg tables. Relevant Lake Formation permissions are granted to the Lambda role.
    lf-tables
  • An EMR Studio and related virtual private cloud (VPC), subnet, routing table, security groups, and EMR Studio service IAM role.
  • An IAM role with policies for the EMR Studio runtime. Relevant Lake Formation permissions are granted to this role on the Iceberg tables. This role will be used as the definer role to create the Data Catalog view. A definer role is the IAM role with necessary permissions to access the referenced tables, and runs the SQL statement that defines the view.

Complete the following steps in your producer AWS account:

  1. Sign in to the AWS Management Console as an IAM administrator role.
  2. Launch the CloudFormation stack.

Allow approximately 5 minutes for the CloudFormation stack to complete creation. After the CloudFormation has completed launching, proceed with the following instructions.

  1. If you’re using the producer account in Lake Formation for the first time, on the Lake Formation console, create a database named default and grant describe permission on the default database to runtime role GlueViewBlog-EMRStudio-RuntimeRole.
    data permissions

Create an EMR Serverless application

Complete the following steps to create an EMR Serverless application in your EMR Studio:

  1. On the Amazon EMR console, under EMR Studio in the navigation pane, choose Studios.
  2. Choose GlueViewBlog-emrstudio and choose the URL link of the Studio to open it.
    glueviewblog-emrstudio
  3. On the EMR Studio dashboard, choose Create application.
    emr-studio-dashboard

You will be directed to the Create application page on EMR Studio. Let’s create a Lake Formation enabled EMR Serverless application.

  1. Under Application settings, provide the following information:
    1. For Name, enter a name (for example, emr-glueview-application).
    2. For Type, choose Spark.
    3. For Release version, choose emr-7.8.0.
    4. For Architecture, choose x86_64.
  2. Under Application setup options, select Use custom settings.
  3. Under Interactive endpoint, select Enable endpoint for EMR studio.
  4. Under Additional configurations, for Metastore configuration, select Use AWS Glue Data Catalog as metastore, then select Use Lake Formation for fine-grained access control.
  5. Under Network connections, choose emrs-vpc for VPC, enter any two private subnets, and enter emr-serverless-sg for Security groups.
  6. Choose Create and start the application.

Create an EMR Workspace

Complete the following steps to create an EMR Workspace:

  1. On the EMR Studio console, choose Workspaces in the navigation pane and choose Create Workspace.
  2. Enter a Workspace name (for example, emrs-glueviewblog-workspace).
  3. Leave all other settings as default and choose Create Workspace.
  4. Choose Launch Workspace. Your browser might request to allow pop-up permissions for the first time launching the Workspace.
  5. After the Workspace is launched, in the navigation pane, choose Compute.
  6. For Compute type, select EMR Serverless application and enter emr-glueview-application for the application and GlueViewBlog-EMRStudio-RuntimeRole for Interactive runtime role.
  7. Make sure the kernel attached to the Workspace is PySpark.

Create a Data Catalog view and verify

Complete the following steps:

  1. Download the notebook glueviewblog_producer.ipynb. The code creates a Data Catalog view customer_nonpii_view from the two Iceberg tables, customer_iceberg and customer_address_iceberg, in the database glueviewblog_<account-id>_db.
  2. On your EMR Workspace emrs-glueviewblog-workspace, go to the File browser section and choose Upload files.
  3. Upload glueviewblog_producer.ipynb.
  4. Update the data lake bucket name, AWS account ID, and AWS Region to match your resources.
  5. Update the database_name, table1_name, and table2_name to match your resources.
  6. Save the notebook.
  7. Choose the double arrow icon to restart the kernel and rerun the notebook.

The Data Catalog view customer_nonpii_view is created and verified.

  1. In the navigation pane on the Lake Formation console, under Data Catalog, choose Views.
  2. Choose the new view customer_nonpii_view.
  3. On the SQL definitions tab, verify EMR with Apache Spark shows up for Engine name.
  4. Choose the tab LF-Tags. The view should show the LF-Tag sensitivity=pii-confidential inherited from the database.
  5. Choose Edit LF-Tags.
  6. On the Values dropdown menu, choose confidential to overwrite the Data Catalog view’s key value of sensitivity from pii-confidential.
  7. Choose Save.

With this, we have created a non-PII view to share with the data engineering team from the datasets that has PII information of customers.

Add Athena SQL dialect to the view

With the view customer_nonpii_view having been created by the EMR runtime role GlueViewBlog-EMRStudio-RuntimeRole, the Admin will have only describe permissions on it as a database creator and Lake Formation administrator. In this step, the Admin will grant itself alter permissions on the view, in order to add the Athena SQL dialect to the view.

  1. On the Lake Formation console, in the navigation pane, choose Data permissions.
  2. Choose Grant and provide the following information:
    1. For Principals, enter Admin.
    2. For LF-Tags or catalog resources, select Resources matched by LF-Tags.
    3. For Key, choose sensitivity.
    4. For Values, choose confidential and pii-confidential.
    5. Under Database permissions, select Super for Database permissions and Grantable permissions.
    6. Under Table permissions, select Super for Table permissions and Grantable permissions.
    7. Choose Grant.
  3. Verify the LF-Tags based permissions the Admin.
  4. Open the Athena query editor, choose the Workgroup GlueViewBlogWorkgroup and choose the AWS Glue database glueviewblog_<accountID>_db.
  5. Run the following query. Replace <accountID> with your account ID.
    ALTER VIEW glueviewblog_<accountID>_db.customer_nonpii_view ADD DIALECT
    AS
    select c_customer_id, c_customer_sk, c_last_review_date, ca_country, ca_location_type
    from glueviewblog__<accountID>_db.customer_iceberg, glueviewblog__<accountID>_db.customer_address_iceberg
    where c_current_addr_sk = ca_address_sk and c_preferred_cust_flag='Y';

  6. Verify the Athena dialect by running a preview on the view.
  7. On the Lake Formation console, verify the SQL dialects on the view customer_nonpii_view.

Share the view to the consumer account

Complete the following steps to share the Data Catalog view to the consumer account:

  1. On the Lake Formation console, in the navigation pane, choose Data permissions.
  2. Choose Grant and provide the following information:
    1. For Principals, select External accounts and enter the consumer account ID.
    2. For LF-Tags or catalog resources, select Resources matched by LF-Tags.
    3. For Key, choose sensitivity.
    4. For Values, choose confidential.
    5. Under Database permissions, select Describe for Database permissions and Grantable permissions.
    6. Under Table permissions, select Describe and Select for Table permissions and Grantable permissions.
    7. Choose Grant.
  3. Verify granted permissions on the Data permissions page.

With this, the producer account data steward has created a Data Catalog view of a subset of data from two tables in their Data Catalog, using the EMR runtime role as the definer role. They have shared it to their analytics account using LF-Tags to run further processing of the data downstream.

Set up infrastructure in the consumer account

We provide a CloudFormation template to deploy the following resources and set up the data lake as follows:

  • An S3 bucket for Amazon EMR and AWS Glue logs
  • Lake Formation administrator and catalog settings similar to the producer account setup
  • An AWS Glue database for the data lake
  • An EMR Studio and related VPC, subnet, routing table, security groups, and EMR Studio service IAM role
  • An IAM role with policies for the EMR Studio runtime

Complete the following steps in your consumer AWS account:

  1. Sign in to the console as an IAM administrator role.
  2. Launch the CloudFormation stack.

Allow approximately 5 minutes for the CloudFormation stack to complete creation. After the CloudFormation has completed launching, proceed with the following instructions.

  1. If you’re using the consumer account Lake Formation for the first time, on the Lake Formation console, create a database named default and grant describe permission on the default database to runtime role GlueViewBlog-EMRStudio-Consumer-RuntimeRole.

Accept AWS RAM shares in the consumer account

You can now log in to the AWS consumer account and accept the AWS RAM invitation:

  1. Open the AWS RAM console with the IAM role that has AWS RAM access.
  2. In the navigation pane, choose Resource shares under Shared with me.

You should see two pending resource shares from the producer account.

  1. Accept both invitations.

Create a resource link for the shared view

To access the view that was shared by the producer AWS account, you need to create a resource link in the consumer AWS account. A resource link is a Data Catalog object that is a link to a local or shared database, table, or view. After you create a resource link to a view, you can use the resource link name wherever you would use the view name. Furthermore, you can grant permission on the resource link to the job runtime role GlueViewBlog-EMRStudio-Consumer-RuntimeRole to access the view through EMR Serverless Spark.

To create a resource link, complete the following steps:

  1. Open the Lake Formation console as the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Tables.
  3. Choose Create and Resource link.
  4. For Resource link name, enter the name of the resource link (for example, customer_nonpii_view_rl).
  5. For Database, choose the glueviewblog_customer_<accountID>_db database.
  6. For Shared table region, choose the Region of the shared table.
  7. For Shared table, choose customer_nonpii_view.
  8. Choose Create.

Grant permissions on the database to the EMR job runtime role

Complete the following steps to grant permissions on the database glueviewblog_customer_<accountID>_db to the EMR job runtime role:

  1. On the Lake Formation console, in the navigation pane, choose Databases.
  2. Select the database glueviewblog_customer_<accountID>_db and on the Actions menu, choose Grant.
  3. In the Principles section, select IAM users and roles, and choose GlueViewBlog-EMRStudio-Consumer-RuntimeRole.
  4. In the Database permissions section, select Describe.
  5. Choose Grant.

Grant permissions on the resource link to the EMR job runtime role

Complete the following steps to grant permissions on the resource link customer_nonpii_view_rl to the EMR job runtime role:

  1. On the Lake Formation console, in the navigation pane, choose Tables.
  2. Select the resource link customer_nonpii_view_rl and on the Actions menu, choose Grant.
  3. In the Principles section, select IAM users and roles, and choose GlueViewBlog-EMRStudio-Consumer-RuntimeRole.
  4. In the Resource link permissions section, select Describe for Resource link permissions.
  5. Choose Grant.

This allows the EMR Serverless job runtime roles to describe the resource link. We don’t make any selections for grantable permissions because runtime roles shouldn’t be able to grant permissions to other principles.

Grant permissions on the target for the resource link to the EMR job runtime role

Complete the following steps to grant permissions on the target for the resource link customer_nonpii_view_rl to the EMR job runtime role:

  1. On the Lake Formation console, in the navigation pane, choose Tables.
  2. Select the resource link customer_nonpii_view_rl and on the Actions menu, choose Grant on target.
  3. In the Principles section, select IAM users and roles, and choose GlueViewBlog-EMRStudio-Consumer-RuntimeRole.
  4. In the View permissions section, select Select and Describe.
  5. Choose Grant.

Set up an EMR Serverless application and Workspace in the consumer account

Repeat the steps to create an EMR Serverless application in the consumer account.

Repeat the steps to create a Workspace in the consumer account. For Compute type, select EMR Serverless application and enter emr-glueview-application for the application and GlueViewBlog-EMRStudio-Consumer-RuntimeRole as the runtime role.

Verify access using interactive notebooks from EMR Studio

Complete the following steps to verify access in EMR Studio:

  1. Download the notebook glueviewblog_emr_consumer.ipynb. The code runs a select statement on the view shared from the producer.
  2. In your EMR Workspace emrs-glueviewblog-workspace, navigate to the File browser section and choose Upload files.
  3. Upload glueviewblog_emr_consumer.ipynb.
  4. Update the data lake bucket name, AWS account ID, and Region to match your resources.
  5. Update the database to match your resources.
  6. Save the notebook.
  7. Choose the double arrow icon to restart the kernel with PySpark kernel and rerun the notebook.

Verify access using interactive notebooks from AWS Glue Studio

Complete the following steps to verify access using AWS Glue Studio:

  1. Download the notebook glueviewblog_glue_consumer.ipynb
  2. Open the AWS Glue Studio console.
  3. Choose Notebook and then choose Upload notebook.
  4. Upload the notebook glueviewblog_glue_consumer.ipynb.
  5. For IAM role, choose GlueViewBlog-EMRStudio-Consumer-RuntimeRole.
  6. Choose Create notebook.
  7. Update the data lake bucket name, AWS account ID, and Region to match your resources.
  8. Update the database to match your resources.
  9. Save the notebook.
  10. Run all the cells to verify fine-grained access.

Verify access using the Athena query editor

Because the view from the producer account was shared to the consumer account, the Lake Formation administrator will have access to the view in the producer account. Also, because the lake admin role created the resource link pointing to the view, it will also have access to the resource link. Go to the Athena query editor and run a simple select query on the resource link.

The analytics team in the consumer account was able to access a subset of the data from a business data producer team, using their analytics tools of choice.

Clean up

To avoid incurring ongoing costs, clean up your resources:

  1. In your consumer account, delete AWS Glue notebook, stop and delete the EMR application, and then delete EMR Workspace.
  2. In your consumer account, delete the CloudFormation stack. This should remove the resources launched by the stack.
  3. In your producer account, log in to the Lake Formation console and revoke the LF-Tags based permissions you had granted to the consumer account.
  4. In your producer account, stop and delete the EMR application and then delete the EMR Workspace.
  5. In your producer account, delete the CloudFormation stack. This should delete the resources launched by the stack.
  6. Review and clean up any additional AWS Glue and Lake Formation resources and permissions.

Conclusion

In this post, we demonstrated a powerful, enterprise-grade solution for cross-account data sharing and analysis using AWS services. We walked you through how to do the following key steps:

  • Create a Data Catalog view using Spark in EMR Serverless within one AWS account
  • Securely share this view with another account using LF-TBAC
  • Access the shared view in the recipient account using Spark in both EMR Serverless and AWS Glue ETL
  • Implement this solution with Iceberg tables (it’s also compatible other open table formats like Apache Hudi and Delta Lake)

The solution approach with multi-dialect data catalog views provided in this post is particularly valuable for enterprises looking to modernize their data infrastructure while optimizing costs, improve cross-functional collaboration while enhancing data governance, and accelerate business insights while maintaining control over sensitive information.

Refer to the following information about Data Catalog views with individual analytics services, and try out the solution. Let us know your feedback and questions in the comments section.


About the Authors

Aarthi Srinivasan is a Senior Big Data Architect with Amazon SageMaker Lakehouse. As part of the SageMaker Lakehouse team, she works with AWS customers and partners to architect lake house solutions, enhance product features, and establish best practices for data governance.

Praveen Kumar is an Analytics Solutions Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-based services. His areas of interest are serverless technology, data governance, and data-driven AI applications.

Dhananjay Badaya is a Software Developer at AWS, specializing in distributed data processing engines including Apache Spark and Apache Hadoop. As a member of the Amazon EMR team, he focuses on designing and implementing enterprise governance features for EMR Spark.

Build a centralized observability platform for Apache Spark on Amazon EMR on EKS using external Spark History Server

Post Syndicated from Sri Potluri original https://aws.amazon.com/blogs/big-data/build-a-centralized-observability-platform-for-apache-spark-on-amazon-emr-on-eks-using-external-spark-history-server/

Monitoring and troubleshooting Apache Spark applications become increasingly complex as companies scale their data analytics workloads. As data processing requirements grow, enterprises deploy these applications across multiple Amazon EMR on EKS clusters to handle diverse workloads efficiently. However, this approach creates a challenge in maintaining comprehensive visibility into Spark applications running across these separate clusters. Data engineers and platform teams need a unified view to effectively monitor and optimize their Spark applications.

Although Spark provides powerful built-in monitoring capabilities through Spark History Server (SHS), implementing a scalable and secure observability solution across multiple clusters requires careful architectural considerations. Organizations need a solution that not only consolidates Spark application metrics but extends its features by adding other performance monitoring and troubleshooting packages while providing secure access to these insights and maintaining operational efficiency.

This post demonstrates how to build a centralized observability platform using SHS for Spark applications running on EMR on EKS. We showcase how to enhance SHS with performance monitoring tools, with a pattern applicable to many monitoring solutions such as SparkMeasure and DataFlint. In this post, we use DataFlint as an example to demonstrate how you can integrate additional monitoring features. We explain how to collect Spark events from multiple EMR on EKS clusters into a central Amazon Simple Storage Service (Amazon S3) bucket; deploy SHS on a dedicated Amazon Elastic Kubernetes Service (Amazon EKS) cluster; and configure secure access using AWS Load Balancer Controller, AWS Private Certificate Authority, Amazon Route 53, and AWS Client VPN. This solution provides teams with a single, secure interface to monitor, analyze, and troubleshoot Spark applications across multiple clusters.

Overview of solution

Consider DataCorp Analytics, a data-driven enterprise running multiple business units with diverse Spark workloads. Their Financial Analytics team processes time-sensitive trading data requiring strict processing times and dedicated resources, and their Marketing Analytics team handles customer behavior data with flexible requirements, requiring multiple EMR on EKS clusters to accommodate these distinct workload patterns. As their Spark applications grow in number and complexity across these clusters, data and platform engineers struggle to maintain comprehensive visibility while maintaining secure access to monitoring tools.

This scenario presents an ideal use case for implementing a centralized observability platform using SHS and DataFlint. The solution deploys SHS on a dedicated EKS cluster, configured to read events from multiple EMR on EKS clusters through a centralized S3 bucket. Access is secured through Load Balancer Controller, AWS Private CA, Route 53, and Client VPN, and DataFlint enhances the monitoring capabilities with additional insights and visualizations. The following architecture diagram illustrates the components and their interactions.

Architecture diagram

The solution workflow is as follows:

  1. Spark applications on EMR on EKS use a custom EMR Docker image that includes DataFlint JARs for enhanced metrics collection. These applications generate detailed event logs containing execution metrics, performance data, and DataFlint-specific insights. The logs are written to a centralized Amazon S3 location through the following configuration (note especially the configurationOverrides section). For additional information, explore the StartJobRun guide to learn how to run Spark jobs and review the StartJobRun API reference.
{
  "name": "${SPARK_JOB_NAME}", 
  "virtualClusterId": "${VIRTUAL_CLUSTER_ID}",  
  "executionRoleArn": "${IAM_ROLE_ARN_FOR_JOB_EXECUTION}",
  "releaseLabel": "emr-7.2.0-latest", 
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://${S3_BUCKET_NAME}/app/${SPARK_APP_FILE}",
      "entryPointArguments": [
        "--input-path",
        "s3://${S3_BUCKET_NAME}/data/input",
        "--output-path",
        "s3://${S3_BUCKET_NAME}/data/output"
      ],
       "sparkSubmitParameters": "--conf spark.driver.cores=1 --conf spark.driver.memory=4G --conf spark.kubernetes.driver.limit.cores=1200m --conf spark.executor.cores=2  --conf spark.executor.instances=3  --conf spark.executor.memory=4G"
    }
  }, 
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.container.image": "${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${EMR_REPO_NAME}:${EMR_IMAGE_TAG}",
          "spark.app.name": "${SPARK_JOB_NAME}"
          "spark.eventLog.enabled": "true",
          "spark.eventLog.dir": "s3://${S3_BUCKET_NAME}/spark-events/"
         }
      }
    ], 
    "monitoringConfiguration": {
      "persistentAppUI": "ENABLED",
      "s3MonitoringConfiguration": {
        "logUri": "s3://${S3_BUCKET_NAME}/spark-events/"
      }
    }
  }
}
  1. A dedicated SHS deployed on Amazon EKS reads these centralized logs. The Amazon S3 location is configured in the SHS to read from the central Amazon S3 location through the following code:
env:
  - name: SPARK_HISTORY_OPTS
    value: "-Dspark.history.fs.logDirectory=s3a://${S3_BUCKET}/spark-events/"
  1. We configure Load Balancer Controller, AWS Private CA, a Route 53 hosted zone, and Client VPN to securely access the SHS UI using a web browser.
  2. Finally, users can access the SHS web interface at https://spark-history-server.example.internal/.

You can find the code base in the AWS Samples GitHub repository.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Set up a common infrastructure

Complete the following steps to set up the infrastructure:

  1. Clone the repository to your local machine and set the two environment variables. Replace <AWS_REGION> with the AWS Region where you want to deploy these resources.
git clone [email protected]:aws-samples/sample-centralized-spark-history-server-emr-on-eks.git
cd sample-centralized-spark-history-server-emr-on-eks
export REPO_DIR=$(pwd)
export AWS_REGION=<AWS_REGION>
  1. Execute the following script to create the common infrastructure. The script creates a secure virtual private cloud (VPC) networking environment with public and private subnets and an encrypted S3 bucket to store Spark application logs.
cd ${REPO_DIR}/infra
./deploy_infra.sh
  1. To verify successful infrastructure deployment, open the AWS CloudFormation console, choose your stack, and check the Events, Resources, and Outputs tabs for completion status, details, and list of resources created.

Set up EMR on EKS clusters

This section covers building a custom EMR on EKS Docker image with DataFlint integration, launching two EMR on EKS clusters (datascience-cluster-v and analytics-cluster-v), and configuring the clusters for job submission. Additionally, we set up the necessary IAM roles for service accounts (IRSA) to enable Spark jobs to write events to the centralized S3 bucket. Complete the following steps:

  1. Deploy two EMR on EKS clusters:
cd ${REPO_DIR}/emr-on-eks
./deploy_emr_on_eks.sh
  1. To verify successful creation of the EMR on EKS clusters using the AWS CLI, execute the following command:
aws emr-containers list-virtual-clusters \
    --query "virtualClusters[?state=='RUNNING']"
  1. Execute the following command for the datascience-cluster-v and analytics-cluster-v clusters to verify their respective states, container provider information, and associated EKS cluster details. Replace <VIRTUAL-CLUSTER-ID> with the ID of each cluster obtained from the list-virtual-clusters output.
aws emr-containers describe-virtual-cluster \
    --id <VIRTUAL-CLUSTER-ID>

Configure and execute Spark jobs on EMR on EKS clusters

Complete the following steps to configure and execute Spark jobs on the EMR on EKS clusters:

  1. Generate custom EMR on EKS image and StartJobRun request JSON files to run Spark jobs:
cd ${REPO_DIR}/jobs
./configure_jobs.sh

The script performs the following tasks:

  • Prepares the environment by uploading the sample Spark application spark_history_demo.py to a designated S3 bucket for job execution.
  • Creates a custom Amazon EMR container image by extending the base EMR 7.2.0 image with the DataFlint JAR for additional insights and publishing it to an Amazon Elastic Container Registry (Amazon ECR) repository.
  • Generates cluster-specific StartJobRun request JSON files for datascience-cluster-v and analytics-cluster-v.

Review start-job-run-request-datascience-cluster-v.json and start-job-run-request-analytics-cluster-v.json for additional details.

  1. Execute the following commands to submit Spark jobs on the EMR on EKS virtual clusters:
aws emr-containers start-job-run \
--cli-input-json file://${REPO_DIR}/jobs/start-job-run/start-job-run-request-datascience-cluster-v.json
aws emr-containers start-job-run \
--cli-input-json file://${REPO_DIR}/jobs/start-job-run/start-job-run-request-analytics-cluster-v.json
  1. Verify the successful generation of the logs in the S3 bucket:

aws s3 ls s3://emr-spark-logs-<AWS_ACCOUNT_ID>-<AWS_REGION>/spark-events/

You have successfully set up an EMR on EKS environment, executed Spark jobs, and collected the logs in the centralized S3 bucket. Next, we will deploy SHS, configure its secure access, and visualize the logs using it.

Set up AWS Private CA and create a Route 53 private hosted zone

Use the following code to deploy AWS Private CA and create a Route 53 private hosted zone. This will provide a user-friendly URL to connect to SHS over HTTPS.

cd ${REPO_DIR}/ssl
./deploy_ssl.sh

Set up SHS on Amazon EKS

Complete the following steps to build a Docker image containing SHS with DataFlint, deploy it on an EKS cluster using a Helm chart, and expose it through a Kubernetes service of type LoadBalancer. We use a Spark 3.5.0 base image, which includes SHS by default. However, although this simplifies deployment, it results in a larger image size. For environments where image size is critical, consider building a custom image with just the standalone SHS component instead of using the complete Spark distribution.

  1. Deploy SHS on the spark-history-server EKS cluster:
cd ${REPO_DIR}/shs
./deploy_shs.sh
  1. Verify the deployment by listing the pods and viewing the pod logs:
kubectl get pods --namespace spark-history
kubectl logs <SHS-PODNAME> --namespace spark-history
  1. Review the logs and confirm there are no errors or exceptions.

You have successfully deployed SHS on the spark-history-server EKS cluster, and configured it to read logs from the emr-spark-logs-<AWS_ACCOUNT_ID>-<AWS_REGION> S3 bucket.

Deploy Client VPN and add entry to Route 53 for secure access

Complete the following steps to deploy Client VPN to securely connect your client machine (such as your laptop) to SHS and configure Route 53 to generate a user-friendly URL:

  1. Deploy the Client VPN:
cd ${REPO_DIR}/vpn
./deploy_vpn.sh
  1. Add entry to Route 53:
cd ${REPO_DIR}/dns
./deploy_dns.sh

Add certificates to local trusted stores

Complete the following steps to add the SSL certificate to your operating system’s trusted certificate stores for secure connections:

  1. For macOS users, using Keychain Access (GUI):
    1. Open Keychain Access from Applications, Utilities, choose the System keychain in the navigation pane, and choose File, Import Items.
    2. Browse to and choose ${REPO_DIR}/ssl/certificates/ca-certificate.pem, then choose the imported certificate.
    3. Expand the Trust section and set When using this certificate to Always Trust.
    4. Close and enter your password when prompted and save.
    5. Alternatively, you can execute the following command to include the certificate in Keychain and trust it:
sudo security add-trusted-cert -d -r trustRoot -k /Library/Keychains/System.keychain "${REPO_DIR}/ssl/certificates/ca-certificate.pem"
  1. For Windows users:
    1. Rename ca-certificate.pem to ca-certificate.crt.
    2. Choose (right-click) ca-certificate.crt and choose Install Certificate.
    3. Choose Local Machine (admin rights required).
    4. Select Place all certificates in the following store.
    5. Choose Browse and choose Trusted Root Certification Authorities.
    6. Complete the installation by choosing Next and Finish.

Set up Client VPN on your client machine for secure access

Complete the following steps to install and configure Client VPN on your client machine (such as your laptop) and create a VPN connection to the AWS Cloud:

  1. Download, install, and launch the Client VPN application from the official download page for your operating system.
  2. Create your VPN profile:
    1. Choose File in the menu bar, choose Manage Profiles, and choose Add Profile.
    2. Enter a name for your profile. Example: SparkHistoryServerUI
    3. Browse to ${REPO_DIR}/vpn/client_vpn_certs/client-config.ovpn, choose the certificate file, and choose Add Profile to save your configuration.
  3. Select your newly created profile, choose Connect, and wait for the connection confirmation to establish the VPN connection.

When you’re connected, you will have secure access to the AWS resources in your environment.

VPN connection details

Securely access the SHS URL

Complete the following steps to securely access SHS using a web browser:

  1. Execute the following command to get the SHS URL:

https://spark-history-server.example.internal/

  1. Copy this URL and enter it into your web browser to access the SHS UI.

The following screenshot shows an example of the UI.

Spark History Server job summary page

  1. Choose an App ID to view its detailed execution information and metrics.

Spark History Server job detail page

  1. Choose the DataFlint tab to view detailed application insights and analytics.

DataFlint insights page

DataFlint displays various helpful metrics, including alerts, as shown in the following screenshot.

DataFlint alerts page

Clean up

To avoid incurring future charges from the resources created in this tutorial, clean up your environment after completing the steps. To remove all provisioned resources:

  1. Disconnect from the Client VPN.
  2. Run the cleanup.sh script:
cd ${REPO_DIR}/
./cleanup.sh

Conclusion

In this post, we demonstrated how to build a centralized observability platform for Spark applications using SHS and enhance SHS with performance monitoring tools like DataFlint. The solution aggregates Spark events from multiple EMR on EKS clusters into a unified monitoring interface, providing comprehensive visibility into your Spark applications’ performance and resource utilization. By using a custom EMR image with performance monitoring tool integration, we enhanced the standard Spark metrics to gain deeper insights into application behavior. If your environment uses a mix of EMR on EKS, Amazon EMR on EC2, or Amazon EMR Serverless, you can seamlessly extend this architecture to aggregate the logs from EMR on EC2 and EMR Serverless in a similar way and visualize them using SHS.

Although this solution provides a robust foundation for Spark monitoring, production deployments should consider implementing authentication and authorization. SHS supports custom authentication through javax servlet filters and fine-grained authorization through access control lists (ACLs). We encourage you to explore implementing authentication filters for secure access control, configuring user- and group-based ACLs for view and modify permissions, and setting up group mapping providers for role-based access. For detailed guidance, refer to Spark’s web UI security documentation and SHS security features.

While AWS endeavors to apply best practices for security within this example, each organization has its own policies. Please make sure to use the specific policies of your organization when deploying this solution as a starting point for implementing centralized Spark monitoring in your data processing environment.


About the authors

Sri Potluri is a Cloud Infrastructure Architect at AWS. He is passionate about solving complex problems and delivering well-structured solutions for diverse customers. His expertise spans across a range of cloud technologies, providing scalable and reliable infrastructures tailored to each project’s unique challenges.

Suvojit Dasgupta is a Principal Data Architect at AWS. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

Build a secure serverless streaming pipeline with Amazon MSK Serverless, Amazon EMR Serverless and IAM

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/build-a-secure-serverless-streaming-pipeline-with-amazon-msk-serverless-amazon-emr-serverless-and-iam/

The exponential growth and vast volume of streaming data have made it a vital resource for organizations worldwide. To unlock its full potential, real-time analytics are essential for extracting actionable insights. Derived from a wide range of sources, including social media, Internet of Things (IoT) sensors, and user interactions, streaming data empowers businesses to respond promptly to emerging trends and events, make informed decisions, and stay ahead of the competition.

Commonly streaming applications use Apache Kafka for data ingestion and Apache Spark Structured Streaming for processing. However, integrating and securing these components poses considerable challenges for users. The complexity of managing certificates, keystores, and TLS configurations to connect Spark Streaming to Kafka brokers demands specialized expertise. A managed, serverless framework would greatly simplify this process, alleviating the need for manual configuration and streamlining the integration of these critical components.

To simplify the management and security of traditional streaming architectures, you can use Amazon Managed Streaming for Apache Kafka (Amazon MSK). This fully managed service simplifies data ingestion and processing. Amazon MSK Serverless alleviates the need for cluster management and scaling, and further enhances security by integrating AWS Identity and Access Management (IAM) for authentication and authorization. This consolidated approach replaces complex certificate and key management require by TLS client authentication through AWS Certificate Manager, streamlining operations and bolstering data protection. For instance, when a client attempts to write data to the cluster, MSK Serverless verifies both the client’s identity and its permissions using IAM.

For efficient data processing, you can use Amazon EMR Serverless with a Spark application built on the Spark Structured Streaming framework, enabling near real-time data processing. This setup seamlessly handles large volumes of data from MSK Serverless, using IAM authentication for secure and swift data processing.

The post demonstrates a comprehensive, end-to-end solution for processing data from MSK Serverless using an EMR Serverless Spark Streaming job, secured with IAM authentication. Additionally, it demonstrates how to query the processed data using Amazon Athena, providing a seamless and integrated workflow for data processing and analysis. This solution enables near real-time querying of the latest data processed from MSK Serverless and EMR Serverless using Athena, providing instant insights and analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this post.

The workflow consists of the following steps:

  1. The architecture begins with an MSK Serverless cluster set up with IAM authentication. An Amazon Elastic Compute Cloud (Amazon EC2) instance runs a Python script producer.py that acts as a data producer, sending sample data to a Kafka topic within the cluster.
  2. The Spark Streaming job retrieves data from the Kafka topic, stores it in Amazon Simple Storage Service (Amazon S3), and creates a corresponding table in the AWS Glue Data Catalog. As it continuously consumes data from the Kafka topic, the job stays up-to-date with the latest streaming data. With checkpointing enabled, the job tracks processed records, allowing it to resume from where it left off in case of a failure, providing seamless data processing.
  3. To analyze this data, users can use Athena, a serverless query service. Athena enables interactive SQL-based exploration of data directly in Amazon S3 without the need for complex infrastructure management.

Prerequisites

Before getting started, make sure you have the following:

  • An active AWS account with billing enabled
  • An IAM user with administrator access (AdministratorAccess policy) or specific permissions to create and manage resources such as a virtual private cloud (VPC), subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, Amazon EMR Studio, and S3 buckets
  • Sufficient VPC capacity in your chosen AWS Region

Although using an IAM user with administrator access will work, it’s recommended to follow the principle of least privilege in production environments by creating custom IAM policies with only the necessary permissions. The IAM user we create has the AdministrativeAccess policy attached to it. However, you might not need such elevated access.

For this post, we create the solution resources in the us-east-2 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Create MSK Serverless and EMR Serverless resources

The vpc-msk-emr-serverless-studio.yaml stack creates a VPC, subnet, security group, IAM roles, NAT gateway, internet gateway, EC2 client, MSK Serverless, EMR Serverless, EMR Studio, and S3 buckets. To create the solution resources, complete the following steps:

  1. Launch the stack vpc-msk-emr-serverless-studio using the CloudFormation template:

  1. Provide the parameter values as listed in the following table.
Parameters Description Sample value
EnvironmentName An environment name that is prefixed to resource names. msk-emr-serverless-pipeline
InstanceType Amazon MSK client EC2 instance type. t2.micro
LatestAmiId Latest AMI ID of Amazon Linux 2023 for ec2 instance. You can use the default value. /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64
VpcCIDR IP range (CIDR notation) for this VPC. 10.192.0.0/16
PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone. 10.192.10.0/24
PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone. 10.192.11.0/24
PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone. 10.192.20.0/24
PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone. 10.192.21.0/24

The stack creation process can take approximately 10 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

Next, you set up the data ingestion to the Kafka topic from the Kafka EC2 instance.

Produce records to Kafka topic

Complete the following steps to set up data ingestion:

  1. On the Amazon EC2 console, go to the EC2 instance that you created using the CloudFormation template.

  1. Log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.
  2. Choose the instance msk-emr-serverless-blog and then choose Connect.

  1. Create a Kafka topic in MSK Serverless from the EC2 instance.
    1. In the following export command, replace my-endpoint with the MSKBootstrapServers value from the CloudFormation stack output:
      $ sudo su - ec2-user
      $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-2.amazonaws.com:9098>

    2. Run the following command on the EC2 instance to create a topic called sales_data_topic:

Kafka client already installed at ec2-user home directory (/home/ec2-user) with MSK IAM Authentication jar and client configuration also created (/home/ec2-user/kafka_2.12-2.8.1/bin/client.properties) with IAM authentication properties.

The following code shows the contents of client.properties:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

/home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
--bootstrap-server $BS \
--command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--create --topic sales_data_topic \
--partitions 10

Created topic sales_data_topic.
  1. Run the following command to produce records to the Kafka topic using the syntheticSalesDataProducer.py Python script present in EC2 instance. Update the Region accordingly.
nohup python3 -u syntheticSalesDataProducer.py --num_records 1000 \
--sales_data_topic sales_data_topic --bootstrap_server $BS \
--region=us-east-2 > syntheticSalesDataProducer.log &

Understanding Amazon MSK IAM authentication with EMR Serverless

Amazon MSK IAM authentication enables secure authentication and authorization for Kafka clusters (MSK Serverless) using IAM roles. When integrating with EMR Serverless Spark Streaming, Amazon MSK IAM authentication allows Spark jobs to access Kafka topics securely, using IAM roles for fine-grained access control. This provides secure data processing and streaming.

IAM policy configuration

To enable EMR Serverless jobs to authenticate with an MSK Serverless cluster using IAM, you need to attach specific Kafka-related IAM permissions to the EMR Serverless job execution role. These permissions allow the job to perform essential operations on the Kafka cluster, topics, and consumer groups.The following IAM policy must be attached to the EMR Serverless job execution role to enable necessary permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:cluster/<SERVERLESS_CLUSTER_NAME>/<ID>"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:topic/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:<AWS-REGION>:<ACCOUNTID>:group/<SERVERLESS_CLUSTER_NAME>/*/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This code refers to the following actions:

  • Connect, DescribeCluster – Required to initiate a secure connection and obtain metadata
  • DescribeTopic, ReadData, WriteData – Enables data consumption and production
  • CreateTopic (optional) – Allows dynamic topic creation
  • AlterGroup, DescribeGroup – Needed for consumer group management in streaming jobs

These permissions make sure that the Spark Streaming job can securely authenticate and interact with MSK Serverless resources using its IAM role.

Required dependencies

To enable Amazon MSK IAM authentication in Spark (especially on EMR Serverless), specific JAR dependencies must be included in your Spark Streaming job using sparkSubmitParameters:

  • spark-sql-kafka-0-10_2.12 – This is the Kafka connector for Spark Structured Streaming. It provides the DataFrame API to read from and write to Kafka.
  • aws-msk-iam-auth – This JAR provides the IAM authentication mechanism required to connect to MSK Serverless using the AWS_MSK_IAM SASL mechanism.

You can include these dependencies directly by specifying them in the --packages argument when submitting the EMR Serverless job. For example:

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0

When the job is submitted, EMR Serverless will automatically download these JARs from Maven Central (or another configured repository) at runtime. You don’t need to bundle them manually unless offline usage or specific versions are required.

Spark Streaming job configuration for Amazon MSK IAM authentication

In your Spark Streaming application, configure the Kafka source with SASL properties to enable IAM based authentication. The following code shows the relevant configuration:

topic_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", topic_input)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol","SASL_SSL")
    option("kafka.sasl.mechanism","AWS_MSK_IAM")
    .option("kafka.sasl.jaas.config","software.amazon.msk.auth.iam.IAMLoginModule required;")
    .option("kafka.sasl.client.callback.handler.class","software.amazon.msk.auth.iam.IAMClientCallbackHandler")
    .load()
    .selectExpr("CAST(value AS STRING)")
    )

Key properties include:

  • kafka.security.protocol = SASL_SSL – Enables encrypted communication over SSL with SASL authentication
  • kafka.sasl.mechanism = AWS_MSK_IAM – Tells Kafka to use the IAM based SASL mechanism
  • kafka.sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required; – Specifies the login module provided by AWS for IAM integration
  • kafka.sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler – Handles the actual signing and authentication using the IAM role

With these settings, Spark uses the IAM credentials attached to the EMR Serverless job execution role to authenticate to MSK Serverless without needing additional credentials, certificates, or secrets.

Data processing using an EMR Serverless streaming job with Amazon MSK IAM authentication

Complete the following steps to run a Spark Streaming job to process the data from MSK Serverless:

  1. Submit the Spark Streaming job to EMR Serverless using the AWS Command Line Interface (AWS CLI), which is already installed on the EC2 instance.
  2. Log in to the EC2 instance using Session Manager. Choose the instance msk-emr-serverless-blog and then choose Connect.
  3. Run the following command to submit the streaming job. Provide the parameters from the CloudFormation stack output.
sudo su - ec2-user

aws emr-serverless start-job-run \
--application-id <APPLICATION ID> \
--execution-role-arn <EXECUTION ROLE ARN> \
--mode 'STREAMING' \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<EMR BLOG SCRIPT BUCKET>/emr_pyspark_streaming_script/pysparkStreamingBlog.py",
"entryPointArguments":["--topic_input","sales_data_topic","--kafka_bootstrap_servers","<BOOTSTRAP URL WITH PORT>","--output_s3_path","s3://<EMR STREAMING OUTPUT BUCKET>/output/sales-order-data/","--checkpointLocation","s3://<EMR STREAMING OUTPUT BUCKET>/checkpointing/checkpoint-sales-order-data/","--database_name","emrblog","--table_name","sales_order_data"],
"sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.executor.cores=2 --conf spark.executor.memory=5g --conf spark.driver.cores=2 --conf spark.driver.memory=5g --conf spark.executor.instances=5 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0"
}}'
  1. After you submit the job, log in to EMR Studio using the URL in the EmrServerlessStudioURL value from the CloudFormation stack output.
  2. In the navigation pane, choose Applications under Serverless.
  3. Choose the application ID in the EmrServerlessSparkApplicationID value from the CloudFormation stack output.
  4. On the Streaming job runs tab, verify that the job has been submitted and wait for it to begin running.

Validate the data in Athena

After the EMR Serverless Spark Streaming job ran and created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, open the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database emrblog that the streaming job created.
  4. To validate the data, run the following query:
SELECT 
    DATE_TRUNC('minute', date) AS minute_window, 
    ROUND(SUM(total_amount), 2) AS total_amount
FROM 
    emrblog.sales_order_data
WHERE 
    DATE_TRUNC('day', date) = CURRENT_DATE
GROUP BY 
    DATE_TRUNC('minute', date)
ORDER BY 
    minute_window DESC;

Clean up

To clean up your resources, complete the following steps:

  1. Log in to EMR Studio using the URL from the EmrServerlessStudioURL value in the CloudFormation stack output.
  2. In the navigation pane, choose Applications under Serverless.
  3. Choose the application ID from the EmrServerlessSparkApplicationID value in the CloudFormation stack output.
  4. On the Streaming job runs tab, select the job that has been running and cancel the job run.
  5. On the AWS CloudFormation console, delete the CloudFormation stack vpc-msk-emr-serverless-studio.

Conclusion

In this post, we showcased a serverless pipeline for streaming data with IAM authentication, empowering you to focus on deriving insights from your analytics. You can customize the EMR Serverless Spark Streaming code to apply transformations and filters, so only valid data is loaded into Amazon S3. This solution combines the power of Amazon EMR Spark Serverless streaming with MSK Serverless, securely integrated through IAM authentication. Now you can streamline your streaming processes without the complexity of managing Amazon MSK and Amazon EMR Spark Streaming integrations.


About the Authors

Shubham Purwar is an AWS Analytics Specialist Solution Architect. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on the AWS platform. With deep expertise in AWS analytics services, he collaborates with customers to uncover their distinct business requirements and create customized solutions that deliver actionable insights and drive business growth. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Prashanthi Chinthala is a Cloud Engineer (DIST) at AWS. She helps customers overcome EMR challenges and develop scalable data processing and analytics pipelines on AWS.