
Introducing Amazon EMR on EKS job submission with Spark Operator and spark-submit

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-job-submission-with-spark-operator-and-spark-submit/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. The EMR runtime provides up to 5.37 times better performance and 76.8% cost savings, when compared to using open-source Apache Spark on Amazon EKS.

Building on the success of Amazon EMR on EKS, customers have been running and managing jobs using the emr-containers API, creating EMR virtual clusters, and submitting jobs to the EKS cluster, either through the AWS Command Line Interface (AWS CLI) or Apache Airflow scheduler. However, other customers running Spark applications have chosen Spark Operator or native spark-submit to define and run Apache Spark jobs on Amazon EKS, but without taking advantage of the performance gains from running Spark on the optimized EMR runtime. In response to this need, starting from EMR 6.10, we have introduced a new feature that lets you use the optimized EMR runtime while submitting and managing Spark jobs through either Spark Operator or spark-submit. This means that anyone running Spark workloads on EKS can take advantage of EMR’s optimized runtime.

In this post, we walk through the process of setting up and running Spark jobs using both Spark Operator and spark-submit, integrated with the EMR runtime feature. We provide step-by-step instructions to assist you in setting up the infrastructure and submitting a job with both methods. Additionally, you can use the Data on EKS blueprint to deploy the entire infrastructure using Terraform templates.

Infrastructure overview

In this post, we walk through the process of deploying a comprehensive solution using eksctl, Helm, and AWS CLI. Our deployment includes the following resources:

  • A VPC, EKS cluster, and managed node group, set up with the eksctl tool
  • Essential Amazon EKS managed add-ons, such as the VPC CNI, CoreDNS, and kube-proxy, set up with the eksctl tool
  • Cluster Autoscaler and Spark Operator add-ons, set up using Helm
  • A Spark job execution AWS Identity and Access Management (IAM) role, IAM policy for Amazon Simple Storage Service (Amazon S3) bucket access, service account, and role-based access control, set up using the AWS CLI and eksctl


Prerequisites

Verify that the tools used in this post are installed on your machine: the AWS CLI, eksctl, kubectl, and Helm.

Set up AWS credentials

Before proceeding to the next step and running the eksctl command, you need to set up your local AWS credentials profile. For instructions, refer to Configuration and credential file settings.

Deploy the VPC, EKS cluster, and managed add-ons

The following configuration uses us-west-1 as the default Region. To run in a different Region, update the region and availabilityZones fields accordingly. Also, verify that the same Region is used in the subsequent steps throughout the post.

Enter the following code snippet into the terminal where your AWS credentials are set up. Make sure to update the publicAccessCIDRs field with your IP before you run the command below. This will create a file named eks-cluster.yaml:

cat <<EOF >eks-cluster.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: emr-spark-operator
  region: us-west-1 # replace with your region
  version: "1.25"
vpc:
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
  publicAccessCIDRs: ["YOUR-IP/32"]
availabilityZones: ["us-west-1a","us-west-1b"] # replace with your region
iam:
  withOIDC: true
  serviceAccounts:
  - metadata:
      name: cluster-autoscaler
      namespace: kube-system
    wellKnownPolicies:
      autoScaler: true
    roleName: eksctl-cluster-autoscaler-role
managedNodeGroups:
  - name: m5x
    instanceType: m5.xlarge
    availabilityZones: ["us-west-1a"]
    volumeSize: 100
    volumeType: gp3
    minSize: 2
    desiredCapacity: 2
    maxSize: 10
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned"
addons:
  - name: vpc-cni
    version: latest
  - name: coredns
    version: latest
  - name: kube-proxy
    version: latest
cloudWatch:
  clusterLogging:
    enableTypes: ["*"]
EOF

Use the following command to create the EKS cluster:

eksctl create cluster -f eks-cluster.yaml
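Because the Region appears in several places in this walkthrough, a quick consistency check before creating the cluster can save a failed run. The following is a minimal Python sketch and not part of eksctl; the function name zones_outside_region is hypothetical, and it assumes the usual naming convention in which a standard Availability Zone name is the Region name followed by a single lowercase letter.

```python
import re


def zones_outside_region(region: str, zones: list[str]) -> list[str]:
    """Return the zones whose name is not `<region>` plus one lowercase letter."""
    pattern = re.compile(re.escape(region) + r"[a-z]")
    return [zone for zone in zones if not pattern.fullmatch(zone)]


# Values taken from the eks-cluster.yaml configuration in this post.
print(zones_outside_region("us-west-1", ["us-west-1a", "us-west-1b"]))
```

An empty list means that every zone belongs to the Region; any zone that is returned should be updated before you run eksctl.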

Deploy Cluster Autoscaler

Cluster Autoscaler is crucial for automatically adjusting the size of your Kubernetes cluster based on the current resource demands, optimizing resource utilization and cost. Create an autoscaler-helm-values.yaml file and install the Cluster Autoscaler using Helm:

cat <<EOF >autoscaler-helm-values.yaml
---
autoDiscovery:
    clusterName: emr-spark-operator
    tags:
      - k8s.io/cluster-autoscaler/enabled
      - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }}
awsRegion: us-west-1 # Make sure the Region is the same as the EKS cluster
rbac:
  serviceAccount:
    create: false
    name: cluster-autoscaler
EOF

helm repo add autoscaler https://kubernetes.github.io/autoscaler
helm install nodescaler autoscaler/cluster-autoscaler \
--namespace kube-system \
--values autoscaler-helm-values.yaml --debug

You can also set up Karpenter as a cluster autoscaler to automatically launch the right compute resources to handle your EKS cluster’s applications. For instructions, refer to the blog post on how to set up and configure Karpenter.

Deploy Spark Operator

Spark Operator is an open-source Kubernetes operator specifically designed to manage and monitor Spark applications running on Kubernetes. It streamlines the process of deploying and managing Spark jobs, by providing a Kubernetes custom resource to define, configure and run Spark applications, as well as manage the job life cycle through Kubernetes API. Some customers prefer using Spark Operator to manage Spark jobs because it enables them to manage Spark applications just like other Kubernetes resources.

Currently, customers are building their open-source Spark images and using S3a committers as part of job submissions with Spark Operator or spark-submit. However, with the new job submission option, you can now benefit from the EMR runtime in conjunction with EMRFS. Starting with Amazon EMR 6.10 and for each upcoming version of the EMR runtime, we will release the Spark Operator and its Helm chart to use the EMR runtime.

In this section, we show you how to deploy a Spark Operator Helm chart from an Amazon Elastic Container Registry (Amazon ECR) repository and submit jobs using EMR runtime images, benefiting from the performance enhancements provided by the EMR runtime.

Install Spark Operator with Helm from Amazon ECR

The Spark Operator Helm chart is stored in an ECR repository. To install the Spark Operator, you first need to authenticate your Helm client with the ECR repository. The charts are stored under the following path: ECR_URI/spark-operator.

Authenticate your Helm client and install the Spark Operator:

aws ecr get-login-password \
--region us-west-1 | helm registry login \
--username AWS \
--password-stdin 608033475327.dkr.ecr.us-west-1.amazonaws.com

You can authenticate to other EMR on EKS supported Regions by obtaining the AWS account ID for the corresponding Region. For more information, refer to how to select a base image URI.
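The registry host name and the chart location follow a fixed pattern built from the account ID and the Region. The following Python sketch shows that pattern; the helper names are hypothetical, and the account ID for any Region other than us-west-1 must be looked up in the EMR on EKS documentation rather than guessed.

```python
def ecr_registry(account_id: str, region: str) -> str:
    """Build the ECR registry host name for an account ID and Region."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("an AWS account ID is expected to be 12 digits")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com"


def spark_operator_chart(account_id: str, region: str) -> str:
    """Build the OCI location of the Spark Operator chart (ECR_URI/spark-operator)."""
    return f"oci://{ecr_registry(account_id, region)}/spark-operator"


# Account ID and Region used in this post.
print(ecr_registry("608033475327", "us-west-1"))
print(spark_operator_chart("608033475327", "us-west-1"))
```

The first value is what you pass to helm registry login, and the second is the chart reference for helm install.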

Install Spark Operator

You can now install Spark Operator using the following command:

helm install spark-operator-demo \
oci://608033475327.dkr.ecr.us-west-1.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=us-west-1 \
--version 1.1.26-amzn-0 \
--set serviceAccounts.spark.create=false \
--namespace spark-operator \
--create-namespace

To verify that the operator has been installed correctly, run the following command:

helm list --namespace spark-operator -o yaml

Set up the Spark job execution role and service account

In this step, we create a Spark job execution IAM role and a service account, which will be used in Spark Operator and spark-submit job submission examples.

First, we create an IAM policy that will be used by the IAM Roles for Service Accounts (IRSA). This policy enables the driver and executor pods to access the AWS services specified in the policy. Complete the following steps:

  1. As a prerequisite, either create an S3 bucket (aws s3api create-bucket --bucket <ENTER-S3-BUCKET> --create-bucket-configuration LocationConstraint=us-west-1 --region us-west-1) or use an existing S3 bucket. Replace <ENTER-S3-BUCKET> in the following code with the bucket name.
  2. Create a policy file that allows read and write access to an S3 bucket:
    cat >job-execution-policy.json <<EOL
        "Version": "2012-10-17",
        "Statement": [
                "Effect": "Allow",
                "Action": [
                "Resource": [

  3. Create the IAM policy with the following command:
    aws iam create-policy --policy-name emr-job-execution-policy --policy-document file://job-execution-policy.json

  4. Next, create the service account named emr-job-execution-sa and the IAM role named emr-job-execution-irsa. The following eksctl command creates the service account in the data-team-a namespace, to be used by the driver and executor pods, and attaches the policy from step 3 to its role. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    eksctl create iamserviceaccount \
    --cluster=emr-spark-operator \
    --region us-west-1 \
    --name=emr-job-execution-sa \
    --attach-policy-arn=arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:policy/emr-job-execution-policy \
    --role-name=emr-job-execution-irsa \
    --namespace=data-team-a \
    --approve

  5. Create an S3 bucket policy to allow only the execution role created in step 4 to write to and read from the S3 bucket created in step 1. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    cat > bucketpolicy.json<<EOL
        "Version": "2012-10-17",
        "Statement": [
                "Effect": "Allow",
                "Action": [
                ], "Principal": {
                    "AWS": "arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:role/emr-job-execution-irsa"
                "Resource": [
    aws s3api put-bucket-policy --bucket ENTER-S3-BUCKET-NAME --policy file://bucketpolicy.json

  6. Create a Kubernetes role and role binding required for the service account used in the Spark job run:
    cat <<EOF >emr-job-execution-rbac.yaml
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: emr-job-execution-sa-role
      namespace: data-team-a
    rules:
      - apiGroups: ["", "batch","extensions"]
        resources: ["configmaps","serviceaccounts","events","pods","pods/exec","pods/log","pods/portforward","secrets","services","persistentvolumeclaims"]
        verbs: ["create","delete","get","list","patch","update","watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: emr-job-execution-sa-rb
      namespace: data-team-a
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: emr-job-execution-sa-role
    subjects:
      - kind: ServiceAccount
        name: emr-job-execution-sa
        namespace: data-team-a
    EOF

  7. Apply the Kubernetes role and role binding definition with the following command:
kubectl apply -f emr-job-execution-rbac.yaml
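The two JSON policy documents in steps 2 and 5 share the same shape, so they can also be generated rather than edited by hand. The following Python sketch illustrates that shape under stated assumptions: the action list is a typical read and write set chosen for illustration and is not taken from this post, the helper names are hypothetical, and the generated policy covers only your own bucket. The role that runs the sample job also needs read access to the public buckets it reads from.

```python
import json

# Assumption: a typical read/write action set, chosen for illustration only.
READ_WRITE_ACTIONS = ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"]


def bucket_arns(bucket: str) -> list[str]:
    """ARNs for the bucket itself (used by ListBucket) and for the objects in it."""
    return [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"]


def job_execution_policy(bucket: str) -> dict:
    """Identity policy to attach to the job execution role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": READ_WRITE_ACTIONS, "Resource": bucket_arns(bucket)}
        ],
    }


def bucket_policy(bucket: str, account_id: str, role_name: str = "emr-job-execution-irsa") -> dict:
    """Bucket policy that names the execution role as the allowed principal."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": READ_WRITE_ACTIONS,
                "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/{role_name}"},
                "Resource": bucket_arns(bucket),
            }
        ],
    }


# Example with placeholder values; write the output to job-execution-policy.json.
print(json.dumps(job_execution_policy("my-example-bucket"), indent=4))
```

Review the generated documents against your own security requirements before passing them to aws iam create-policy or aws s3api put-bucket-policy.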

So far, we have completed the infrastructure setup, including the Spark job execution roles. In the following steps, we run sample Spark jobs using both Spark Operator and spark-submit with the EMR runtime.

Configure the Spark Operator job with the EMR runtime

In this section, we present a sample Spark job that reads data from public datasets stored in S3 buckets, processes them, and writes the results to your own S3 bucket. Make sure that you update the S3 bucket in the following configuration by replacing <ENTER_S3_BUCKET> with the name of your own S3 bucket, referenced in step 2 of the “Set up the Spark job execution role and service account” section. Also, note that we are using data-team-a as the namespace and emr-job-execution-sa as the service account, which we created in the previous step. These are necessary to run the Spark job pods in the dedicated namespace, and the IAM role linked to the service account is used to access the S3 bucket for reading and writing data.

Most importantly, notice the image field with the EMR optimized runtime Docker image, which is currently set to emr-6.10.0. You can change this to a newer version when it’s released by the Amazon EMR team. Also, when configuring your jobs, make sure that you include the sparkConf and hadoopConf settings as defined in the following manifest. These configurations enable you to benefit from EMR runtime performance, AWS Glue Data Catalog integration, and the EMRFS optimized connector.

  1. Create the file (emr-spark-operator-example.yaml) locally and update the S3 bucket location so that you can submit the job as part of the next step:
    cat <<EOF >emr-spark-operator-example.yaml
    ---
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: taxi-example
      namespace: data-team-a
    spec:
      type: Scala
      mode: cluster
      # EMR optimized runtime image
      image: "608033475327.dkr.ecr.us-west-1.amazonaws.com/spark/emr-6.10.0:latest"
      imagePullPolicy: Always
      mainClass: ValueZones
      mainApplicationFile: s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar
      arguments:
        - s3://nyc-tlc/csv_backup
        - "2017"
        - s3://nyc-tlc/misc/taxi_zone_lookup.csv
        - s3://<ENTER_S3_BUCKET>/emr-eks-results
        - emr_eks_demo
      hadoopConf:
        # EMRFS filesystem config
        fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
        fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
        fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
        fs.s3.buffer.dir: /mnt/s3
        fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
        mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
        mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
      sparkConf:
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: "s3://<ENTER_S3_BUCKET>/"
        spark.kubernetes.driver.pod.name: driver-nyc-taxi-etl
        # Required for EMR Runtime and Glue Catalogue
        spark.driver.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.driver.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        spark.executor.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.executor.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        # EMRFS committer
        spark.sql.parquet.output.committer.class: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
        spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
        spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
        spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
        spark.driver.defaultJavaOptions:  -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
      sparkVersion: "3.3.1"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        memory: "4g"
        serviceAccount: emr-job-execution-sa
      executor:
        cores: 1
        instances: 4
        memory: "4g"
        serviceAccount: emr-job-execution-sa
    EOF

  2. Run the following command to submit the job to the EKS cluster:
    kubectl apply -f emr-spark-operator-example.yaml

    The job may take 4–5 minutes to complete, and you can verify the successful message in the driver pod logs.

  3. Verify the job by running the following command:
kubectl get pods -n data-team-a

Enable access to the Spark UI

The Spark UI is an important tool for data engineers because it allows you to track the progress of tasks, view detailed job and stage information, and analyze resource utilization to identify bottlenecks and optimize your code. For Spark jobs running on Kubernetes, the Spark UI is hosted on the driver pod and its access is restricted to the internal network of Kubernetes. To access it, we need to forward the traffic to the pod with kubectl. The following steps take you through how to set it up.

Run the following command to forward traffic to the driver pod:

kubectl port-forward <driver-pod-name> 4040:4040 -n data-team-a

You should see text similar to the following:

Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

If you didn’t specify the driver pod name at the submission of the SparkApplication, you can get it with the following command:

kubectl get pods -n data-team-a -l spark-role=driver,spark-app-name=<your-spark-app-name> -o jsonpath='{.items[0].metadata.name}'

Open a browser and enter http://localhost:4040 in the address bar. You should be able to connect to the Spark UI.
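While the port-forward is active, the same port also serves Spark's monitoring REST API under /api/v1, which is convenient for scripted checks. The following Python sketch, using only the standard library, counts the jobs of the running application by status. The function names are hypothetical, and the base URL assumes the port-forward on port 4040 set up in the previous step.

```python
import json
from collections import Counter
from urllib.request import urlopen

BASE_URL = "http://localhost:4040/api/v1"  # assumes the port-forward is active


def fetch_json(path: str):
    """GET a path under the Spark monitoring REST API and decode the JSON body."""
    with urlopen(f"{BASE_URL}{path}", timeout=10) as response:
        return json.load(response)


def summarize_jobs(jobs: list[dict]) -> dict[str, int]:
    """Count jobs per status from an /applications/<app-id>/jobs response."""
    return dict(Counter(job["status"] for job in jobs))


def live_summary() -> dict[str, int]:
    """Summarize the jobs of the first application reported by the driver."""
    app_id = fetch_json("/applications")[0]["id"]
    return summarize_jobs(fetch_json(f"/applications/{app_id}/jobs"))
```

Calling live_summary() while the driver pod is running returns a dictionary that maps each job status reported by the API to a count.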


Spark History Server

If you want to explore your job after its run, you can view it through the Spark History Server. The preceding SparkApplication definition has the event log enabled and stores the events in an S3 bucket with the following path: s3://YOUR-S3-BUCKET/. For instructions on setting up the Spark History Server and exploring the logs, refer to Launching the Spark history server and viewing the Spark UI using Docker.


spark-submit

spark-submit is a command line interface for running Apache Spark applications on a cluster or locally. It allows you to submit applications to Spark clusters. The tool enables simple configuration of application properties, resource allocation, and custom libraries, streamlining the deployment and management of Spark jobs.

Beginning with Amazon EMR 6.10, spark-submit is supported as a job submission method. This method currently only supports cluster mode as the submission mechanism. To submit jobs using the spark-submit method, we reuse the IAM role for the service account we set up earlier. We also use the S3 bucket used for the Spark Operator method. The steps in this section take you through how to configure and submit jobs with spark-submit and benefit from EMR runtime improvements.

  1. In order to submit a job, you need to use the Spark version that matches the one available in Amazon EMR. For Amazon EMR 6.10, you need to download the Spark 3.3 version.
  2. You also need to make sure you have Java installed in your environment.
  3. Unzip the file and navigate to the root of the Spark directory.
  4. In the following code, replace EKS-ENDPOINT with your EKS endpoint and S3_BUCKET with your S3 bucket, then run the script:
./bin/spark-submit \
--class ValueZones \
--master k8s://EKS-ENDPOINT \
--conf spark.kubernetes.namespace=data-team-a \
--conf spark.kubernetes.container.image=608033475327.dkr.ecr.us-west-1.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-job-execution-sa \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=emr-job-execution-sa \
--conf spark.driver.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.driver.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.executor.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.executor.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--conf spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3.EMRFSDelegate \
--conf spark.hadoop.fs.s3.buffer.dir=/mnt/s3 \
--conf spark.hadoop.fs.s3n.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--deploy-mode cluster \
s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar s3://nyc-tlc/csv_backup 2017 s3://nyc-tlc/misc/taxi_zone_lookup.csv s3://S3_BUCKET/emr-eks-results emr_eks_demo

The job takes about 7 minutes to complete with two executors, each with one core and 1 GB of memory.
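Comparing the two submission methods, the settings listed under hadoopConf in the SparkApplication manifest appear in the spark-submit command as --conf flags with a spark.hadoop. prefix. The following Python sketch makes that mapping explicit; the function name is hypothetical, and the two sample settings are copied from the examples in this post.

```python
import shlex


def hadoop_conf_to_flags(hadoop_conf: dict[str, str]) -> list[str]:
    """Turn Hadoop settings into spark-submit --conf arguments with the spark.hadoop. prefix."""
    flags: list[str] = []
    for key, value in hadoop_conf.items():
        flags += ["--conf", f"spark.hadoop.{key}={value}"]
    return flags


hadoop_conf = {
    "fs.s3.impl": "com.amazon.ws.emr.hadoop.fs.EmrFileSystem",
    "fs.s3.buffer.dir": "/mnt/s3",
}
# shlex.join quotes each argument where needed so the line can be pasted into a shell.
print(shlex.join(hadoop_conf_to_flags(hadoop_conf)))
```

Keeping the settings in one dictionary and deriving the flags from it is one way to keep the two submission methods consistent.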

Using custom Kubernetes schedulers

Customers running a large volume of jobs concurrently might face challenges related to providing fair access to compute capacity that they aren’t able to solve with the standard scheduling and resource utilization management Kubernetes offers. In addition, customers that are migrating from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) and are managing their scheduling with YARN queues will not be able to transpose them to Kubernetes scheduling capabilities.

To overcome this issue, you can use custom schedulers like Apache YuniKorn or Volcano. Spark Operator natively supports these schedulers, and with them you can schedule Spark applications based on factors such as priority, resource requirements, and fairness policies, while Spark Operator simplifies application deployment and management. To set up YuniKorn with gang scheduling and use it in Spark applications submitted through Spark Operator, refer to Spark Operator with YuniKorn.

Clean up

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment:

eksctl delete cluster -f eks-cluster.yaml


Conclusion

In this post, we introduced the EMR runtime feature for Spark Operator and spark-submit, and explored the advantages of using this feature on an EKS cluster. With the optimized EMR runtime, you can significantly enhance the performance of your Spark applications while optimizing costs. We demonstrated the deployment of the cluster using the eksctl tool. You can also use the Data on EKS blueprints to deploy a production-ready EKS cluster for EMR on EKS and use these new deployment methods in addition to the EMR on EKS API job submission method. By running your applications on the optimized EMR runtime, you can further enhance your Spark application workflows and drive innovation in your data processing pipelines.

About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Vara Bonthu is a dedicated technology professional and Worldwide Tech Leader for Data on EKS, specializing in assisting AWS customers ranging from strategic accounts to diverse organizations. He is passionate about open-source technologies, data analytics, AI/ML, and Kubernetes, and boasts an extensive background in development, DevOps, and architecture. Vara’s primary focus is on building highly scalable data and AI/ML solutions on Kubernetes platforms, helping customers harness the full potential of cutting-edge technology for their data-driven pursuits.

Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/improve-operational-efficiencies-of-apache-iceberg-tables-built-on-amazon-s3-data-lakes/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3) and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. When you build your transactional data lake using Apache Iceberg to solve your functional use cases, you need to focus on operational use cases for your S3 data lake to optimize the production environment. Some of the important non-functional use cases for an S3 data lake that organizations are focusing on include storage cost optimizations, capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake, and handling increased Amazon S3 request rates.

In this post, we show you how to improve the operational efficiency of your Apache Iceberg tables built on an Amazon S3 data lake and the Amazon EMR big data platform.

Optimize data lake storage

One of the major advantages of building modern data lakes on Amazon S3 is that it offers lower cost without compromising on performance. You can use Amazon S3 Lifecycle configurations and Amazon S3 object tagging with Apache Iceberg tables to optimize the cost of your overall data lake storage. An Amazon S3 Lifecycle configuration is a set of rules that define actions that Amazon S3 applies to a group of objects. There are two types of actions:

  • Transition actions – These actions define when objects transition to another storage class; for example, Amazon S3 Standard to Amazon S3 Glacier.
  • Expiration actions – These actions define when objects expire. Amazon S3 deletes expired objects on your behalf.

Amazon S3 uses object tagging to categorize storage, where each tag is a key-value pair. Apache Iceberg supports custom Amazon S3 object tags that are added to S3 objects when they are written to or deleted from a table. Iceberg also lets you configure a tag-based object lifecycle policy at the bucket level to transition objects to different Amazon S3 tiers. With the s3.delete.tags config property in Iceberg, objects are tagged with the configured key-value pairs before deletion. When the catalog property s3.delete-enabled is set to false, the objects are not hard-deleted from Amazon S3. This is expected to be used in combination with Amazon S3 delete tagging, so objects are tagged and then removed by an Amazon S3 lifecycle policy. The s3.delete-enabled property is set to true by default.

The example notebook in this post shows an example implementation of S3 object tagging and lifecycle rules for Apache Iceberg tables to optimize storage cost.

Implement business continuity

Amazon S3 gives any developer access to the same highly scalable, reliable, fast, inexpensive data storage infrastructure that Amazon uses to run its own global network of web sites. Amazon S3 is designed for 99.999999999% (11 9’s) of durability, S3 Standard is designed for 99.99% availability, and Standard – IA is designed for 99.9% availability. Still, to make your data lake workloads highly available in an unlikely outage situation, you can replicate your S3 data to another AWS Region as a backup. With S3 data residing in multiple Regions, you can use an S3 multi-Region access point as a solution to access the data from the backup Region. With Amazon S3 multi-Region access point failover controls, you can route all S3 data request traffic through a single global endpoint and directly control the shift of S3 data request traffic between Regions at any time. During a planned or unplanned regional traffic disruption, failover controls let you control failover between buckets in different Regions and accounts within minutes. Apache Iceberg supports access points to perform S3 operations by specifying a mapping of bucket to access points. We include an example implementation of an S3 access point with Apache Iceberg later in this post.

Increase Amazon S3 performance and throughput

Amazon S3 supports a request rate of 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. The resources for this request rate aren’t automatically assigned when a prefix is created. Instead, as the request rate for a prefix increases gradually, Amazon S3 automatically scales to handle the increased request rate. For certain workloads that need a sudden increase in the request rate for objects in a prefix, Amazon S3 might return 503 Slow Down errors, also known as S3 throttling. It does this while it scales in the background to handle the increased request rate. Also, if supported request rates are exceeded, it’s a best practice to distribute objects and requests across multiple prefixes. Implementing this solution involves changes to your data ingress or data egress applications. Using the Apache Iceberg table format for your S3 data lake can significantly reduce this engineering effort by enabling the ObjectStoreLocationProvider feature, which adds an S3 hash [0*7FFFFF] prefix in your specified S3 object path.

Iceberg by default uses the Hive storage layout, but you can switch it to use the ObjectStoreLocationProvider. This option is not enabled by default to provide flexibility to choose the location where you want to add the hash prefix. With ObjectStoreLocationProvider, a deterministic hash is generated for each stored file and a subfolder is appended right after the S3 folder specified using the parameter write.data.path (write.object-storage-path for Iceberg version 0.12 and below). This ensures that files written to Amazon S3 are equally distributed across multiple prefixes in your S3 bucket, thereby minimizing the throttling errors. In the following example, we set the write.data.path value as s3://my-table-data-bucket, and Iceberg-generated S3 hash prefixes will be appended after this location:

CREATE TABLE my_catalog.my_ns.my_table
( id bigint,
data string,
category string)
USING iceberg
OPTIONS
( 'write.object-storage.enabled'=true,
'write.data.path'='s3://my-table-data-bucket')
PARTITIONED BY (category);

Your S3 files will be arranged under MURMUR3 S3 hash prefixes like the following:

2021-11-01 05:39:24 809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
2021-11-01 06:00:10 6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
2021-11-01 04:33:24 6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet
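The effect of the hash prefix can be illustrated in a few lines of Python. This sketch is only an approximation of the idea: it uses CRC32 from the standard library as a stand-in hash, not the Murmur3 hash that Iceberg uses, so the prefixes it produces will not match Iceberg's. The function name and the sample file names are hypothetical.

```python
import zlib


def hashed_key(data_path: str, namespace: str, table: str, file_name: str) -> str:
    """Place a file under a deterministic 8-hex-digit prefix derived from its name."""
    # Stand-in hash: CRC32 from the standard library, NOT the Murmur3 hash Iceberg uses.
    prefix = format(zlib.crc32(file_name.encode("utf-8")), "08x")
    return f"{data_path.rstrip('/')}/{prefix}/{namespace}/{table}/{file_name}"


for name in ("00001-data.parquet", "00002-data.parquet", "00003-data.parquet"):
    print(hashed_key("s3://my-table-data-bucket", "my_ns", "my_table", name))
```

Because the prefix depends only on the file name, the same file always maps to the same key, while different files are spread across many prefixes instead of sharing one.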

Using Iceberg ObjectStoreLocationProvider is not a foolproof mechanism to avoid S3 503 errors. You still need to set appropriate EMRFS retries to provide additional resiliency. You can adjust your retry strategy by increasing the maximum retry limit for the default exponential backoff retry strategy or enabling and configuring the additive-increase/multiplicative-decrease (AIMD) retry strategy. AIMD is supported for Amazon EMR releases 6.4.0 and later. For more information, refer to Retry Amazon S3 requests with EMRFS.
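To make the two retry strategies concrete, the following Python sketch shows the general techniques in isolation. It is not the EMRFS implementation; the class name, the function name, and every default value are illustrative assumptions rather than EMRFS settings.

```python
import random


def backoff_delays(max_retries: int, base: float = 0.1, cap: float = 20.0, jitter: bool = True):
    """Yield one delay in seconds per retry, doubling each time up to `cap`."""
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        # Random jitter spreads out clients that would otherwise retry in lockstep.
        yield random.uniform(0, delay) if jitter else delay


class AimdRate:
    """Request-rate controller: add a constant on success, multiply down on throttling."""

    def __init__(self, rate: float = 100.0, increase: float = 10.0,
                 decrease: float = 0.5, floor: float = 1.0):
        self.rate = rate
        self.increase = increase
        self.decrease = decrease
        self.floor = floor

    def on_success(self) -> None:
        """Additive increase: probe for more capacity slowly."""
        self.rate += self.increase

    def on_throttle(self) -> None:
        """Multiplicative decrease: back off sharply, for example after a 503 Slow Down."""
        self.rate = max(self.floor, self.rate * self.decrease)
```

Exponential backoff reacts to each failed request individually, whereas AIMD adjusts the overall request rate, which is why the two approaches can complement each other under sustained throttling.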

In the following sections, we provide examples for these use cases.

Storage cost optimizations

In this example, we use Iceberg’s S3 tags feature with the write tag as write-tag-name=created and delete tag as delete-tag-name=deleted. This example is demonstrated on an EMR version emr-6.10.0 cluster with installed applications Hadoop 3.3.3, Jupyter Enterprise Gateway 2.6.0, and Spark 3.3.1. The examples are run on a Jupyter Notebook environment attached to the EMR cluster. To learn more about how to create an EMR cluster with Iceberg and use Amazon EMR Studio, refer to Use an Iceberg cluster with Spark and the Amazon EMR Studio Management Guide, respectively.

The following examples are also available in the sample notebook in the aws-samples GitHub repo for quick experimentation.

Configure Iceberg on a Spark session

Configure your Spark session using the %%configure magic command. You can use either the AWS Glue Data Catalog (recommended) or a Hive catalog for Iceberg tables. In this example, we use a Hive catalog; to use the Data Catalog instead, set the catalog's catalog-impl property to org.apache.iceberg.aws.glue.GlueCatalog.

Before you run this step, create an S3 bucket and an iceberg folder in your AWS account with the naming convention <your-iceberg-storage-blog>/iceberg/.

Update your-iceberg-storage-blog in the following configuration with the bucket that you created to test this example. Note the configuration parameters s3.write.tags.write-tag-name and s3.delete.tags.delete-tag-name, which will tag the new S3 objects and deleted objects with corresponding tag values. We use these tags in later steps to implement S3 lifecycle policies to transition the objects to a lower-cost storage tier or expire them based on the use case.

%%configure -f
{
  "conf": {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "spark.sql.catalog.dev.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse": "s3://<your-iceberg-storage-blog>/iceberg/",
    "spark.sql.catalog.dev.s3.write.tags.write-tag-name": "created",
    "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name": "deleted",
    "spark.sql.catalog.dev.s3.delete-enabled": "false"
  }
}

Create an Apache Iceberg table using Spark-SQL

Now we create an Iceberg table for the Amazon Product Reviews Dataset:

spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")
spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date date,
year int)
USING iceberg
location 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

In the next step, we load the table with the dataset using Spark actions.

Load data into the Iceberg table

While inserting the data, we partition the data by review_date as per the table definition. Run the following Spark commands in your PySpark notebook:

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()


Insert a single record into the same Iceberg table so that it creates a partition with the current review_date:

spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

You can check that a new snapshot was created after this append operation by querying the Iceberg snapshots metadata table:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

The output lists the snapshots along with the operation performed for each one.

Check the S3 tag population

You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to check the tags populated for the new writes. Let’s check the tag corresponding to the object created by a single row insert.

On the Amazon S3 console, open the S3 folder s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ and navigate to the partition review_date_year=2023/. Then select the Parquet file under this folder to view the tags associated with that data file.

From the AWS CLI, run the following command to see that the tag is created based on the Spark configuration "spark.sql.catalog.dev.s3.write.tags.write-tag-name":"created":

aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the tags associated with the file:

{ "VersionId": "null", "TagSet": [{ "Key": "write-tag-name", "Value": "created" } ] }
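If you prefer to check tags programmatically, a small helper can turn the TagSet list into a dictionary. The following is a minimal sketch that works on the response shown above; the helper name tags_as_dict is our own, and the comment about boto3 assumes you have credentials and the SDK available.

```python
def tags_as_dict(tagging_response):
    """Convert a get-object-tagging response into a {key: value} dict."""
    return {tag["Key"]: tag["Value"] for tag in tagging_response.get("TagSet", [])}


# Response copied from the CLI output for the newly written data file.
# With boto3, boto3.client("s3").get_object_tagging(Bucket=..., Key=...)
# returns a dictionary with the same TagSet structure.
response = {
    "VersionId": "null",
    "TagSet": [{"Key": "write-tag-name", "Value": "created"}],
}

tags = tags_as_dict(response)
print(tags)
```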

Delete a record and expire a snapshot

In this step, we delete a record from the Iceberg table and expire the snapshot corresponding to the deleted record. We delete the new single record that we inserted with the current review_date:

spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

We can now check that a new snapshot was created with the operation flagged as delete:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

This is useful if we want to time travel and check the deleted row in the future. In that case, we have to query the table with the snapshot-id corresponding to the deleted row. However, we don’t discuss time travel as part of this post.

We expire the old snapshots from the table and keep only the last two. You can modify the query based on your specific requirements to retain the snapshots:

spark.sql("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

If we run the same query on the snapshots, we can see that we have only two snapshots available:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

From the AWS CLI, you can run the following command to see that the tag is created based on the Spark configuration "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name":"deleted":

aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the tags associated with the file:

{ "VersionId": "null", "TagSet": [ { "Key": "delete-tag-name", "Value": "deleted" }, { "Key": "write-tag-name", "Value": "created" } ] }

You can view the existing metadata files from the metadata log entries metadata table after the expiration of snapshots:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

The snapshots that have expired show the latest snapshot ID as null.

Create S3 lifecycle rules to transition the buckets to a different storage tier

Create a lifecycle configuration for the bucket to transition objects with the delete-tag-name=deleted S3 tag to the S3 Glacier Instant Retrieval storage class. Amazon S3 runs lifecycle rules one time every day at midnight Coordinated Universal Time (UTC), and new lifecycle rules can take up to 48 hours to complete the first run. S3 Glacier Instant Retrieval is well suited to archive data that needs immediate access (with milliseconds retrieval). With S3 Glacier Instant Retrieval, you can save up to 68% on storage costs compared to using the S3 Standard-Infrequent Access (S3 Standard-IA) storage class, when the data is accessed once per quarter.
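One way to express such a rule is as a lifecycle configuration document filtered on the delete tag. The following Python sketch builds that document and, optionally, applies it with boto3. The rule ID and the 1-day transition delay are assumptions made for illustration; choose values that match your own retention requirements.

```python
def glacier_ir_lifecycle(tag_key="delete-tag-name", tag_value="deleted", days=1):
    """Build a lifecycle configuration that moves tagged objects to Glacier IR."""
    return {
        "Rules": [
            {
                "ID": "iceberg-deleted-to-glacier-ir",  # hypothetical rule name
                "Status": "Enabled",
                # Only objects carrying the Iceberg delete tag are affected.
                "Filter": {"Tag": {"Key": tag_key, "Value": tag_value}},
                "Transitions": [{"Days": days, "StorageClass": "GLACIER_IR"}],
            }
        ]
    }


def apply_lifecycle(bucket, lifecycle):
    """Apply the configuration; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the sketch runs without the SDK

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=lifecycle
    )


lifecycle = glacier_ir_lifecycle()
print(lifecycle)
```

Calling apply_lifecycle("your-iceberg-storage-blog", lifecycle) would replace the bucket's existing lifecycle configuration, so merge any rules you already have before applying it.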

When you want to access the data again, you can bulk restore the archived objects. After you restore the objects to the S3 Standard class, you can register the metadata and data as an archival table for query purposes. The metadata file location can be fetched from the metadata log entries metadata table as illustrated earlier. As mentioned before, entries whose latest snapshot ID is null indicate expired snapshots. We can take one of the expired snapshots, bulk restore its objects, and register the table:

spark.sql("""CALL dev.system.register_table(table => 'db.amazon_reviews_iceberg_archive', metadata_file => 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg/metadata/00000-a010f15c-7ac8-4cd1-b1bc-bba99fa7acfc.metadata.json')""").show()

Capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake

Because Iceberg doesn’t support relative paths, you can use access points to perform Amazon S3 operations by specifying a mapping of buckets to access points. This is useful for multi-Region access, cross-Region access, disaster recovery, and more.

For cross-Region access points, we need to additionally set the use-arn-region-enabled catalog property to true to enable S3FileIO to make cross-Region calls. If an Amazon S3 resource ARN is passed in as the target of an Amazon S3 operation that has a different Region than the one the client was configured with, this flag must be set to true to permit the client to make a cross-Region call to the Region specified in the ARN; otherwise, an exception will be thrown. However, for the same or multi-Region access points, the use-arn-region-enabled flag should be set to false.

For example, to use an S3 access point with multi-Region access in Spark 3.3, you can start the Spark SQL shell with the following code:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket2=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap

In this example, the objects in Amazon S3 on my-bucket1 and my-bucket2 buckets use the arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap access point for all Amazon S3 operations.

For more details on using access points, refer to Using access points with compatible Amazon S3 operations.

Let’s say your table path is under my-bucket1, so both my-bucket1 in Region 1 and my-bucket2 in Region 2 have paths of my-bucket1 inside the metadata files. At the time of the S3 (GET/PUT) call, we replace the my-bucket1 reference with a multi-Region access point.

Handling increased S3 request rates

When using ObjectStoreLocationProvider (for more details, see Object Store File Layout), a deterministic hash is generated for each stored file, with the hash appended directly after the write.data.path. The problem with this is that the default hashing algorithm generates hash values up to Integer MAX_VALUE, which in Java is (2^31)-1. When this is converted to hex, it produces 0x7FFFFFFF, so the first character variance is restricted to only [0-7]. As per Amazon S3 recommendations, the prefix should have the maximum possible variance to mitigate throttling.
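The effect of the upper bound on the leading character can be checked with a few lines of arithmetic. This sketch assumes the hash is rendered as a zero-padded, 8-digit hexadecimal string, which matches the sample object keys shown earlier in this post; it is an illustration of the arithmetic, not Iceberg's implementation.

```python
MAX_HASH = 2**31 - 1  # Java Integer.MAX_VALUE

# Largest possible hash rendered as 8 hex digits.
max_hex = format(MAX_HASH, "08x")
print(max_hex)

# The leading hex digit is determined by the top 4 bits of the 32-bit value.
# Keep only the top-nibble values that do not exceed MAX_HASH.
first_chars = sorted(
    {format(top << 28, "08x")[0] for top in range(16) if (top << 28) <= MAX_HASH}
)
print(first_chars)
```

Only half of the 16 possible hexadecimal leading characters can occur, which is why a provider that spreads the first characters over a larger alphabet gives Amazon S3 more prefixes to partition on.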

Starting from Amazon EMR 6.10, Amazon EMR added an optimized location provider that makes sure the generated prefix hash has uniform distribution in the first two characters using the character set from [0-9][A-Z][a-z].

This location provider has been recently open sourced by Amazon EMR via Core: Improve bit density in object storage layout and should be available starting from Iceberg 1.3.0.

To use it, make sure the iceberg.enabled classification is set to true and write.location-provider.impl is set to org.apache.iceberg.emr.OptimizedS3LocationProvider.

The following is a sample Spark shell command:

spark-shell --conf spark.driver.memory=4g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/iceberg-V516168123 \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.table-override.write.location-provider.impl=org.apache.iceberg.emr.OptimizedS3LocationProvider

The following example shows that when you enable the object storage in your Iceberg table, it adds the hash prefix in your S3 path directly after the location you provide in your DDL.

Set the write.object-storage.enabled table parameter and provide the S3 path after which you want the hash prefix added, using the write.data.path parameter (for Iceberg version 0.13 and above) or the write.object-storage.path parameter (for Iceberg version 0.12 and below).

Insert data into the table you created.

The hash prefix is added right after the /current/ prefix in the S3 path as defined in the DDL.

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.


As companies continue to build newer transactional data lake use cases using Apache Iceberg open table format on very large datasets on S3 data lakes, there will be an increased focus on optimizing those petabyte-scale production environments to reduce cost, improve efficiency, and implement high availability. This post demonstrated mechanisms to implement the operational efficiencies for Apache Iceberg open table formats running on AWS.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:

About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Prashant Singh is a Software Development Engineer at AWS. He is interested in Databases and Data Warehouse engines and has worked on Optimizing Apache Spark performance on EMR. He is an active contributor in open source projects like Apache Spark and Apache Iceberg. During his free time, he enjoys exploring new places, food and hiking.

How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With the sudden increase in the number of application endpoints, they had to rapidly evolve their log analytics architecture and worked with the AWS Data Lab team to quickly prototype and deploy an architecture for their compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

Data Ingestion Workflow

The team needed to quickly populate millions of Kafka messages in the dev/test environment to achieve this. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.
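The original script isn't shown in this post, but the idea of spreading synthetic messages evenly can be sketched with round-robin partition assignment. In the following example, the payload fields (message_id, logs) and the small message and partition counts are hypothetical; a real producer client would send each payload to the chosen partition instead of counting it.

```python
import json
from collections import Counter


def generate_messages(total, num_partitions):
    """Yield (partition, payload) pairs, assigning partitions round-robin."""
    for i in range(total):
        # Hypothetical payload shape; production messages will differ.
        payload = json.dumps({"message_id": i, "logs": [f"log line {i}"]})
        yield i % num_partitions, payload


# Small numbers keep the sketch fast; the test data set was far larger.
counts = Counter(partition for partition, _ in generate_messages(1200, 4))
print(counts)
```

Because the partition is derived from the message index rather than from a key hash, every partition receives the same number of messages whenever the total is a multiple of the partition count.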

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.
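To put those figures in per-second terms, the following short calculation uses only the numbers quoted above.

```python
messages_per_batch = 150_000_000   # Kafka messages per 5-minute window
batch_seconds = 5 * 60
records_per_message = (7, 25)      # minimum and maximum log records per message

messages_per_second = messages_per_batch / batch_seconds
records_per_second = tuple(messages_per_second * r for r in records_per_message)

print(f"{messages_per_second:,.0f} messages per second")
print(f"{records_per_second[0]:,.0f} to {records_per_second[1]:,.0f} log records per second")
```

That works out to 500,000 messages per second, or between 3.5 million and 12.5 million log records per second at peak.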

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.
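The flattening and keying steps can be illustrated in plain Python. This is a simplified sketch rather than the production Spark job: the field names (message_id, logs, timestamp, user_id, body) and the choice of a SHA-256 digest as the record key are assumptions made for the example.

```python
import hashlib


def flatten_message(message):
    """Turn one Kafka message with a nested log array into flat records."""
    records = []
    for position, log in enumerate(message["logs"]):
        # Combine message ID and array position so each record key is unique.
        raw_key = f'{message["message_id"]}:{position}'
        records.append(
            {
                "record_key": hashlib.sha256(raw_key.encode()).hexdigest(),
                # Partition key extracted from the log timestamp (date part).
                "partition_date": log["timestamp"][:10],
                "user_id": log["user_id"],
                "body": log["body"],
            }
        )
    return records


message = {
    "message_id": "m-1",
    "logs": [
        {"timestamp": "2023-04-06T10:00:00Z", "user_id": "u1", "body": "joined"},
        {"timestamp": "2023-04-06T10:00:05Z", "user_id": "u2", "body": "left"},
    ],
}
records = flatten_message(message)
print(len(records), records[0]["partition_date"])
```

In Spark, the same shape of transformation is typically expressed by exploding the array column and deriving the key and partition columns from the result.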

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

Data Deletion Workflow

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.
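The reason so few files need to be rewritten can be seen in a simplified model of key-range pruning. The sketch below is not Hudi's implementation: it only shows the file-range step with made-up file names and keys, and it omits the bloom filter check that further narrows the candidates.

```python
def candidate_files(file_key_ranges, keys_to_delete):
    """Return files whose [min_key, max_key] range could contain a key."""
    selected = []
    for name, (min_key, max_key) in file_key_ranges.items():
        if any(min_key <= key <= max_key for key in keys_to_delete):
            selected.append(name)
    return selected


# Hypothetical per-file key ranges, as an index might record them.
files = {
    "file-a.parquet": ("key-0000", "key-0999"),
    "file-b.parquet": ("key-1000", "key-1999"),
    "file-c.parquet": ("key-2000", "key-2999"),
}

to_rewrite = candidate_files(files, ["key-1500"])
print(to_rewrite)
```

Only the files returned by this kind of lookup have to be read and rewritten, instead of every file in the affected partitions.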

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results of the run, the AWS team came up with the recommendation to use Amazon EMR with three core nodes (r5.16xlarge (64 vCPUs each)) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge (64 vCPUs), r5.12xlarge (48 vCPUs), r5.8xlarge (32 vCPUs)). These recommendations helped Zoom to reduce their Amazon EMR costs by more than 80% while meeting their desired performance goals of ingesting 150 million Kafka messages under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics that enables the separation of ingestion workloads and updates when run concurrently. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to the current (at the time of the AWS Data Lab collaboration) Amazon EMR 6.4 release. This saved time and allowed for a quick transition to the new version with minimal testing. This enabled us to query the data directly using Athena quickly without having to spin up a cluster to run ad hoc queries, as well as to query the data using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.
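For readers who want to try optimistic concurrency control, the writer options typically look like the following. This is a sketch based on the Hudi concurrency control documentation as we understand it, not Zoom's actual settings: the ZooKeeper-based lock provider is one of several choices, and the host, lock key, and base path values are placeholders.

```python
def occ_write_options(zk_host, lock_key, zk_port=2181, base_path="/hudi/locks"):
    """Build Hudi writer options that enable optimistic concurrency control."""
    return {
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        # Failed writes are cleaned lazily so concurrent writers are not disturbed.
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": (
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"
        ),
        "hoodie.write.lock.zookeeper.url": zk_host,
        "hoodie.write.lock.zookeeper.port": str(zk_port),
        "hoodie.write.lock.zookeeper.lock_key": lock_key,
        "hoodie.write.lock.zookeeper.base_path": base_path,
    }


# Placeholder host and lock key for illustration only.
occ_options = occ_write_options("zk.example.internal", "app_logs")
print(occ_options)
```

Both the streaming ingestion job and the deletion job would pass the same options to their Hudi writers so that they coordinate through the same lock.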

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.
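The table type and parallelism choices described above map to a handful of Hudi writer options. The following sketch shows one plausible set; the option names follow the Hudi configuration reference as we understand it, the parallelism value of 1,000 comes from the text, and the table name is a placeholder.

```python
def mor_ingest_options(table_name, insert_parallelism=1000):
    """Build Hudi writer options for merge on read streaming ingestion."""
    return {
        "hoodie.table.name": table_name,
        # Write row-based delta files first, compact to Parquet later.
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        # Lower shuffle parallelism yields fewer, larger files per write.
        "hoodie.insert.shuffle.parallelism": str(insert_parallelism),
        # Run compaction asynchronously alongside the streaming writer.
        "hoodie.datasource.compaction.async.enable": "true",
    }


mor_options = mor_ingest_options("app_logs")  # placeholder table name
print(mor_options)
```

These options would be passed to the Hudi data source in the Structured Streaming writer, together with the record key and partition path settings for the table.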

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.


Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.


The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% while bringing the storage costs down by about 90% compared to the prior HDFS-based architecture. All of this was achieved while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.


In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.

About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernize their architecture, and generate insights from their data. In his spare time, he likes to work on non-profit projects focused on underprivileged children’s education.

Chandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based in the San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time, he enjoys exploring the outdoors with his family.

Viral Shah is an Analytics Sales Specialist who has worked with AWS for 5 years, helping customers to be successful in their data journey. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

Improve reliability and reduce costs of your Apache Spark workloads with vertical autoscaling on Amazon EMR on EKS

Post Syndicated from Rajkishan Gunasekaran original https://aws.amazon.com/blogs/big-data/improve-reliability-and-reduce-costs-of-your-apache-spark-workloads-with-vertical-autoscaling-on-amazon-emr-on-eks/

Amazon EMR on Amazon EKS is a deployment option offered by Amazon EMR that enables you to run Apache Spark applications on Amazon Elastic Kubernetes Service (Amazon EKS) in a cost-effective manner. It uses the EMR runtime for Apache Spark to increase performance so that your jobs run faster and cost less.

Apache Spark allows you to configure the amount of memory and the number of vCPU cores that a job will use. However, tuning these values is a manual process that can be complex and rife with pitfalls. For example, allocating too little memory can result in out-of-memory exceptions and poor job reliability. On the other hand, allocating too much can result in over-spending on idle resources, poor cluster utilization, and high costs. Moreover, it’s hard to right-size these settings for some use cases, such as interactive analytics, due to a lack of visibility into future requirements. In the case of recurring jobs, keeping these settings up to date while taking into account changing load patterns (due to external seasonal factors, for instance) remains a challenge.

To address this, Amazon EMR on EKS has recently announced support for vertical autoscaling, a feature that uses the Kubernetes Vertical Pod Autoscaler (VPA) to automatically tune the memory and CPU resources of EMR Spark applications to adapt to the needs of the provided workload, simplifying the process of tuning resources and optimizing costs for these applications. You can use vertical autoscaling’s ability to tune resources based on historic data to keep memory and CPU settings up to date even when the profile of the workload varies over time. Additionally, you can use its ability to react to real-time signals to enable applications to recover from out-of-memory (OOM) exceptions, helping improve job reliability.

Vertical autoscaling vs existing autoscaling solutions

Vertical autoscaling complements existing Spark autoscaling solutions such as Dynamic Resource Allocation (DRA) and Kubernetes autoscaling solutions such as Karpenter.

Features such as DRA typically work on the horizontal axis, where an increase in load results in an increase in the number of Kubernetes pods that will process the load. In the case of Spark, this results in data being processed across additional executors. When DRA is enabled, Spark starts with an initial number of executors and scales this up if it observes that there are tasks sitting and waiting for executors to run on. DRA works at the pod level and would need an underlying cluster-level auto-scaler such as Karpenter to bring in additional nodes or scale down unused nodes in response to these pods getting created and deleted.

However, for a given data profile and a query plan, sometimes the parallelism and the number of executors can’t be easily changed. As an example, if you’re attempting to join two tables that store data already sorted and bucketed by the join keys, Spark can efficiently join the data by using a fixed number of executors that equals the number of buckets in the source data. Since the number of executors cannot be changed, vertical autoscaling can help here by offering additional resources or scaling down unused resources at the executor level. This has a few advantages:

  • If a pod is optimally sized, the Kubernetes scheduler can efficiently pack in more pods in a single node, leading to better utilization of the underlying cluster.
  • The Amazon EMR on EKS uplift is charged based on the vCPU and memory resources consumed by a Kubernetes pod. This means an optimally sized pod is cheaper.
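
To make the packing advantage concrete, the following minimal sketch counts how many executor pods fit on one node by memory alone. The 64 GB node size, the two pod sizes, and the helper name are illustrative assumptions; a real Kubernetes scheduler also accounts for CPU, memory overhead, and system reservations.

```python
def pods_per_node(node_memory_gb: float, pod_memory_gb: float) -> int:
    """Return how many pods with the given memory request fit on one node.

    Only memory is considered; this is a simplification for illustration.
    """
    if pod_memory_gb <= 0:
        raise ValueError("pod_memory_gb must be positive")
    return int(node_memory_gb // pod_memory_gb)


NODE_MEMORY_GB = 64  # hypothetical node size

# Compare an over-provisioned executor with a right-sized one (assumed sizes).
for pod_memory_gb in (32, 14):
    count = pods_per_node(NODE_MEMORY_GB, pod_memory_gb)
    print(f"{pod_memory_gb} GB pods per {NODE_MEMORY_GB} GB node: {count}")
```

Under these assumptions, the smaller memory request lets the same node host more executors, which is the utilization benefit described in the first bullet.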

How vertical autoscaling works

Vertical autoscaling is a feature that you can opt into at the time of submitting an EMR on EKS job. When enabled, it uses VPA to track the resource utilization of your EMR Spark jobs and derive recommendations for resource assignments for Spark executor pods based on this data. The data, fetched from the Kubernetes Metrics Server, feeds into statistical models that VPA constructs in order to build recommendations. When new executor pods spin up belonging to a job that has vertical autoscaling enabled, they’re autoscaled based on this recommendation, ignoring the usual sizing done via Spark’s executor memory configuration (controlled by the spark.executor.memory Spark setting).

Vertical autoscaling does not impact pods that are running, since in-place resizing of pods remains unsupported as of Kubernetes version 1.26, the latest supported version of Kubernetes on Amazon EKS as of this writing. However, it’s useful in the case of a recurring job where we can perform autoscaling based on historic data as well as scenarios when some pods go out-of-memory and get re-started by Spark, where vertical autoscaling can be used to selectively scale up the re-started pods and facilitate automatic recovery.

Data tracking and recommendations

To recap, vertical autoscaling uses VPA to track resource utilization for EMR jobs. For a deep dive into the functionality, refer to the VPA GitHub repo. In short, vertical autoscaling sets up VPA to track the container_memory_working_set_bytes metric for the Spark executor pods that have vertical autoscaling enabled.

Real-time metric data is fetched from the Kubernetes Metrics Server. By default, vertical autoscaling tracks the peak memory working set size for each pod and makes recommendations based on the p90 of the peak with a 40% safety margin added in. It also listens to pod events such as OOM events and reacts to them. In the case of OOM events, VPA automatically bumps up the recommended resource assignment by 20%.
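
The arithmetic behind these defaults can be sketched as follows. This is a simplified illustration only: it uses a plain nearest-rank percentile over a list of samples rather than the statistical models VPA actually maintains, and the function names and sample values are assumptions made for this example.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct between 0 and 100)."""
    if not values:
        raise ValueError("values must be non-empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def recommend_memory(peak_samples, safety_margin=0.40):
    """Return the p90 of the observed peaks with a safety margin added."""
    return percentile(peak_samples, 90) * (1 + safety_margin)


def bump_after_oom(current_recommendation, bump=0.20):
    """Return the recommendation increased by the OOM bump."""
    return current_recommendation * (1 + bump)


# Hypothetical peak working-set sizes in GB, one per executor pod.
peaks_gb = [9.5, 10.0, 9.8, 10.0, 9.9]

recommendation = recommend_memory(peaks_gb)
print(f"recommended memory: {recommendation:.1f} GB")
print(f"after an OOM event: {bump_after_oom(recommendation):.1f} GB")
```

With peaks of around 10 GB, the 40% margin yields a recommendation of about 14 GB, and a single OOM bump would raise that by a further 20%.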

The statistical models, which also represent historic resource utilization data, are stored as custom resource objects on your EKS cluster. This means that deleting these objects also purges old recommendations.

Customized recommendations through job signature

One of the major use-cases of vertical autoscaling is to aggregate usage data across different runs of EMR Spark jobs to derive resource recommendations. To do so, you need to provide a job signature. This can be a unique name or identifier that you configure at the time of submitting your job. If your job recurs at a fixed schedule (such as daily or weekly), it’s important that your job signature doesn’t change for each new instance of the job in order for VPA to aggregate and compute recommendations across different runs of the job.

A job signature can be the same even across different jobs if you believe they’ll have similar resource profiles. You can therefore use the signature to combine tracking and resource modeling across different jobs that you expect to behave similarly. Conversely, if a job’s behavior is changing at some point in time, such as due to a change in the upstream data or the query pattern, you can easily purge the old recommendations by either changing your signature or deleting the VPA custom resource for this signature (as explained later in this post).

Monitoring mode

You can use vertical autoscaling in a monitoring mode where no autoscaling is actually performed. If you have Prometheus set up on your cluster, recommendations are reported to it, and you can monitor them through Grafana dashboards and use them to debug and make manual changes to the resource assignments. Monitoring mode is the default, but you can override it with one of the supported autoscaling modes at the time of submitting a job. Refer to the documentation for usage and a walkthrough on how to get started.

Monitoring vertical autoscaling through kubectl

You can use the Kubernetes command-line tool kubectl to list active recommendations on your cluster, view all the job signatures that are being tracked as well as purge resources associated with signatures that aren’t relevant anymore. In this section, we provide some example code to demonstrate listing, querying, and deleting recommendations.

List all vertical autoscaling recommendations on a cluster

You can use kubectl to get the verticalpodautoscaler resource in order to view the current status and recommendations. The following sample query lists all resources currently active on your EKS cluster:

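kubectl get verticalpodautoscalers \
-o custom-columns='NAME:.metadata.name,'\
'MEM:.status.recommendation.containerRecommendations[0].target.memory'

This produces output similar to the following (the SIGNATURE and MODE columns require additional custom-columns entries that are not shown in the command above):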

NAME               SIGNATURE           MODE      MEM
ds-<some-id>-vpa   <some-signature>    Off       930143865
ds-<some-id>-vpa   <some-signature>    Initial   14291063673
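
The MEM column is reported in bytes, which can be hard to read at a glance. The following sketch parses output of this shape and converts the memory target to GiB. It assumes whitespace-separated columns with no spaces inside values, and the helper names are hypothetical.

```python
def bytes_to_gib(num_bytes: int) -> float:
    """Convert a byte count to gibibytes."""
    return num_bytes / (1024 ** 3)


def parse_recommendations(table: str):
    """Parse whitespace-separated kubectl custom-columns output into dicts."""
    lines = [line for line in table.strip().splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


# Sample output copied from the listing above (placeholders left as-is).
SAMPLE = """\
NAME               SIGNATURE           MODE      MEM
ds-<some-id>-vpa   <some-signature>    Off       930143865
ds-<some-id>-vpa   <some-signature>    Initial   14291063673
"""

for row in parse_recommendations(SAMPLE):
    gib = bytes_to_gib(int(row["MEM"]))
    print(f"{row['NAME']}  mode={row['MODE']}  target={gib:.2f} GiB")
```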

Query and delete a recommendation

You can also use kubectl to purge the recommendation for a job based on its signature. Alternatively, you can use the --all flag and skip specifying the signature to purge all the resources on your cluster. Note that in this case you’ll actually be deleting the EMR vertical autoscaling job-run resource. This is a custom resource managed by EMR; deleting it automatically deletes the associated VPA object that tracks and stores recommendations. See the following code:

kubectl delete jobrun -n emr -l <signature-label-key>=<some-signature>
jobrun.dynamicsizing.emr.services.k8s.aws "ds-<some-id>" deleted

You can use the --all and --all-namespaces flags to delete all vertical autoscaling related resources:

kubectl delete jobruns --all --all-namespaces
jobrun.dynamicsizing.emr.services.k8s.aws "ds-<some-id>" deleted

Monitor vertical autoscaling through Prometheus and Grafana

You can use Prometheus and Grafana to monitor the vertical autoscaling functionality on your EKS cluster. This includes viewing recommendations that evolve over time for different job signatures and monitoring the autoscaling functionality itself. For this setup, we assume Prometheus and Grafana are already installed on your EKS cluster using the official Helm charts. If not, refer to the Setting up Prometheus and Grafana for monitoring the cluster section of the Running batch workloads on Amazon EKS workshop to get them up and running on your cluster.

Modify Prometheus to collect vertical autoscaling metrics

Prometheus doesn’t track vertical autoscaling metrics by default. To enable this, you’ll need to start gathering metrics from the VPA custom resource objects on your cluster. This can be easily done by patching your Helm chart with the following configuration:

helm upgrade -f prometheus-helm-values.yaml prometheus prometheus-community/prometheus -n prometheus

Here, prometheus-helm-values.yaml is the vertical autoscaling specific customization that tells Prometheus to gather vertical autoscaling related recommendations from the VPA resource objects, along with the minimal required metadata such as the job’s signature.

You can verify if this setup is working by running the following Prometheus queries for the newly created custom metrics:

  • kube_customresource_vpa_spark_rec_memory_target
  • kube_customresource_vpa_spark_rec_memory_lower
  • kube_customresource_vpa_spark_rec_memory_upper

These represent the target, lower bound, and upper bound memory for EMR Spark jobs that have vertical autoscaling enabled. The query can be grouped or filtered using the signature label, similar to the following Prometheus query:

kube_customresource_vpa_spark_rec_memory_target{signature="<some-signature>"}
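
If you want to run such a query programmatically, a minimal sketch is to build the PromQL selector and the URL for Prometheus's instant-query HTTP endpoint (/api/v1/query). The base URL, the example signature, and the helper names below are assumptions for illustration; the sketch only constructs the URL and does not send a request.

```python
from urllib.parse import urlencode


def build_promql(metric: str, signature: str) -> str:
    """Return a PromQL selector filtered by the signature label."""
    # Escape backslashes and double quotes so the label value stays valid.
    escaped = signature.replace("\\", "\\\\").replace('"', '\\"')
    return f'{metric}{{signature="{escaped}"}}'


def build_query_url(base_url: str, promql: str) -> str:
    """Return the URL for an instant query against the Prometheus HTTP API."""
    query_string = urlencode({"query": promql})
    return f"{base_url.rstrip('/')}/api/v1/query?{query_string}"


# Hypothetical values: adjust the address and signature for your cluster.
PROMETHEUS_URL = "http://localhost:9090"
promql = build_promql("kube_customresource_vpa_spark_rec_memory_target", "my-signature")

print(promql)
print(build_query_url(PROMETHEUS_URL, promql))
```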


Use Grafana to visualize recommendations and autoscaling functionality

You can use our sample Grafana dashboard by importing the EMR vertical autoscaling JSON model into your Grafana deployment. The dashboard visualizes vertical autoscaling recommendations alongside the memory provisioned and actually utilized by EMR Spark applications as shown in the following screenshot.

Grafana Dashboard

Results are presented categorized by your Kubernetes namespace and job signature. When you choose a certain namespace and signature combination, you’re presented with a pane. The pane represents a comparison of the vertical autoscaling recommendations for jobs belonging to the chosen signature, compared to the actual resource utilization of that job and the amount of Spark executor memory provisioned to the job. If autoscaling is enabled, the expectation is that the Spark executor memory would track the recommendation. If you’re in monitoring mode however, the two won’t match but you can still view the recommendations from this dashboard or use them to better understand the actual utilization and resource profile of your job.

Illustration of provisioned memory, utilization and recommendations

To better illustrate vertical autoscaling behavior and usage for different workloads, we ran query 2 of the TPC-DS benchmark for five iterations (the first two in monitoring mode and the last three in autoscaling mode) and visualized the results in the Grafana dashboard shared in the previous section.

Monitoring mode

This particular job was provisioned to run with 32 GB of executor memory (the blue line in the image), but the actual utilization hovered around the 10 GB mark (the amber line). Vertical autoscaling computed a recommendation of approximately 14 GB based on this run (the green line). This recommendation is based on the actual utilization with a safety margin added in.

Cost optimization example 1

The second iteration of the job was also run in the monitoring mode and the utilization and the recommendations remained unchanged.

Cost optimization example 2

Autoscaling mode

Iterations 3 through 5 were run in autoscaling mode. In this case, the provisioned memory (the blue line) drops from 32 GB to match the recommended value of 14 GB.

Cost optimization example 3

The utilization and recommendations remained unchanged for subsequent iterations in the case of this example. Furthermore, we observed that all the iterations of the job completed in around 5 minutes, both with and without autoscaling. This example illustrates the successful scaling down of the job’s executor memory allocation by about 56% (a drop from 32 GB to approximately 14 GB), which also translates to an equivalent reduction in the EMR memory uplift costs of the job, with no impact on the job’s performance.
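
The reduction quoted above follows directly from the two memory figures. As a quick check, the following sketch computes the percentage drop; the helper name is an assumption made for this example.

```python
def percent_reduction(before: float, after: float) -> float:
    """Return the percentage decrease from `before` to `after`."""
    if before <= 0:
        raise ValueError("before must be positive")
    return (before - after) / before * 100


# Executor memory before and after autoscaling, in GB (from the example run).
print(f"{percent_reduction(32, 14):.2f}% less executor memory")
```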

Automatic OOM recovery

In the earlier example, we didn’t observe any OOM events as a result of autoscaling. On the rare occasion where autoscaling results in OOM events, jobs should usually be scaled back up automatically. On the other hand, if a job that has autoscaling enabled is under-provisioned and experiences OOM events as a result, vertical autoscaling can scale up resources to facilitate automatic recovery.

In the following example, a job was provisioned with 2.5 GB of executor memory and experienced OOM exceptions during its execution. Vertical autoscaling responded to the OOM events by automatically scaling up failed executors when they were re-started. As seen in the following image, when the amber line representing memory utilization started approaching the blue line representing the provisioned memory, vertical autoscaling kicked in to increase the amount of provisioned memory for the re-started executors, allowing the automatic recovery and successful completion of the job without any intervention. The recommended memory converged to approximately 5 GB before the job completed.

OOM recovery example

All subsequent runs of jobs with the same signature will now start up with the recommended settings computed earlier, preventing OOM events right from the start.


Refer to the documentation for information on cleaning up vertical autoscaling related resources from your cluster. To clean up your EMR on EKS cluster after trying out the vertical autoscaling feature, refer to the clean-up section of the EMR on EKS workshop.


You can use vertical autoscaling to easily monitor resource utilization for one or more EMR on EKS jobs without any impact to your production workloads. You can use standard Kubernetes tooling including Prometheus, Grafana and kubectl to interact with and monitor vertical autoscaling on your cluster. You can also autoscale your EMR Spark jobs using recommendations that are derived based on the needs of your job, allowing you to realize cost savings and optimize cluster utilization as well as build resiliency to out-of-memory errors. Additionally, you can use it in conjunction with existing autoscaling mechanisms such as Dynamic Resource Allocation and Karpenter to effortlessly achieve optimal vertical resource assignment. Looking ahead, when Kubernetes fully supports in-place resizing of pods, vertical autoscaling will be able to take advantage of it to seamlessly scale your EMR jobs up or down, further facilitating optimal costs and cluster utilization.

To learn more about EMR on EKS vertical autoscaling and how to get started with it, refer to the documentation. You can also use the EMR on EKS Workshop to try out the EMR on EKS deployment option for Amazon EMR.

About the author

Rajkishan Gunasekaran is a Principal Engineer for Amazon EMR on EKS at Amazon Web Services.

Build, deploy, and run Spark jobs on Amazon EMR with the open-source EMR CLI tool

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/build-deploy-and-run-spark-jobs-on-amazon-emr-with-the-open-source-emr-cli-tool/

Today, we’re pleased to introduce the Amazon EMR CLI, a new command line tool to package and deploy PySpark projects across different Amazon EMR environments. With the introduction of the EMR CLI, you now have a simple way to not only deploy a wide range of PySpark projects to remote EMR environments, but also integrate with your CI/CD solution of choice.

In this post, we show how you can use the EMR CLI to create a new PySpark project from scratch and deploy it to Amazon EMR Serverless in one command.

Overview of solution

The EMR CLI is an open-source tool to help improve the developer experience of developing and deploying jobs on Amazon EMR. When you’re just getting started with Apache Spark, there are a variety of options with respect to how to package, deploy, and run jobs that can be overwhelming or require deep domain expertise. The EMR CLI provides simple commands for these actions that remove the guesswork from deploying Spark jobs. You can use it to create new projects or alongside existing PySpark projects.

In this post, we walk through creating a new PySpark project that analyzes weather data from the NOAA Global Surface Summary of Day open dataset. We’ll use the EMR CLI to do the following:

  1. Initialize the project.
  2. Package the dependencies.
  3. Deploy the code and dependencies to Amazon Simple Storage Service (Amazon S3).
  4. Run the job on EMR Serverless.


For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • An EMR Serverless application in the us-east-1 Region
  • An S3 bucket for your code and logs in the us-east-1 Region
  • An AWS Identity and Access Management (IAM) job role that can run EMR Serverless jobs and access S3 buckets
  • Python version >= 3.7
  • Docker

If you don’t already have an existing EMR Serverless application, you can use the following AWS CloudFormation template or use the emr bootstrap command after you’ve installed the CLI.


Install the EMR CLI

You can find the source for the EMR CLI in the GitHub repo, but it’s also distributed via PyPI. It requires Python version >= 3.7 to run and is tested on macOS, Linux, and Windows. To install the latest version, use the following command:

pip3 install emr-cli

You should now be able to run the emr --help command and see the different subcommands you can use:

❯ emr --help
Usage: emr [OPTIONS] COMMAND [ARGS]...

  Package, deploy, and run PySpark projects on EMR.

Options:
  --help  Show this message and exit.

Commands:
  bootstrap  Bootstrap an EMR Serverless environment.
  deploy     Copy a local project to S3.
  init       Initialize a local PySpark project.
  package    Package a project and dependencies into dist/
  run        Run a project on EMR, optionally build and deploy

If you didn’t already create an EMR Serverless application, the bootstrap command can create a sample environment for you and a configuration file with the relevant settings. Assuming you used the provided CloudFormation stack, set the following environment variables using the information on the Outputs tab of your stack. Set the Region in the terminal to us-east-1 and set a few other environment variables we’ll need along the way:

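export APPLICATION_ID=<YOUR_EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<YOUR_EMR_SERVERLESS_JOB_ROLE_ARN>
export S3_BUCKET=<YOUR_BUCKET_NAME>
export AWS_REGION=us-east-1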

We use us-east-1 because that’s where the NOAA GSOD data bucket is. EMR Serverless can access S3 buckets and other AWS resources in the same Region by default. To access other services, configure EMR Serverless with VPC access.

Initialize a project

Next, we use the emr init command to initialize a default PySpark project in the provided directory. The default templates create a standard Python project that uses pyproject.toml to define its dependencies. In this case, we use Pandas and PyArrow in our script, so those are pre-populated.

❯ emr init my-project
[emr-cli]: Initializing project in my-project
[emr-cli]: Project initialized.

After the project is initialized, you can run cd my-project or open the my-project directory in your code editor of choice. You should see the following set of files:

├── Dockerfile
├── entrypoint.py
├── jobs
│   └── extreme_weather.py
└── pyproject.toml

Note that we also have a Dockerfile here. This is used by the package command to ensure that our project dependencies are built on the right architecture and operating system for Amazon EMR.

If you use Poetry to manage your Python dependencies, you can also add a --project-type poetry flag to the emr init command to create a Poetry project.

If you already have an existing PySpark project, you can use emr init --dockerfile to create the Dockerfile necessary to package things up.

Run the project

Now that we’ve got our sample project created, we need to package our dependencies, deploy the code to Amazon S3, and start a job on EMR Serverless. With the EMR CLI, you can do all of that in one command. Make sure to run the command from the my-project directory:

emr run \
--entry-point entrypoint.py \
--application-id ${APPLICATION_ID} \
--job-role ${JOB_ROLE_ARN} \
--s3-code-uri s3://${S3_BUCKET}/tmp/emr-cli-demo/ \
--build \
--wait

This command performs several actions:

  1. Auto-detects the type of Spark project in the current directory.
  2. Initiates a build for your project to package up dependencies.
  3. Copies your entry point and resulting build files to Amazon S3.
  4. Starts an EMR Serverless job.
  5. Waits for the job to finish, exiting with an error status if it fails.

You should now see the following output in your terminal as the job begins running in EMR Serverless:

[emr-cli]: Job submitted to EMR Serverless (Job Run ID: 00f8uf1gpdb12r0l)
[emr-cli]: Waiting for job to complete...
[emr-cli]: Job state is now: SCHEDULED
[emr-cli]: Job state is now: RUNNING
[emr-cli]: Job state is now: SUCCESS
[emr-cli]: Job completed successfully!
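
The waiting behavior shown in this output can be approximated with a simple polling loop. The following is a minimal sketch and not the EMR CLI's actual implementation: get_state is a hypothetical callable that you would back with a real status lookup, and the terminal states listed are an assumption based on the states EMR Serverless reports for job runs.

```python
import time
from typing import Callable

# Assumed terminal job-run states for this sketch.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def wait_for_job(get_state: Callable[[], str],
                 poll_seconds: float = 5.0,
                 max_polls: int = 1000) -> str:
    """Poll get_state until a terminal state is reached and return it."""
    last_state = None
    for _ in range(max_polls):
        state = get_state()
        if state != last_state:
            # Log only transitions, mirroring the CLI output above.
            print(f"Job state is now: {state}")
            last_state = state
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state in time")


# Simulated state sequence standing in for a real status API.
states = iter(["SCHEDULED", "RUNNING", "RUNNING", "SUCCESS"])
final_state = wait_for_job(lambda: next(states), poll_seconds=0)

# A wrapper script could translate the result into a process exit status.
exit_code = 0 if final_state == "SUCCESS" else 1
print(f"exit code: {exit_code}")
```

Injecting the status lookup as a callable keeps the loop testable without any AWS access.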

And that’s it! If you want to run the same code on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), you can replace --application-id with --cluster-id j-11111111. The CLI will take care of sending the right spark-submit commands to your EMR cluster.

Now let’s walk through some of the other commands.

emr package

PySpark projects can be packaged in numerous ways, from a single .py file to a complex Poetry project with various dependencies. The EMR CLI can help consistently package your projects without having to worry about the details.

For example, if you have a single .py file in your project directory, the package command doesn’t need to do anything. If, however, you have multiple .py files in a typical Python project style, the emr package command will zip these files up as a package that can later be uploaded to Amazon S3 and provided to your PySpark job using the --py-files option. If you have third-party dependencies defined in pyproject.toml, emr package will create a virtual environment archive and start your EMR job with the spark.archives option.

The EMR CLI also supports Poetry for dependency management and packaging. If you have a Poetry project with a corresponding poetry.lock file, there’s nothing else you need to do. The emr package command will detect your poetry.lock file and automatically build the project using the Poetry Bundle plugin. You can use a Poetry project in two ways:

  • Create a project using the emr init command. The command takes a --project-type poetry option that creates a Poetry project for you:
    ❯ emr init --project-type poetry emr-poetry
    [emr-cli]: Initializing project in emr-poetry
    [emr-cli]: Project initialized.
    ❯ cd emr-poetry
    ❯ poetry install

  • If you have a pre-existing project, you can use the emr init --dockerfile option, which creates a Dockerfile that is automatically used when you run emr package.

Finally, as noted earlier, the EMR CLI provides you a default Dockerfile based on Amazon Linux 2 that you can use to reliably build package artifacts that are compatible with different EMR environments.
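
To illustrate the kind of decision the package command has to make, the following sketch classifies a project directory using the cases described in this section. It is a hypothetical approximation, not the EMR CLI's actual detection logic, and the function name and returned labels are invented for this example.

```python
import tempfile
from pathlib import Path


def detect_project_type(project_dir: Path) -> str:
    """Classify a PySpark project directory by the files it contains."""
    if (project_dir / "poetry.lock").exists():
        return "poetry"        # build with Poetry
    if (project_dir / "pyproject.toml").exists():
        return "pyproject"     # build a virtual environment archive
    py_files = list(project_dir.rglob("*.py"))
    if len(py_files) > 1:
        return "multi-file"    # zip modules for --py-files
    if len(py_files) == 1:
        return "single-file"   # nothing to package
    raise ValueError(f"no Python files found in {project_dir}")


# Recreate the layout produced by emr init in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "jobs").mkdir()
    (root / "entrypoint.py").write_text("print('hello')\n")
    (root / "jobs" / "extreme_weather.py").write_text("")
    (root / "pyproject.toml").write_text("")
    print(detect_project_type(root))
```

Checking for poetry.lock before pyproject.toml matters in this sketch because a Poetry project contains both files.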

emr deploy

The emr deploy command takes care of copying the necessary artifacts for your project to Amazon S3, so you don’t have to worry about it. Regardless of how the project is packaged, emr deploy will copy the resulting files to your Amazon S3 location of choice.

One use case for this is with CI/CD pipelines. Sometimes you want to deploy a specific version of code to Amazon S3 to be used in your data pipelines. With emr deploy, this is as simple as changing the --s3-code-uri parameter.

For example, let’s assume you’ve already packaged your project using the emr package command. Most CI/CD pipelines allow you to access the git tag. You can use that as part of the emr deploy command to deploy a new version of your artifacts. In GitHub Actions, this is github.ref_name, and you can use this in an action to deploy a versioned artifact to Amazon S3. See the following code:

emr deploy \
    --entry-point entrypoint.py \
    --s3-code-uri s3://<BUCKET_NAME>/<PREFIX>/${{github.ref_name}}/

In your downstream jobs, you could then update the location of your entry point files to point to this new location when you’re ready, or you can use the emr run command discussed in the next section.
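
If you assemble the versioned location in a script rather than inline in a workflow file, a small helper can keep the URI well-formed. This is a minimal sketch; the function name and the example bucket, prefix, and tag values are assumptions.

```python
def versioned_code_uri(bucket: str, prefix: str, ref_name: str) -> str:
    """Build an S3 URI of the form s3://<bucket>/<prefix>/<ref_name>/."""
    ref = ref_name.strip("/")
    if not ref:
        raise ValueError("ref_name must not be empty")
    # Drop an empty prefix so the URI never contains a double slash.
    parts = [part for part in (prefix.strip("/"), ref) if part]
    return f"s3://{bucket}/{'/'.join(parts)}/"


# Hypothetical values; in CI, ref_name would come from the pipeline's git tag.
print(versioned_code_uri("example-bucket", "code/", "v1.2.0"))
```

The resulting string can be passed as the --s3-code-uri value to emr deploy or emr run.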

emr run

Let’s take a quick look at the emr run command. We’ve used it before to package, deploy, and run in one command, but you can also use it to run on already-deployed artifacts. Let’s look at the specific options:

❯ emr run --help
Usage: emr run [OPTIONS]

  Run a project on EMR, optionally build and deploy

Options:
  --application-id TEXT     EMR Serverless Application ID
  --cluster-id TEXT         EMR on EC2 Cluster ID
  --entry-point FILE        Python or Jar file for the main entrypoint
  --job-role TEXT           IAM Role ARN to use for the job execution
  --wait                    Wait for job to finish
  --s3-code-uri TEXT        Where to copy/run code artifacts to/from
  --job-name TEXT           The name of the job
  --job-args TEXT           Comma-delimited string of arguments to be passed
                            to Spark job
  --spark-submit-opts TEXT  String of spark-submit options
  --build                   Package and deploy job artifacts
  --show-stdout             Show the stdout of the job after it's finished
  --help                    Show this message and exit.

If you want to run your code on EMR Serverless, the emr run command takes the --application-id and --job-role parameters. If you want to run on EMR on EC2, you only need the --cluster-id option.

Required for both options are --entry-point and --s3-code-uri. --entry-point is the main script that will be called by Amazon EMR. If you have any dependencies, --s3-code-uri is where they get uploaded to using the emr deploy command, and the EMR CLI will build the relevant spark-submit properties pointing to these artifacts.

There are a few different ways to customize the job:

  • --job-name – Allows you to specify the job or step name
  • --job-args – Allows you to provide command line arguments to your script
  • --spark-submit-opts – Allows you to add additional spark-submit options like --conf spark.jars or others
  • --show-stdout – Currently only works with single-file .py jobs on EMR on EC2, but will display stdout in your terminal after the job is complete

As we’ve seen before, --build invokes both the package and deploy commands. This makes it easier to iterate on local development when your code still needs to run remotely. You can simply use the same emr run command over and over again to build, deploy, and run your code in your environment of choice.
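
When you call emr run from a script or pipeline, it can help to assemble the argument list in one place. The following sketch builds the command for either target using only the options listed in the help output above. The function name, its validation rules, and the example IDs and ARN are assumptions for illustration; the sketch prints the command rather than running it.

```python
import shlex
from typing import List, Optional


def build_emr_run_command(entry_point: str,
                          s3_code_uri: str,
                          application_id: Optional[str] = None,
                          job_role: Optional[str] = None,
                          cluster_id: Optional[str] = None,
                          build: bool = False,
                          wait: bool = False) -> List[str]:
    """Return the argv list for emr run on EMR Serverless or EMR on EC2."""
    if bool(application_id) == bool(cluster_id):
        raise ValueError("provide exactly one of application_id or cluster_id")
    cmd = ["emr", "run", "--entry-point", entry_point, "--s3-code-uri", s3_code_uri]
    if application_id:
        # EMR Serverless needs both the application ID and a job role.
        if not job_role:
            raise ValueError("job_role is required with application_id")
        cmd += ["--application-id", application_id, "--job-role", job_role]
    else:
        # EMR on EC2 only needs the cluster ID.
        cmd += ["--cluster-id", cluster_id]
    if build:
        cmd.append("--build")
    if wait:
        cmd.append("--wait")
    return cmd


# Hypothetical identifiers for illustration only.
serverless = build_emr_run_command(
    "entrypoint.py",
    "s3://example-bucket/tmp/emr-cli-demo/",
    application_id="example-application-id",
    job_role="arn:aws:iam::111122223333:role/example-job-role",
    build=True,
    wait=True,
)
print(shlex.join(serverless))
```

A list of arguments like this can be handed to subprocess.run without shell quoting concerns.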

Future updates

The EMR CLI is under active development. Updates are currently in progress to support Amazon EMR on EKS and allow for the creation of local development environments to make local iteration of Spark jobs even easier. Feel free to contribute to the project in the GitHub repository.

Clean up

To avoid incurring future charges, stop or delete your EMR Serverless application. If you used the CloudFormation template, be sure to delete your stack.


With the release of the EMR CLI, we’ve made it easier for you to deploy and run Spark jobs on EMR Serverless. The utility is available as open source on GitHub. We’re planning a host of new functionalities; if there are specific requests you have, feel free to file an issue or open a pull request!

About the author

Damon is a Principal Developer Advocate on the EMR team at AWS. He’s worked with data and analytics pipelines for over 10 years and splits his time between splitting service logs and stacking firewood.

Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Post Syndicated from Vinay Kumar Khambhampati original https://aws.amazon.com/blogs/big-data/accelerate-hiveql-with-oozie-to-spark-sql-migration-on-amazon-emr/

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed pretty well for a long time. But with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling the storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.

We migrate these Oozie workflows with Hive actions by extracting the HQL files, and dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone. To convert the HQL to Spark SQL, you’ll need to sort through existing HQLs, replace the parameters, and change the syntax for a bunch of files.

Instead, we can use automation to speed up the process of migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes an existing Hive job as input and generates the configuration artifacts needed for running SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, co-ordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, SQL paths, and their static parameters, which are input for the solution in the next step.

The process consists of two steps:

  1. The Python parser script takes input of the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.
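
To make the two steps concrete, the following is a minimal sketch of how Hive actions might be pulled out of a workflow.xml file. It is not the code from the oozie-job-parser-extract-hive-sql repository: extract_hive_steps and the sample workflow are hypothetical, and the namespaces assume workflow and Hive action version 0.4, as in the sample command later in this post. The sketch uses the standard library parser so that it runs without extra packages; defusedxml.ElementTree.fromstring from the prerequisites list can be substituted when parsing untrusted files.

```python
import json
import xml.etree.ElementTree as ET

# Namespaces for workflow and Hive action schema version 0.4 (assumed)
WF_NS = "uri:oozie:workflow:0.4"
HIVE_NS = "uri:oozie:hive-action:0.4"

# Hypothetical, trimmed-down workflow used only for illustration
SAMPLE_WORKFLOW = """<workflow-app xmlns="uri:oozie:workflow:0.4" name="sample_oozie_job_name">
  <start to="step1"/>
  <action name="step1">
    <hive xmlns="uri:oozie:hive-action:0.4">
      <script>step1.q</script>
      <param>DATE=${DATE}</param>
      <param>HOUR=${HOUR}</param>
    </hive>
    <ok to="end"/>
    <error to="end"/>
  </action>
  <end name="end"/>
</workflow-app>"""


def extract_hive_steps(workflow_xml):
    """Return one metadata dict per Hive action found in the workflow."""
    root = ET.fromstring(workflow_xml)
    steps = []
    for action in root.findall(f"{{{WF_NS}}}action"):
        hive = action.find(f"{{{HIVE_NS}}}hive")
        if hive is None:
            continue  # only Hive actions are parsed
        params = {}
        for param in hive.findall(f"{{{HIVE_NS}}}param"):
            key, _, value = (param.text or "").partition("=")
            params[key] = value
        steps.append({
            "step_name": action.get("name"),
            "name": hive.findtext(f"{{{HIVE_NS}}}script"),
            "sql_parameters": params,
        })
    return steps


steps = extract_hive_steps(SAMPLE_WORKFLOW)
print(json.dumps(steps, indent=2))
```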

The following diagram outlines these steps and shows sample output.


You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml==0.7.1


Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment
  3. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate
  4. Git clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql
  5. Install dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}
('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')


This method has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to spark-submit. The solution uses DynamoDB to log the run details of each SQL for support and maintenance.

Architecture overview

The following diagram illustrates the solution architecture.


You need the following prerequisites:

  • Version:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1


Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attributes          Type     Comments
id                  String   partition_key
step_id             String   sort_key
step_name           String   Step description
sql_base_path       String   Base path
sql_info            List     List of SQLs in ETL pipeline
. sql_path                   SQL file name
. sql_active_flag            y/n
. sql_load_order             Order of SQL
. sql_parameters             Parameters in SQL and values
spark_config        Map      Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attributes          Type     Comments
job_run_id          String   partition_key
id                  String   sort_key (UUID)
end_time            String   End time
error_description   String   Error in case of failure
expire              Number   Time to Live
sql_seq             Number   SQL sequence number
start_time          String   Start time
Status              String   Status of job
step_id             String   Job ID that the SQL belongs to

The log table can grow quickly if there are too many jobs or if they are running frequently. We can archive them to Amazon S3 if they are no longer used or use the Time to Live feature of DynamoDB to clean up old records.
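
DynamoDB Time to Live expects a Number attribute holding a Unix epoch timestamp in seconds, which is the role of the expire attribute in the log table. The following is a minimal sketch of how a log entry with an expiry could be assembled. The build_log_item function and the 30-day retention are assumptions for illustration and are not taken from the framework code.

```python
import time
import uuid
from datetime import datetime, timezone

SECONDS_PER_DAY = 86400


def build_log_item(job_run_id, step_id, sql_seq, status, retention_days=30, now=None):
    """Build a log-table item whose `expire` attribute is an epoch timestamp in seconds."""
    now = time.time() if now is None else now
    return {
        "job_run_id": job_run_id,
        "step_id": step_id,
        "id": str(uuid.uuid4()),
        "sql_seq": sql_seq,
        "start_time": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(),
        "Status": status,
        # DynamoDB removes the item some time after this moment has passed
        "expire": int(now) + retention_days * SECONDS_PER_DAY,
    }


item = build_log_item(
    job_run_id="sample_oozie_job_name#step1#2022-01-01-12-00-01",
    step_id="sample_oozie_job_name#step1",
    sql_seq=5,
    status="RUNNING",   # hypothetical status value
)
print(item["expire"])
```

Time to Live also has to be enabled on the table with expire as the TTL attribute before DynamoDB deletes expired items.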

  3. If you have an existing bucket that you can reuse, run only the first command to set the variable. Otherwise, also run the second command to create an S3 bucket to store the Spark SQL code, which will be run by Amazon EMR.
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1
  4. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name
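
The policy in the preceding command hard-codes the bucket name in two ARNs, so it has to be edited by hand to match the s3_bucket_name variable. As a minimal sketch, the same policy document can be generated from the bucket name so that both stay in sync. The secure_transport_policy function is a hypothetical helper, not part of the framework.

```python
import json


def secure_transport_policy(bucket_name):
    """Return a bucket policy that denies any request not sent over HTTPS."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Deny",
                "Principal": {"AWS": "*"},
                "Action": "s3:*",
                # The bucket itself and every object in it
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
                "Condition": {"Bool": {"aws:SecureTransport": "false"}},
            }
        ],
    }


policy_json = json.dumps(secure_transport_policy("unique-code-bucket-name"))
print(policy_json)
```

The printed JSON can be passed as the value of the --policy option.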

  5. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git
  6. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip
  7. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  8. Upload sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  9. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "name": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "id": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}

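The step metadata determines which SQL files run and in what order. The following is a minimal sketch of one way to turn such an item into a run plan: keep the entries whose sql_active_flag is Y, sort them by sql_load_order, and join each sql_path to the code bucket and sql_base_path. The plan_sql_runs function and the inactive 7.sql entry are hypothetical; how etl_driver.py resolves the order internally is not shown in this post.

```python
# Trimmed copy of the step metadata, plus one hypothetical inactive entry
STEP_METADATA = {
    "step_id": "sample_oozie_job_name#step1",
    "sql_base_path": "sample_oozie_job_name/step1/",
    "sql_info": [
        {"sql_load_order": 10, "sql_active_flag": "Y", "sql_path": "10.sql"},
        {"sql_load_order": 5, "sql_active_flag": "Y", "sql_path": "5.sql"},
        {"sql_load_order": 7, "sql_active_flag": "N", "sql_path": "7.sql"},
    ],
}


def plan_sql_runs(step_metadata, code_bucket):
    """Return the S3 paths of the active SQL files in load order."""
    active = [
        sql for sql in step_metadata["sql_info"]
        if sql["sql_active_flag"].upper() == "Y"
    ]
    ordered = sorted(active, key=lambda sql: sql["sql_load_order"])
    base = f"{code_bucket.rstrip('/')}/{step_metadata['sql_base_path']}"
    return [base + sql["sql_path"] for sql in ordered]


run_plan = plan_sql_runs(STEP_METADATA, "s3://unique-code-bucket-name/DW")
for path in run_plan:
    print(path)
```
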
  10. In the Athena query editor, create the database base:
create database base;
  11. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

    2. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  12. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file by modifying the S3 path to the bucket you created.
    2. Run the ./sample_data/ddl/us_current.q file by modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.

  13. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  14. Add the following steps to the EMR cluster:
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,--code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the Spark step.

Step type Spark Application
Name Any Name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script Argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB Log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB Metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.
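
The sql_parameters argument packs several KEY=VALUE pairs into one string separated by two colons. The following is a minimal sketch of how such a string could be split and applied to a SQL file. It assumes the SQL files reference parameters in the ${NAME} form used by Hive, and both function names, the table name, and the column names are hypothetical rather than taken from etl_driver.py.

```python
from string import Template


def parse_sql_parameters(raw):
    """Split 'KEY=VALUE::KEY=VALUE' into a dict; values may themselves contain '='."""
    params = {}
    for pair in raw.split("::"):
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"Expected KEY=VALUE, got {pair!r}")
        params[key] = value
    return params


def render_sql(sql_text, params):
    """Replace ${NAME} placeholders; unknown placeholders are left untouched."""
    return Template(sql_text).safe_substitute(params)


params = parse_sql_parameters(
    "DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name"
)
# example_table, dt, and hr are placeholder names
sql = render_sql("SELECT * FROM example_table WHERE dt = '${DATE}' AND hr = '${HOUR}'", params)
print(sql)
```

Using safe_substitute rather than substitute keeps any placeholder without a supplied value visible in the rendered SQL instead of raising an error.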


After the EMR step completes, you should have data in the S3 bucket for the table base.states_daily. We can validate the data by querying the table base.states_daily in Athena.

Congratulations, you have converted an Oozie Hive job to Spark and run it on Amazon EMR successfully.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for any new pipeline and offers less maintenance of code
  • Onboarding any new pipeline only needs the metadata set up—the DynamoDB entries and SQL to be placed in the S3 path
  • Any common modifications or enhancements can be done at the solution level, which will be reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark-submit command


This method has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL should be a Spark action like insert, create, drop, and so on

In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges for the resources:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1
  2. Empty and delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api delete-bucket --bucket $s3_bucket_name --region us-east-1

  3. Terminate the EMR cluster if it wasn’t a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster id created above>>


In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmark and testing. Developing a new pipeline is also quick—you only need to create SQL logic, test it using Spark (shell or notebook), add metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.

About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.

How CyberSolutions built a scalable data pipeline using Amazon EMR Serverless and the AWS Data Lab

Post Syndicated from Constantin Scoarță original https://aws.amazon.com/blogs/big-data/how-cybersolutions-built-a-scalable-data-pipeline-using-amazon-emr-serverless-and-the-aws-data-lab/

This post is co-written by Constantin Scoarță and Horațiu Măiereanu from CyberSolutions Tech.

CyberSolutions is one of the leading ecommerce enablers in Germany. We design, implement, maintain, and optimize award-winning ecommerce platforms end to end. Our solutions are based on best-in-class software like SAP Hybris and Adobe Experience Manager, and complemented by unique services that help automate the pricing and sourcing processes.

We have built data pipelines to process, aggregate, and clean our data for our forecasting service. With the growing interest in our services, we wanted to scale our batch-based data pipeline to process more historical data on a daily basis and yet remain performant, cost-efficient, and predictable. To meet our requirements, we have been exploring the use of Amazon EMR Serverless as a potential solution.

To accelerate our initiative, we worked with the AWS Data Lab team. They offer joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics initiatives. We chose to work through a Build Lab, which is a 2–5-day intensive build with a technical customer team.

In this post, we share how we engaged with the AWS Data Lab program to build a scalable and performant data pipeline using EMR Serverless.

Use case

Our forecasting and recommendation algorithm is fed with historical data, which needs to be curated, cleaned, and aggregated. Our solution was based on AWS Glue workflows orchestrating a set of AWS Glue jobs, which worked fine for our requirements. However, as our use case developed, it required more computations and bigger datasets, resulting in unpredictable performance and cost.

This pipeline performs daily extracts from our data warehouse and a few other systems, curates the data, and does some aggregations (such as daily average). Our internal tools consume the results to generate recommendations. Prior to the engagement, the pipeline was processing 28 days’ worth of historical data in approximately 70 minutes. We wanted to extend that to 100 days and 365 days of data without having to extend the extraction window or factor in the resources configured.

Solution overview

While working with the Data Lab team, we decided to structure our efforts into two approaches. As a short-term improvement, we were looking into optimizing the existing pipeline based on AWS Glue extract, transform, and load (ETL) jobs, orchestrated via AWS Glue workflows. However, for the mid-term to long-term, we looked at EMR Serverless to run our forecasting data pipeline.

EMR Serverless is an option in Amazon EMR that makes it easy and cost-effective for data engineers and analysts to run petabyte-scale data analytics in the cloud. With EMR Serverless, we could run applications built using open-source frameworks such as Apache Spark (as in our case) without having to configure, manage, optimize, or secure clusters. The following factors influenced our decision to use EMR Serverless:

  • Our pipeline had minimal dependency on the AWS Glue context and its features, instead running native Apache Spark
  • EMR Serverless offers configurable drivers and workers
  • With EMR Serverless, we were able to take advantage of its cost tracking feature for applications
  • The need for managing our own Spark History Server was eliminated because EMR Serverless automatically creates a monitoring Spark UI for each job

Therefore, we planned the lab activities to be categorized as follows:

  • Improve the existing code to be more performant and scalable
  • Create an EMR Serverless application and adapt the pipeline
  • Run the entire pipeline with different date intervals

The following solution architecture depicts the high-level components we worked with during the Build Lab.

In the following sections, we dive into the lab implementation in more detail.

Improve the existing code

After examining our code decisions, we identified a step in our pipeline that consumed the most time and resources, and we decided to focus on improving it. Our target job for this optimization was the “Create Moving Average” job, which involves computing various aggregations such as averages, medians, and sums on a moving window. Initially, this step took around 4.7 minutes to process an interval of 28 days. However, running the job for larger datasets proved to be challenging – it didn’t scale well and even resulted in errors in some cases.

While reviewing our code, we focused on several areas, including checking data frames at certain steps to ensure that they contained content before proceeding. Initially, we used the count() API to achieve this, but we discovered that head() was a better alternative because it returns the first n rows only and is faster than count() for large input data. With this change, we were able to save around 15 seconds when processing 28 days’ worth of data. Additionally, we optimized our output writing by using coalesce() instead of repartition().

These changes shaved the runtime down to 4 minutes per run. We achieved better performance by calling cache() on data frames before performing the aggregations; because cache() is lazy, the data frame is materialized by the next action and then reused by the aggregations that follow. Additionally, we used unpersist() to free up executors’ memory after we were done with these aggregations. This led to a runtime of approximately 3.5 minutes for this job.
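
The following is a minimal sketch of the two patterns described above: checking for content with head() instead of count(), and wrapping several aggregations in cache() and unpersist(). The function names are hypothetical and this is not the code from the pipeline. The df argument is assumed to be a pyspark.sql.DataFrame, and each callable in writers is assumed to trigger a Spark action such as a write.

```python
def has_rows(df):
    """Return True if the DataFrame has at least one row.

    head(1) fetches at most one row, whereas count() scans the whole input.
    """
    return len(df.head(1)) > 0


def run_with_cache(df, writers):
    """Cache df, run every writer against the cached DataFrame, then release the memory.

    Returns the number of writers that ran, or 0 if df is empty.
    """
    if not has_rows(df):
        return 0
    cached = df.cache()  # lazy: the first action populates the cache
    try:
        for write in writers:
            write(cached)  # each writer must run an action before unpersist()
    finally:
        cached.unpersist()  # free executor memory even if a writer fails
    return len(writers)
```

With a real Spark session, a writer could be as simple as a function that groups the data frame and writes the result. Writers that only return a lazy DataFrame would not benefit, because the cache is released before any action runs.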

Following the successful code improvements, we managed to extend the data input to 100 days, 1 year, and 3 years. For this specific job, the coalesce() function wasn’t avoiding the shuffle operation and caused uneven data distribution per executor, so we switched back to repartition() for this job. By the end, we managed to get successful runs in 4.7, 12, and 57 minutes, using the same number of workers in AWS Glue (10 standard workers).

Adapt code to EMR Serverless

To observe whether running the same job in EMR Serverless would yield better results, we configured an application that uses a number of executors comparable to the AWS Glue jobs. In the job configurations, we used 2 cores and 6 GB of memory for the driver and 20 executors with 4 cores and 16 GB of memory. However, we didn’t use additional ephemeral storage (by default, workers come with 20 GB of free storage).

By the time we had the Build Lab, AWS Glue supported Apache Spark 3.1.1; however, we opted to use Spark 3.2.0 (Amazon EMR version 6.6.0) instead. Additionally, during the Build Lab, only x86_64 EMR Serverless applications were available, although it now also supports arm64-based architecture.

We adapted the code utilizing AWS Glue context to work with native Apache Spark. For instance, we needed to overwrite existing partitions and sync updates with the AWS Glue Data Catalog, especially when old partitions were replaced and new ones were added. We achieved this by setting spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") and using an MSCK REPAIR query to sync the relevant table. Similarly, we replaced the read and write operations to rely on Apache Spark APIs.
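
The following is a minimal sketch of that overwrite-and-sync pattern in native PySpark. The overwrite_partitions function is hypothetical, and writing Parquet files to an S3 path is an assumption; the post doesn't state the file format or how the pipeline structures this step.

```python
def overwrite_partitions(spark, df, table_name, path, partition_columns):
    """Overwrite only the partitions present in df, then sync the catalog.

    spark is assumed to be a SparkSession and df a pyspark.sql.DataFrame.
    """
    # Replace only the partitions that df contains instead of the whole table
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    df.write.mode("overwrite").partitionBy(*partition_columns).parquet(path)
    # Register newly written partitions with the catalog
    spark.sql(f"MSCK REPAIR TABLE {table_name}")
```

A call might look like overwrite_partitions(spark, daily_df, "my_db.my_table", "s3://my-bucket/my_table/", ["dt"]), where every name is a placeholder.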

During the tests, we intentionally disabled the fine-grained auto scaling feature of EMR Serverless while running jobs, in order to observe how the code would perform with the same number of workers but different date intervals. We achieved that by setting spark.dynamicAllocation.enabled to false (the default is true).
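
As a minimal sketch, the sizing described above (a driver with 2 cores and 6 GB, and 20 executors with 4 cores and 16 GB each) can be expressed together with the disabled dynamic allocation as a string of --conf options for spark-submit. The spark_submit_parameters function is a hypothetical helper, and fixing the executor count through spark.executor.instances is an assumption about how the jobs were configured.

```python
def spark_submit_parameters(driver_cores, driver_memory,
                            executor_instances, executor_cores, executor_memory):
    """Build a --conf string with a fixed number of executors."""
    conf = {
        "spark.dynamicAllocation.enabled": "false",  # keep the worker count constant
        "spark.driver.cores": str(driver_cores),
        "spark.driver.memory": driver_memory,
        "spark.executor.instances": str(executor_instances),
        "spark.executor.cores": str(executor_cores),
        "spark.executor.memory": executor_memory,
    }
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


params = spark_submit_parameters(2, "6g", 20, 4, "16g")
print(params)
```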

For the same code, number of workers, and data inputs, we managed to get better performance results with EMR Serverless, which were 2.5, 2.9, 6, and 16 minutes for 28 days, 100 days, 1 year, and 3 years, respectively.
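
To put these numbers side by side, the following sketch computes the runtime reduction per date interval from the figures reported in this post. It assumes that the optimized AWS Glue runtime of approximately 3.5 minutes is the right baseline for the 28-day interval.

```python
# Runtimes in minutes for the "Create Moving Average" job, as reported in this post
glue_minutes = {"28 days": 3.5, "100 days": 4.7, "1 year": 12, "3 years": 57}
emr_serverless_minutes = {"28 days": 2.5, "100 days": 2.9, "1 year": 6, "3 years": 16}


def runtime_reduction_percent(before, after):
    """Percentage by which the runtime dropped."""
    return (before - after) / before * 100


reductions = {
    interval: runtime_reduction_percent(glue_minutes[interval], emr_serverless_minutes[interval])
    for interval in glue_minutes
}
for interval, reduction in reductions.items():
    print(f"{interval}: {reduction:.1f}% shorter runtime")
```

The reduction grows with the size of the input, which matches the observation that the biggest benefit was the ability to scale to larger date intervals.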

Run the entire pipeline with different date intervals

Because the code for our jobs was implemented in a modular fashion, we were able to quickly test all of them with EMR Serverless and then link them together to orchestrate the pipeline via Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Regarding performance, our previous pipeline using AWS Glue took around 70 minutes to run with our regular workload. However, our new pipeline, powered by Amazon MWAA-backed EMR Serverless, achieved similar results in approximately 60 minutes. Although this is a notable improvement, the most significant benefit was our ability to scale up to process larger amounts of data using the same number of workers. For instance, processing 1 year’s worth of data only took around 107 minutes to complete.

Conclusion and key takeaways

In this post, we outlined the approach taken by the CyberSolutions team in conjunction with the AWS Data Lab to create a high-performing and scalable demand forecasting pipeline. By using optimized Apache Spark jobs on customizable EMR Serverless workers, we were able to surpass the performance of our previous workflow. Specifically, the new setup resulted in 50–72% better performance for most jobs when processing 100 days of data, resulting in an overall cost savings of around 38%.

EMR Serverless applications’ features helped us have better control over cost. For example, we configured the pre-initialized capacity, which resulted in job start times of 1–4 seconds. And we set up the application behavior to start with the first submitted job and automatically stop after a configurable idle time.

As a next step, we are actively testing AWS Graviton2-based EMR applications, which come with more performance gains and lower cost.

About the Authors

Constantin Scoarță is a Software Engineer at CyberSolutions Tech. He is mainly focused on building data cleaning and forecasting pipelines. In his spare time, he enjoys hiking, cycling, and skiing.

Horațiu Măiereanu is the Head of Python Development at CyberSolutions Tech. His team builds smart microservices for ecommerce retailers to help them improve and automate their workloads. In his free time, he likes hiking and traveling with his family and friends.

Ahmed Ewis is a Solutions Architect at the AWS Data Lab. He helps AWS customers design and build scalable data platforms using AWS database and analytics services. Outside of work, Ahmed enjoys playing with his child and cooking.

Amazon EMR on EKS widens the performance gap: Run Apache Spark workloads 5.37 times faster and at 4.3 times lower cost

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/amazon-emr-on-eks-widens-the-performance-gap-run-apache-spark-workloads-5-37-times-faster-and-at-4-3-times-lower-cost/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. Also, you can run other types of business applications, such as web applications and machine learning (ML) TensorFlow workloads, on the same EKS cluster. EMR on EKS simplifies your infrastructure management, maximizes resource utilization, and reduces your cost.

We have been continually improving the Spark performance in each Amazon EMR release to further shorten job runtime and optimize users’ spending on their Amazon EMR big data workloads. As of the Amazon EMR 6.5 release in January 2022, the optimized Spark runtime was 3.5 times faster than OSS Spark v3.1.2 with up to 61% lower costs. Amazon EMR 6.10 is now 1.59 times faster than Amazon EMR 6.5, which has resulted in 5.37 times better performance than OSS Spark v3.3.1 with 76.8% cost savings.

In this post, we describe the benchmark setup and results on top of the EMR on EKS environment. We also share a Spark benchmark solution that suits all Amazon EMR deployment options, so you can replicate the process in your environment for your own performance test cases. The solution uses the TPC-DS dataset and unmodified data schema and table relationships, but derives queries from TPC-DS to support the SparkSQL test cases. It is not comparable to other published TPC-DS benchmark results.

Benchmark setup

To compare with the EMR on EKS 6.5 test result detailed in the post Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads, this benchmark for the latest release (Amazon EMR 6.10) uses the same approach: a TPC-DS benchmark framework and the same size of TPC-DS input dataset from an Amazon Simple Storage Service (Amazon S3) location. For the source data, we chose the 3 TB scale factor, which contains 17.7 billion records, approximately 924 GB compressed data in Parquet file format. The setup instructions and technical details can be found in the aws-sample repository.

In summary, the entire performance test job includes 104 SparkSQL queries and was completed in approximately 24 minutes (1,397.55 seconds) with an estimated running cost of $5.08 USD. The input data and test result outputs were both stored on Amazon S3.

The job has been configured with the following parameters that match with the previous Amazon EMR 6.5 test:

  • EMR release – EMR 6.10.0
  • Hardware:
    • Compute – 6 x c5d.9xlarge instances, 216 vCPU, 432 GiB memory in total
    • Storage – 6 x 900 GB NVMe SSD built-in storage
    • Amazon EBS root volume – 6 x 20 GB gp2
  • Spark configuration:
    • Driver pod – 1 instance among other 7 executors on a shared Amazon Elastic Compute Cloud (Amazon EC2) node:
      • spark.driver.cores=4
      • spark.driver.memory=5g
      • spark.kubernetes.driver.limit.cores=4.1
    • Executor pod – 47 instances distributed over 6 EC2 nodes
      • spark.executor.cores=4
      • spark.executor.memory=6g
      • spark.executor.memoryOverhead=2G
      • spark.kubernetes.executor.limit.cores=4.3
  • Metadata store – We use Spark’s in-memory data catalog to store metadata for TPC-DS databases and tables—spark.sql.catalogImplementation is set to the default value in-memory. The fact tables are partitioned by the date column, which consists of partitions ranging from 200–2,100. No statistics are pre-calculated for these tables.
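
As a rough sanity check of this sizing, the following Python sketch works out how the pods fit on each node. It assumes pods are spread evenly across the six nodes and, as an upper bound, treats the driver pod as if it were executor-sized; the per-node figures are simply the stated totals divided by six.

```python
# Values taken from the benchmark configuration listed above.
NODES = 6
TOTAL_VCPU = 216
TOTAL_MEMORY_GIB = 432
EXECUTORS = 47
DRIVERS = 1
EXECUTOR_CORES = 4
EXECUTOR_MEMORY_GIB = 6
EXECUTOR_OVERHEAD_GIB = 2

# Per-node capacity, derived from the stated totals.
vcpu_per_node = TOTAL_VCPU // NODES
memory_per_node_gib = TOTAL_MEMORY_GIB // NODES

# Assumption: the 48 pods (47 executors + 1 driver) are spread evenly.
pods_per_node = (EXECUTORS + DRIVERS) // NODES

# Each executor pod requests its heap plus the configured memory overhead.
executor_pod_memory_gib = EXECUTOR_MEMORY_GIB + EXECUTOR_OVERHEAD_GIB

# Upper bound: count every pod on the node as executor-sized.
requested_cores = pods_per_node * EXECUTOR_CORES
requested_memory_gib = pods_per_node * executor_pod_memory_gib

print(f"{pods_per_node} pods per node")
print(f"{requested_cores} of {vcpu_per_node} vCPU requested")
print(f"{requested_memory_gib} of {memory_per_node_gib} GiB requested")
```

Under these assumptions, each node keeps some CPU and memory headroom for the operating system and Kubernetes system pods.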


A single test session consists of 104 Spark SQL queries that were run sequentially. We ran each Spark runtime session (EMR runtime for Apache Spark, OSS Apache Spark) three times. The Spark benchmark job produces a CSV file to Amazon S3 that summarizes the median, minimum, and maximum runtime for each individual query.

We calculate the final benchmark results (the geomean and the total job runtime) from arithmetic means. We take the mean of the median, minimum, and maximum values per query using the AVERAGE() formula, for example AVERAGE(F2:H2). Then we apply GEOMEAN(I2:I105) to the resulting average column I to get the geometric mean, and SUM(I2:I105) to get the total runtime.
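
The same aggregation can be sketched in Python for readers who prefer code to spreadsheet formulas. The three rows are made-up values for illustration only, not benchmark results, and the function name is hypothetical.

```python
from statistics import geometric_mean, mean

# Hypothetical per-query rows: (query, median, minimum, maximum) in seconds.
rows = [
    ("q1", 10.0, 8.0, 12.0),
    ("q2", 4.0, 3.0, 5.0),
    ("q3", 25.0, 20.0, 30.0),
]


def summarize(rows):
    """Average median/min/max per query, then aggregate across queries."""
    # Equivalent of AVERAGE(F2:H2) applied to each row.
    averages = [mean((median, minimum, maximum))
                for _, median, minimum, maximum in rows]
    return {
        # Equivalent of GEOMEAN over the average column.
        "geomean": geometric_mean(averages),
        # Equivalent of SUM over the average column.
        "total_runtime": sum(averages),
    }


summary = summarize(rows)
print(summary)
```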

Previously, we observed that EMR on EKS 6.5 is 3.5 times faster than OSS Spark on EKS, and costs 2.6 times less. From this benchmark, we found that the gap has widened: EMR on EKS 6.10 now provides a 5.37 times performance improvement on average and up to 11.61 times improved performance for individual queries over OSS Spark 3.3.1 on Amazon EKS. From the running cost perspective, we see a significant reduction of 4.3 times.

The following graph shows the performance improvement of Amazon EMR 6.10 compared to OSS Spark 3.3.1 at the individual query level. The X-axis shows the name of each query, and the Y-axis shows the total runtime in seconds on a logarithmic scale. The most significant gains were in eight queries (q14a, q14b, q23b, q24a, q24b, q4, q67, q72), each of which ran more than 10 times faster.

Job cost estimation

The cost estimate doesn’t account for Amazon S3 storage, or PUT and GET requests. The Amazon EMR on EKS uplift calculation is based on the hourly billing information provided by AWS Cost Explorer.

  • c5d.9xlarge hourly price – $1.728
  • Number of EC2 instances – 6
  • Amazon EBS storage per GB-month – $0.10
  • Amazon EBS gp2 root volume – 20GB
  • Job run time (hour)
    • OSS Spark 3.3.1 – 2.09
    • EMR on EKS 6.5.0 – 0.68
    • EMR on EKS 6.10.0 – 0.39

Cost component         | OSS Spark 3.3.1 on EKS | EMR on EKS 6.5.0 | EMR on EKS 6.10.0
Amazon EC2             | $21.67                 | $7.05            | $4.04
EMR on EKS             | $ –                    | $1.57            | $0.99
Amazon EKS             | $0.21                  | $0.07            | $0.04
Amazon EBS root volume | $0.03                  | $0.01            | $0.01
Total                  | $21.88                 | $8.70            | $5.08
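
The Amazon EC2 row and the headline savings figures can be reproduced with a few lines of Python. This sketch only recomputes the EC2 component (hourly price x instance count x runtime) and derives ratios from the totals as published in the table; it does not attempt to rebuild the EMR uplift, Amazon EKS, or Amazon EBS rows.

```python
HOURLY_PRICE_C5D_9XLARGE = 1.728  # USD, from the list above
INSTANCE_COUNT = 6

runtime_hours = {
    "OSS Spark 3.3.1": 2.09,
    "EMR on EKS 6.5.0": 0.68,
    "EMR on EKS 6.10.0": 0.39,
}

# EC2 cost = hourly price x number of instances x job runtime in hours.
ec2_cost = {
    name: round(HOURLY_PRICE_C5D_9XLARGE * INSTANCE_COUNT * hours, 2)
    for name, hours in runtime_hours.items()
}

# Totals copied from the table rather than recomputed.
total_cost = {
    "OSS Spark 3.3.1": 21.88,
    "EMR on EKS 6.5.0": 8.70,
    "EMR on EKS 6.10.0": 5.08,
}

latest = total_cost["EMR on EKS 6.10.0"]
cost_ratio_vs_oss = total_cost["OSS Spark 3.3.1"] / latest
savings_vs_oss_pct = (1 - latest / total_cost["OSS Spark 3.3.1"]) * 100
savings_vs_emr65_pct = (1 - latest / total_cost["EMR on EKS 6.5.0"]) * 100

print(ec2_cost)
print(f"{cost_ratio_vs_oss:.1f}x cheaper than OSS Spark")
print(f"{savings_vs_oss_pct:.1f}% savings vs OSS Spark")
print(f"{savings_vs_emr65_pct:.1f}% savings vs EMR on EKS 6.5.0")
```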

Performance enhancements

Although we improve on Amazon EMR’s performance with each release, Amazon EMR 6.10 contained many performance optimizations, making it 5.37 times faster than OSS Spark v3.3.1 and 1.59 times faster than our first release of 2022, Amazon EMR 6.5. This additional performance boost was achieved through the addition of multiple optimizations, including:

  • Enhancements to join performance, such as the following:
    • Shuffle-Hash Joins (SHJ) are more CPU and I/O efficient than Shuffle-Sort-Merge Joins (SMJ) when the costs of building and probing the hash table, including the availability of memory, are less than the cost of sorting and performing the merge join. However, SHJs have drawbacks, such as the risk of out-of-memory errors due to their inability to spill to disk, which prevents them from being aggressively used across Spark in place of SMJs by default. We have optimized our use of SHJs so that they can be applied to more queries by default than in OSS Spark.
    • For some query shapes, we have eliminated redundant joins and enabled the use of more performant join types.
  • We have reduced the amount of data shuffled before joins and the potential for data explosions after joins by selectively pushing down aggregates through joins.
  • Bloom filters can improve performance by reducing the amount of data shuffled before the join. However, there are cases where bloom filters are not beneficial and can even regress performance. For example, the bloom filter introduces a dependency between stages that reduces query parallelism, but may end up filtering out relatively little data. Our enhancements allow bloom filters to be safely applied to more query plans than OSS Spark.
  • Aggregates with high-precision decimals are computationally intensive in OSS Spark. We optimized high-precision decimal computations to increase their performance.


With version 6.10, Amazon EMR has further enhanced the EMR runtime for Apache Spark in comparison to our previous benchmark tests for Amazon EMR version 6.5. When running EMR workloads with the equivalent Apache Spark version 3.3.1, we observed 1.59 times better performance with 41.6% lower costs than Amazon EMR 6.5.

With our TPC-DS benchmark setup, we observed a significant performance increase of 5.37 times and a cost reduction of 4.3 times using EMR on EKS compared to OSS Spark.

To learn more and get started with EMR on EKS, try out the EMR on EKS Workshop and visit the EMR on EKS Best Practices Guide page.

About the Authors

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

Ashok Chintalapati is a software development engineer for Amazon EMR at Amazon Web Services.

Push Amazon EMR step logs from Amazon EC2 instances to Amazon CloudWatch logs

Post Syndicated from Nausheen Sayed original https://aws.amazon.com/blogs/big-data/push-amazon-emr-step-logs-from-amazon-ec2-instances-to-amazon-cloudwatch-logs/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS to build scalable data pipelines in a cost-effective manner. Monitoring the logs generated from the jobs deployed on EMR clusters is essential to help detect critical issues in real time and identify root causes quickly.

Pushing those logs into Amazon CloudWatch enables you to centralize and drive actionable intelligence from your logs to address operational issues without needing to provision servers or manage software. You can instantly begin writing queries with aggregations, filters, and regular expressions. In addition, you can visualize time series data, drill down into individual log events, and export query results to CloudWatch dashboards.

To ingest logs that are persisted on the Amazon Elastic Compute Cloud (Amazon EC2) instances of an EMR cluster into CloudWatch, you can use the CloudWatch agent. This provides a simple way to push logs from an EC2 instance to CloudWatch.

The CloudWatch agent is a software package that autonomously and continuously runs on your servers. You can install and configure the CloudWatch agent to collect system and application logs from EC2 instances, on-premises hosts, and containerized applications. CloudWatch processes and stores the logs collected by the CloudWatch agent, which further helps with the performance and health monitoring of your infrastructure and applications.

In this post, we create an EMR cluster and centralize the EMR step logs of the jobs in CloudWatch. This will make it easier for you to manage your EMR cluster, troubleshoot issues, and monitor performance. This solution is particularly helpful if you want to use CloudWatch to collect and visualize real-time logs, metrics, and event data, streamlining your infrastructure and application maintenance.

Overview of solution

The solution presented in this post is based on a specific configuration where the EMR step concurrency level is set to 1. This means that only one step is run at a time on the cluster. It’s important to note that if the EMR step concurrency level is set to a value greater than 1, the solution may not work as expected. We highly recommend verifying your EMR step concurrency configuration before implementing the solution presented in this post.

The following diagram illustrates the solution architecture.

Solution Architecture Diagram

The workflow includes the following steps:

  1. Users start an Apache Spark EMR job, creating a step on the EMR cluster. Using Apache Spark, the workload is distributed across the different nodes of the EMR cluster.
  2. In each node (EC2 instance) of the cluster, a CloudWatch agent watches different log directories, capturing new entries in the log files and pushing them to CloudWatch.
  3. Users can view the step logs by accessing the different log groups from the CloudWatch console. The step logs written by Amazon EMR are as follows:
    • controller — Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.
    • stderr — The standard error channel of Spark while it processes the step.
    • stdout — The standard output channel of Spark while it processes the step.
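
To make step 2 more concrete, the following Python sketch builds a CloudWatch agent configuration that tails the three step logs. The `logs` / `logs_collected` / `files` / `collect_list` layout and the `{instance_id}` placeholder follow the CloudWatch agent configuration file format. The step log directory and the wildcard pattern are assumptions to verify on your own cluster, and the configuration actually deployed by the template in this post may differ.

```python
import json

# Assumed location of EMR step logs on the node; verify on your cluster.
STEP_LOG_DIR = "/mnt/var/log/hadoop/steps"


def build_agent_config():
    """Return a CloudWatch agent config that tails the EMR step logs."""
    collect_list = [
        {
            # The wildcard stands in for the step ID directory (assumption).
            "file_path": f"{STEP_LOG_DIR}/*/{name}",
            # {instance_id} is resolved by the agent on each instance.
            "log_group_name": "/aws/emr/master/{instance_id}",
            "log_stream_name": f"step-{name}",
        }
        for name in ("controller", "stderr", "stdout")
    ]
    return {"logs": {"logs_collected": {"files": {"collect_list": collect_list}}}}


config = build_agent_config()
print(json.dumps(config, indent=2))
```

The printed JSON is the kind of document that could be stored in a Systems Manager parameter and fetched by the agent at startup.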

We provide an AWS CloudFormation template in this post as a general guide. The template demonstrates how to configure a CloudWatch agent on Amazon EMR to push Spark logs to CloudWatch. You can review and customize it as needed to include your Amazon EMR security configurations. As a best practice, we recommend including your Amazon EMR security configurations in the template to encrypt data in transit.

You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

In the next sections, we go through the following steps:

  1. Create and upload the bootstrap script to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use the CloudFormation template to create the required resources (an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster).
  3. Monitor the Spark logs on the CloudWatch console.


This post assumes that you have the following:

Create and upload the bootstrap script to an S3 bucket

For more information, see Uploading objects and Installing and running the CloudWatch agent on your servers.

To create and upload the bootstrap script, complete the following steps:

  1. Create a local file named bootstrap_cloudwatch_agent.sh with the following content:
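    #!/bin/bash
    # Install the latest CloudWatch agent package for Amazon Linux (x86_64).
    echo -e 'Installing CloudWatch Agent... \n'
    sudo rpm -Uvh --force https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
    # Fetch the agent configuration from the Systems Manager parameter and start the agent.
    echo -e 'Starting CloudWatch Agent... \n'
    sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c ssm:AmazonCloudWatch-Config.json -s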

  2. On the Amazon S3 console, choose your S3 bucket.
  3. On the Objects tab, choose Upload.
  4. Choose Add files, then choose the bootstrap script.
  5. Choose Upload, then choose the file name: bootstrap_cloudwatch_agent.sh.
  6. Choose Copy S3 URI. We use this value in a later step.

Provision resources with the CloudFormation template

Choose Launch Stack to launch a CloudFormation stack in your account and deploy the template:

This template creates an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster. The cluster starts the Spark PI estimation example application. You will be billed for the AWS resources used if you create a stack from this template.

The CloudFormation wizard will ask you to modify or provide these parameters:

  • InstanceType – The type of instance for all instance groups. The default is m4.xlarge.
  • InstanceCountCore – The number of instances in the core instance group. The default is 2.
  • EMRReleaseLabel – The Amazon EMR release label you want to use. The default is emr-6.9.0.
  • BootstrapScriptPath – The S3 path of your CloudWatch agent installation bootstrap script that you copied earlier.
  • Subnet – The EC2 subnet where the cluster launches. You must provide this parameter.
  • EC2KeyPairName – An optional EC2 keypair for connecting to cluster nodes, as an alternative to Session Manager.

Monitor the log streams

After the CloudFormation stack deploys successfully, on the CloudWatch console, choose Log groups in the navigation pane. Then filter the log groups by the prefix /aws/emr/master.

The ID in the log group corresponds to the EC2 instance ID of the EMR primary node. If you have multiple EMR clusters, you can use this ID to identify a particular EMR cluster, based on the primary node ID.

In the log group, you will find the three different log streams.

The log streams contain the following information:

  • step-stdout – The standard output channel of Spark while it processes the step.
  • step-stderr – The standard error channel of Spark while it processes the step.
  • step-controller – Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.

Clean up

To avoid future charges in your account, delete the resources you created in this walkthrough. The EMR cluster will incur charges as long as the cluster is active, so stop it when you’re done.

  1. On the CloudFormation console, in the navigation pane, choose Stacks.
  2. Choose the stack you launched (EMR-CloudWatch-Demo), then choose Delete.
  3. Empty the S3 bucket you created.
  4. Delete the S3 bucket you created.


Now that you have completed the steps in this walkthrough, you have the CloudWatch agent running on your cluster hosts and configured to push EMR step logs to CloudWatch. With this feature, you can effectively monitor the health and performance of your Spark jobs running on Amazon EMR, detecting critical issues in real time and identifying root causes quickly.

You can package and deploy this solution through a CloudFormation template like this example template, which creates the IAM instance profile role, Systems Manager parameter, and EMR cluster.

To take this further, consider using these logs in CloudWatch alarms for alerts on a log group-metric filter. You could collect them with other alarms into a composite alarm or configure alarm actions such as sending Amazon Simple Notification Service (Amazon SNS) notifications to trigger event-driven processes such as AWS Lambda functions.

About the Author

Ennio Pastore is a Senior Data Architect on the AWS Data Lab team. He is an enthusiast of everything related to new technologies that have a positive impact on businesses and general livelihood. Ennio has over 10 years of experience in data analytics. He helps companies define and implement data platforms across industries, such as telecommunications, banking, gaming, retail, and insurance.

AWS Week in Review – March 20, 2023

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-week-in-review-march-20-2023/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

A new week starts, and Spring is almost here! If you’re curious about AWS news from the previous seven days, I got you covered.

Last Week’s Launches
Here are the launches that got my attention last week:

Amazon S3 – Last week there was AWS Pi Day 2023 celebrating 17 years of innovation since Amazon S3 was introduced on March 14, 2006. For the occasion, the team released many new capabilities:

Amazon Linux 2023 – Our new Linux-based operating system is now generally available. Sébastien’s post is full of tips and info.

Application Auto Scaling – You can now use arithmetic operations and mathematical functions to customize the metrics used with Target Tracking policies. You can use it to scale based on your own application-specific metrics. Read how it works with Amazon ECS services.

AWS Data Exchange for Amazon S3 is now generally available – You can now share and find data files directly from S3 buckets, without the need to create or manage copies of the data.

Amazon Neptune – Now offers a graph summary API to help understand important metadata about property graphs (PG) and resource description framework (RDF) graphs. Neptune added support for Slow Query Logs to help identify queries that need performance tuning.

Amazon OpenSearch Service – The team introduced security analytics that provides new threat monitoring, detection, and alerting features. The service now supports OpenSearch version 2.5 that adds several new features such as support for Point in Time Search and improvements to observability and geospatial functionality.

AWS Lake Formation and Apache Hive on Amazon EMR – Introduced fine-grained access controls that allow data administrators to define and enforce fine-grained table and column level security for customers accessing data via Apache Hive running on Amazon EMR.

Amazon EC2 M1 Mac Instances – You can now update guest environments to a specific or the latest macOS version without having to tear down and recreate the existing macOS environments.

AWS Chatbot – Now integrates with Microsoft Teams to simplify the way you troubleshoot and operate your AWS resources.

Amazon GuardDuty RDS Protection for Amazon Aurora – Now generally available to help profile and monitor access activity to Aurora databases in your AWS account without impacting database performance.

AWS Database Migration Service – Now supports validation to ensure that data is migrated accurately to S3 and can now generate an AWS Glue Data Catalog when migrating to S3.

AWS Backup – You can now back up and restore virtual machines running on VMware vSphere 8 and with multiple vNICs.

Amazon Kendra – There are new connectors to index documents and search for information across these new content sources: Confluence Server, Confluence Cloud, Microsoft SharePoint OnPrem, and Microsoft SharePoint Cloud. This post shows how to use the Amazon Kendra connector for Microsoft Teams.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
A few more blog posts you might have missed:

Women founders Q&A – We’re talking to six women founders and leaders about how they’re making impacts in their communities, industries, and beyond.

What you missed at the 2023 IMAGINE: Nonprofit conference – Where hundreds of nonprofit leaders, technologists, and innovators gathered to learn and share how AWS can drive a positive impact for people and the planet.

Monitoring load balancers using Amazon CloudWatch anomaly detection alarms – The metrics emitted by load balancers provide crucial and unique insight into service health, service performance, and end-to-end network performance.

Extend geospatial queries in Amazon Athena with user-defined functions (UDFs) and AWS Lambda – Using a solution based on Uber’s Hexagonal Hierarchical Spatial Index (H3) to divide the globe into equally-sized hexagons.

How cities can use transport data to reduce pollution and increase safety – A guest post by Rikesh Shah, outgoing head of open innovation at Transport for London.

For AWS open-source news and updates, here’s the latest newsletter curated by Ricardo to bring you the most recent updates on open-source projects, posts, events, and more.

Upcoming AWS Events
Here are some opportunities to meet:

AWS Public Sector Day 2023 (March 21, London, UK) – An event dedicated to helping public sector organizations use technology to achieve more with less through the current challenging conditions.

Women in Tech at Skills Center Arlington (March 23, VA, USA) – Let’s celebrate the history and legacy of women in tech.

The AWS Summits season is warming up! You can sign up here to know when registration opens in your area.

That’s all from me for this week. Come back next Monday for another Week in Review!


Build a serverless transactional data lake with Apache Iceberg, Amazon EMR Serverless, and Amazon Athena

Post Syndicated from Houssem Chihoub original https://aws.amazon.com/blogs/big-data/build-a-serverless-transactional-data-lake-with-apache-iceberg-amazon-emr-serverless-and-amazon-athena/

Since the deluge of big data over a decade ago, many organizations have learned to build applications to process and analyze petabytes of data. Data lakes have served as a central repository to store structured and unstructured data at any scale and in various formats. However, as large-scale data processing solutions grow, organizations need to build more and more features on top of their data lakes. One important feature is to run different workloads such as business intelligence (BI), machine learning (ML), data science and data exploration, and change data capture (CDC) of transactional data, without having to maintain multiple copies of data. Additionally, the task of maintaining and managing files in the data lake can be tedious and sometimes complex.

Table formats like Apache Iceberg provide solutions to these issues. They enable transactions on top of data lakes and can simplify data storage, management, ingestion, and processing. These transactional data lakes combine features from both the data lake and the data warehouse. You can simplify your data strategy by running multiple workloads and applications on the same data in the same location. However, using these formats requires building, maintaining, and scaling infrastructure and integration connectors that can be time-consuming, challenging, and costly.

In this post, we show how you can build a serverless transactional data lake with Apache Iceberg on Amazon Simple Storage Service (Amazon S3) using Amazon EMR Serverless and Amazon Athena. We provide an example for data ingestion and querying using an ecommerce sales data lake.

Apache Iceberg overview

Iceberg is an open-source table format that brings the power of SQL tables to big data files. It enables ACID transactions on tables, allowing for concurrent data ingestion, updates, and queries, all while using familiar SQL. Iceberg employs internal metadata management that keeps track of data and empowers a set of rich features at scale. It allows you to time travel and roll back to old versions of committed data transactions, control the table’s schema evolution, easily compact data, and employ hidden partitioning for fast queries.

Iceberg manages files on behalf of the user and unlocks use cases such as:

  • Concurrent data ingestion and querying, including streaming and CDC
  • BI and reporting with expressive simple SQL
  • Empowering ML feature stores and training sets
  • Compliance and regulations workloads, such as GDPR find and forget
  • Reinstating late-arriving data, which is dimension data arriving later than the fact data. For example, the reason for a flight delay may arrive well after the fact that the flight is delayed.
  • Tracking data changes and rollback

Build your transactional data lake on AWS

You can build your modern data architecture with a scalable data lake that integrates seamlessly with an Amazon Redshift powered cloud warehouse. Moreover, many customers are looking for an architecture where they can combine the benefits of a data lake and a data warehouse in the same storage location. In the following figure, we show a comprehensive architecture that uses the modern data architecture strategy on AWS to build a fully featured transactional data lake. AWS provides flexibility and a wide breadth of features to ingest data, build AI and ML applications, and run analytics workloads without having to focus on the undifferentiated heavy lifting.

Data can be organized into three different zones, as shown in the following figure. The first zone is the raw zone, where data can be captured from the source as is. The transformed zone is an enterprise-wide zone to host cleaned and transformed data in order to serve multiple teams and use cases. Iceberg provides a table format on top of Amazon S3 in this zone to provide ACID transactions, but also to allow seamless file management and provide time travel and rollback capabilities. The business zone stores data specific to business cases and applications aggregated and computed from data in the transformed zone.

One important aspect to a successful data strategy for any organization is data governance. On AWS, you can implement a thorough governance strategy with fine-grained access control to the data lake with AWS Lake Formation.

Serverless architecture overview

In this section, we show you how to ingest and query data in your transactional data lake in a few steps. EMR Serverless is a serverless option that makes it easy for data analysts and engineers to run Spark-based analytics without configuring, managing, and scaling clusters or servers. You can run your Spark applications without having to plan capacity or provision infrastructure, while paying only for your usage. EMR Serverless supports Iceberg natively to create tables and query, merge, and insert data with Spark. In the following architecture diagram, Spark transformation jobs can load data from the raw zone or source, apply the cleaning and transformation logic, and ingest data in the transformed zone on Iceberg tables. Spark code can run instantaneously on an EMR Serverless application, which we demonstrate later in this post.

The Iceberg table is synced with the AWS Glue Data Catalog. The Data Catalog provides a central location to govern and keep track of the schema and metadata. With Iceberg, ingestion, update, and querying processes can benefit from atomicity, snapshot isolation, and managing concurrency to keep a consistent view of data.

Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data where it lives. To serve BI and reporting analysis, it allows you to build and run queries on Iceberg tables natively and integrates with a variety of BI tools.

Sales data model

Star schema and its variants are very popular for modeling data in data warehouses. They implement one or more fact tables and dimension tables. The fact table stores the main transactional data from the business logic with foreign keys to dimensional tables. Dimension tables hold additional complementary data to enrich the fact table.

In this post, we take the example of sales data from the TPC-DS benchmark. We zoom in on a subset of the schema with the web_sales fact table, as shown in the following figure. It stores numeric values about sales cost, ship cost, tax, and net profit. Additionally, it has foreign keys to dimensional tables like date_dim, time_dim, customer, and item. These dimensional tables store records that give more details. For instance, you can show when a sale took place by which customer for which item.
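
As a small illustration of how a fact table joins to its dimensions, the following Python sketch builds a toy version of this subset in an in-memory SQLite database. The table and column names follow the TPC-DS schema, but the tables are trimmed to a few columns and the rows are made up for the example. It is not the Iceberg setup used later in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Toy star schema: one fact table with foreign keys to two dimensions.
conn.executescript("""
CREATE TABLE date_dim (d_date_sk INTEGER PRIMARY KEY, d_year INTEGER, d_moy INTEGER);
CREATE TABLE item (i_item_sk INTEGER PRIMARY KEY, i_brand TEXT);
CREATE TABLE web_sales (
    ws_sold_date_sk INTEGER REFERENCES date_dim(d_date_sk),
    ws_item_sk INTEGER REFERENCES item(i_item_sk),
    ws_net_profit REAL
);
INSERT INTO date_dim VALUES (1, 2000, 11), (2, 2000, 12);
INSERT INTO item VALUES (10, 'brand_a'), (20, 'brand_b');
INSERT INTO web_sales VALUES (1, 10, 5.0), (1, 20, 7.5), (2, 10, 2.5);
""")

# Enrich the fact rows with dimension attributes, then aggregate.
rows = conn.execute("""
SELECT d.d_year, d.d_moy, i.i_brand, SUM(ws.ws_net_profit) AS net_profit
FROM web_sales ws
JOIN date_dim d ON ws.ws_sold_date_sk = d.d_date_sk
JOIN item i ON ws.ws_item_sk = i.i_item_sk
GROUP BY d.d_year, d.d_moy, i.i_brand
ORDER BY d.d_moy, i.i_brand
""").fetchall()

for row in rows:
    print(row)
```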

Dimension-based models have been used extensively to build data warehouses. In the following sections, we show how to implement such a model on top of Iceberg, providing data warehousing features on top of your data lake, and run different workloads in the same location. We provide a complete example of building a serverless architecture with data ingestion using EMR Serverless and Athena using TPC-DS queries.


For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Basic knowledge about data management and SQL

Deploy solution resources with AWS CloudFormation

We provide an AWS CloudFormation template to deploy the data lake stack with the following resources:

  • Two S3 buckets: one for scripts and query results, and one for the data lake storage
  • An Athena workgroup
  • An EMR Serverless application
  • An AWS Glue database and tables on external public S3 buckets of TPC-DS data
  • An AWS Glue database for the data lake
  • An AWS Identity and Access Management (IAM) role and policies

Complete the following steps to create your resources:

  1. Launch the CloudFormation stack:

Launch Button

This automatically launches AWS CloudFormation in your AWS account with the CloudFormation template. It prompts you to sign in as needed.

  2. Keep the template settings as is.
  3. Check the I acknowledge that AWS CloudFormation might create IAM resources box.
  4. Choose Submit.

When the stack creation is complete, check the Outputs tab of the stack to verify the resources created.

Upload Spark scripts to Amazon S3

Complete the following steps to upload your Spark scripts:

  1. Download the following scripts: ingest-iceberg.py and update-item.py.
  2. On the Amazon S3 console, go to the datalake-resources-<AccountID>-us-east-1 bucket you created earlier.
  3. Create a new folder named scripts.
  4. Upload the two PySpark scripts: ingest-iceberg.py and update-item.py.

Create Iceberg tables and ingest TPC-DS data

To create your Iceberg tables and ingest the data, complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Manage applications.
  3. Choose the application datalake-app.

  4. Choose Start application.

Once started, it will provision the pre-initialized capacity as configured at creation (one Spark driver and two Spark executors). Pre-initialized capacity consists of resources that are provisioned when you start your application. They can be used instantly when you submit jobs. However, they incur charges even if they’re not used when the application is in a started state. By default, the application is set to stop when idle for 15 minutes.

Now that the EMR application has started, we can submit the Spark ingest job ingest-iceberg.py. The job creates the Iceberg tables and then loads data from the previously created AWS Glue Data Catalog tables on TPC-DS data in an external bucket.

  5. Navigate to the datalake-app.
  6. On the Job runs tab, choose Submit job.

  7. For Name, enter ingest-data.
  8. For Runtime role, choose the IAM role created by the CloudFormation stack.
  9. For Script location, enter the S3 path for your resource bucket (datalake-resources-<####>-us-east-1>scripts>ingest-iceberg.py).

  10. Under Spark properties, choose Edit in text.
  11. Enter the following properties, replacing <BUCKET_NAME> with your data lake bucket name datalake-<####>-us-east-1 (not datalake-resources):
--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=8g --conf spark.executor.instances=2 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.dev.warehouse=s3://<BUCKET_NAME>/warehouse --conf spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.dev.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myIcebergLockTab --conf spark.dynamicAllocation.maxExecutors=8 --conf spark.driver.maxResultSize=1G --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  1. Submit the job.

You can monitor the job progress.
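If you prefer to script the submission rather than use the console, the same properties can be assembled in code. The following is a minimal sketch, assuming you submit through the boto3 `emr-serverless` client's `start_job_run` call; the helper names, the bucket name `datalake-0000-us-east-1`, and the bracketed placeholders are hypothetical, and only a few of the properties listed above are shown.

```python
def spark_submit_parameters(conf):
    """Render a dict of Spark properties as a '--conf key=value ...' string."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def build_job_run_request(application_id, role_arn, script_uri, conf, name):
    """Build keyword arguments for the EMR Serverless start_job_run call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": role_arn,
        "name": name,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": script_uri,
                "sparkSubmitParameters": spark_submit_parameters(conf),
            }
        },
    }


bucket = "datalake-0000-us-east-1"  # hypothetical data lake bucket name
conf = {
    "spark.executor.cores": 2,
    "spark.executor.memory": "4g",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.warehouse": f"s3://{bucket}/warehouse",
}
request = build_job_run_request(
    application_id="<APPLICATION_ID>",      # placeholder
    role_arn="<RUNTIME_ROLE_ARN>",          # placeholder
    script_uri="s3://<RESOURCE_BUCKET>/scripts/ingest-iceberg.py",
    conf=conf,
    name="ingest-data",
)
print(request["jobDriver"]["sparkSubmit"]["sparkSubmitParameters"])

# To submit for real (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("emr-serverless").start_job_run(**request)
```

Keeping the properties in a dictionary makes it harder to mistype a single long `--conf` string and easier to swap the bucket name between environments.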

Query Iceberg tables

In this section, we provide examples of data warehouse queries from TPC-DS on the Iceberg tables.

  1. On the Athena console, open the query editor.
  2. For Workgroup, switch to DatalakeWorkgroup.

  1. Choose Acknowledge.

The queries in DatalakeWorkgroup will run on Athena engine version 3.

  1. On the Saved queries tab, choose a query to run on your Iceberg tables.

The following queries are listed:

  • Query3 – Report the total extended sales price per item brand of a specific manufacturer for all sales in a specific month of the year.
  • Query45 – Report the total web sales for customers in specific zip codes, cities, counties, or states, or specific items for a given year and quarter.
  • Query52 – Report the total of extended sales price for all items of a specific brand in a specific year and month.
  • Query6 – List all the states with at least 10 customers who during a given month bought items with the price tag at least 20% higher than the average price of items in the same category.
  • Query75 – For 2 consecutive years, track the sales of items by brand, class, and category.
  • Query86a – Roll up the web sales for a given year by category and class, and rank the sales among peers within the parent. For each group, compute the sum of sales and location with the hierarchy and rank within the group.

These queries are examples of queries used in decision-making and reporting in an organization. You can run them in the order you want. For this post, we start with Query3.

  1. Before you run the query, confirm that Database is set to datalake.

  1. Now you can run the query.

  1. Repeat these steps to run the other queries.

Update the item table

After running the queries, we prepare a batch of updates and inserts of records into the item table.

  1. First, run the following query to count the number of records in the item Iceberg table:
SELECT count(*) FROM "datalake"."item_iceberg";

This should return 102,000 records.

  1. Select item records with a price higher than $90:
SELECT count(*) FROM "datalake"."item_iceberg" WHERE i_current_price > 90.0;

This will return 1,112 records.

The update-item.py job takes these 1,112 records, modifies 11 records to change the name of the brand to Unknown, and changes the remaining 1,101 records’ i_item_id key to flag them as new records. As a result, a batch of 11 updates and 1,101 inserts are merged into the item_iceberg table.

The 11 records to be updated are those with price higher than $90, and the brand name starts with corpnameless.

  1. Run the following query:
SELECT count(*) FROM "datalake"."item_iceberg" WHERE i_current_price > 90.0 AND i_brand LIKE 'corpnameless%';

The result is 11 records. The update-item.py job replaces the brand name with Unknown and merges the batch into the Iceberg table.

Now you can return to the EMR Serverless console and run the job on the EMR Serverless application.

  1. On the application details page, choose Submit job.
  2. For Name, enter update-item-job.
  3. For Runtime role, use the same role that you used previously.
  4. For S3 URI, enter the update-item.py script location.

  1. Under Spark properties, choose Edit in text.
  2. Enter the following properties, replacing the <BUCKET-NAME> with your own datalake-<####>-us-east-1:
--conf spark.executor.cores=2 --conf spark.executor.memory=8g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=2 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.dev.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myIcebergLockTab --conf spark.dynamicAllocation.maxExecutors=4 --conf spark.driver.maxResultSize=1G --conf spark.sql.catalog.dev.warehouse=s3://<BUCKET-NAME>/warehouse --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  1. Then submit the job.

  1. After the job finishes successfully, return to the Athena console and run the following query:
SELECT count(*) FROM "datalake"."item_iceberg";

The returned result is 103,101 = 102,000 + (1,112 – 11). The batch was merged successfully.
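To see why the count grows by 1,101 rather than 1,112, the merge can be imitated with plain Python dictionaries. This is only a toy model of upsert semantics, not the Iceberg implementation; the function name and the `new-` key prefix are made up for the illustration.

```python
def merge_upsert(target, batch):
    """Upsert batch rows into target by key; return (updates, inserts)."""
    updates = inserts = 0
    for key, row in batch.items():
        if key in target:
            updates += 1   # key already exists: the row is overwritten
        else:
            inserts += 1   # unseen key: the table grows by one row
        target[key] = row
    return updates, inserts


# 102,000 existing items keyed by a stand-in for i_item_id
target = {item_id: {"i_brand": "brand"} for item_id in range(102_000)}

# 11 rows keep their key (updates); 1,101 rows get a new key (inserts)
batch = {item_id: {"i_brand": "Unknown"} for item_id in range(11)}
batch.update({f"new-{n}": {"i_brand": "brand"} for n in range(1_101)})

updates, inserts = merge_upsert(target, batch)
print(updates, inserts, len(target))  # 11 1101 103101
```

Only rows whose key is absent from the target add to the row count, which is why the 11 updated records don't change the total.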

Time travel

To run a time travel query, complete the following steps:

  1. Get the timestamp of the job run via the application details page on the EMR Serverless console, or the Spark UI on the History Server, as shown in the following screenshot.

This time could be just minutes before you ran the update Spark job.

  1. Convert the timestamp from the format YYYY/MM/DD hh:mm:ss to a timestamp with a time zone in the format YYYY-MM-DD hh:mm:ss.sss followed by the zone. For example, convert 2023/02/20 14:40:41 to 2023-02-20 14:40:41.000 UTC.
  2. On the Athena console, run the following query to count the item table records at a time before the update job, replacing <TRAVEL_TIME> with your time:
SELECT count(*) FROM "datalake"."item_iceberg" FOR TIMESTAMP AS OF TIMESTAMP <TRAVEL_TIME>;

The query will give 102,000 as a result, the expected table size before running the update job.

  1. Now you can run a query with a timestamp after the successful run of the update job (for example, 2023-02-20 15:06:00.000 UTC):
SELECT count(*) FROM "datalake"."item_iceberg" FOR TIMESTAMP AS OF TIMESTAMP <TRAVEL_TIME>;

The query will now give 103,101 as the size of the table at that time, after the update job successfully finished.
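The timestamp conversion and query construction in the preceding steps can be scripted. The following is a small sketch, assuming the timestamp shown on the console is in UTC and that the <TRAVEL_TIME> placeholder stands for a quoted timestamp literal; the function names are hypothetical.

```python
from datetime import datetime


def to_athena_timestamp(console_time):
    """Convert 'YYYY/MM/DD hh:mm:ss' to 'YYYY-MM-DD hh:mm:ss.sss UTC'.

    Assumes the input time is already expressed in UTC.
    """
    parsed = datetime.strptime(console_time, "%Y/%m/%d %H:%M:%S")
    millis = parsed.microsecond // 1000
    return f"{parsed:%Y-%m-%d %H:%M:%S}.{millis:03d} UTC"


def time_travel_query(console_time, table='"datalake"."item_iceberg"'):
    """Build the time travel count query for the given console timestamp."""
    travel_time = to_athena_timestamp(console_time)
    return (f"SELECT count(*) FROM {table} "
            f"FOR TIMESTAMP AS OF TIMESTAMP '{travel_time}';")


print(time_travel_query("2023/02/20 14:40:41"))
```

If your console displays local time instead of UTC, convert it to UTC first; otherwise the query might resolve to a snapshot on the wrong side of the update job.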

Additionally, you can query in Athena based on the version ID of a snapshot in Iceberg. However, for more advanced use cases, such as to roll back to a given version or to find version IDs, you can use Iceberg’s SDK or Spark on Amazon EMR.

Clean up

Complete the following steps to clean up your resources:

  1. On the Amazon S3 console, empty your buckets.
  2. On the Athena console, delete the workgroup DatalakeWorkgroup.
  3. On the EMR Studio console, stop the application datalake-app.
  4. On the AWS CloudFormation console, delete the CloudFormation stack.


Conclusion

In this post, we created a serverless transactional data lake with Iceberg tables, EMR Serverless, and Athena. We used 10 GB of TPC-DS sales data, with more than 7 million records in the fact table. We demonstrated how straightforward it is to rely on SQL and Spark to run serverless jobs for data ingestion and upserts. Moreover, we showed how to run complex BI queries directly on Iceberg tables from Athena for reporting.

You can start building your serverless transactional data lake on AWS today, and dive deep into the features and optimizations Iceberg provides to build analytics applications more easily. Iceberg can also help you in the future to improve performance and reduce costs.

About the Author

Houssem is a Specialist Solutions Architect at AWS with a focus on analytics. He is passionate about data and emerging technologies in analytics. He holds a PhD in data management in the cloud. Prior to joining AWS, he worked on several big data projects and published several research papers in international conferences and venues.

Build incremental data pipelines to load transactional data changes using AWS DMS, Delta 2.0, and Amazon EMR Serverless

Post Syndicated from Sankar Sundaram original https://aws.amazon.com/blogs/big-data/build-incremental-data-pipelines-to-load-transactional-data-changes-using-aws-dms-delta-2-0-and-amazon-emr-serverless/

Building data lakes from continuously changing transactional data of databases and keeping data lakes up to date is a complex task and can be an operational challenge. A solution to this problem is to use AWS Database Migration Service (AWS DMS) for migrating historical and real-time transactional data into the data lake. You can then apply transformations and store data in Delta format for managing inserts, updates, and deletes.

Amazon EMR Serverless is a serverless option in Amazon EMR that makes it easy for data analysts and engineers to run open-source big data analytics frameworks without configuring, managing, and scaling clusters or servers. EMR Serverless automatically provisions and scales the compute and memory resources required by your applications, and you only pay for the resources that the applications use. EMR Serverless also provides you with more flexibility on overriding default Spark configurations, customizing EMR Serverless images, and customizing Spark driver and executor sizes to better suit specific workloads.

This post demonstrates how to implement a solution that uses AWS DMS to stream ongoing replication or change data capture (CDC) from an Amazon Aurora PostgreSQL-Compatible Edition database into Amazon Simple Storage Service (Amazon S3). We then apply transformations using Spark jobs on an EMR Serverless application and write transformed output into open-source Delta tables in Amazon S3. The Delta tables created by the EMR Serverless application are exposed through the AWS Glue Data Catalog and can be queried through Amazon Athena. Although this post uses an Aurora PostgreSQL database hosted on AWS as the data source, the solution can be extended to ingest data from any of the AWS DMS supported databases hosted on your data centers.

Solution overview

The following diagram shows the overall architecture of the solution that we implement in this post.

Architecture diagram

The solution consists of the following steps for implementing a full and incremental (CDC) data ingestion from a relational database:

  • Data storage and data generation – We create an Aurora PostgreSQL database and generate fictional trip data by running a stored procedure. The data will have attributes like trip ID (primary key), timestamp, source location, and destination location. Incremental data is generated in the PostgreSQL table by running custom SQL scripts.
  • Data ingestion – Steps 1 and 2 use AWS DMS, which connects to the source database and moves full and incremental data (CDC) to Amazon S3 in Parquet format. Let’s refer to this S3 bucket as the raw layer.
  • Data transformation – Steps 3 and 4 represent an EMR Serverless Spark application (Amazon EMR 6.9 with Apache Spark version 3.3.0) created using Amazon EMR Studio. The script reads input data from the S3 raw bucket, and then invokes Delta Lake’s MERGE statements to merge the data with the target S3 bucket (curated layer). The script also creates and updates a manifest file on Amazon S3 every time the job is run to enable data access from Athena and Amazon Redshift Spectrum.
  • Data access – The EMR Serverless job has code snippets that create a Delta table in the AWS Glue Data Catalog in Step 5. Steps 6 and 7 describe using Athena and Redshift Spectrum to query data from the Delta tables using standard SQL through the AWS Glue Data Catalog.
  • Data pipeline – Step 8 describes the process for triggering the data pipeline in a periodic manner through Airflow operators using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Refer to Submitting EMR Serverless jobs from Airflow for additional details. In this post, AWS DMS has been configured to replicate data from Amazon Aurora PostgreSQL-Compatible Edition into an S3 bucket with hourly partitions. The Airflow DAG can be configured to call an EMR Serverless job to process the past X hours of data based on specific project requirements. Implementation of the Airflow setup is not explored within the scope of this post.
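As a sketch of how a scheduler could derive "the past X hours" from hourly partitions, the helper below lists the most recent completed hourly partitions. It assumes partition folders are named YYYY-MM-DD-HH in UTC; this naming and the function name are assumptions for illustration, so adjust the format string to match your AWS DMS settings.

```python
from datetime import datetime, timedelta, timezone


def recent_hourly_partitions(hours, now=None):
    """Return names of the last `hours` completed hourly partitions, oldest first.

    Assumes partitions are named YYYY-MM-DD-HH in UTC.
    """
    now = now or datetime.now(timezone.utc)
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return [
        (top_of_hour - timedelta(hours=offset)).strftime("%Y-%m-%d-%H")
        for offset in range(hours, 0, -1)
    ]


# A fixed reference time keeps the example reproducible
reference = datetime(2023, 2, 20, 15, 6, tzinfo=timezone.utc)
print(recent_hourly_partitions(3, now=reference))
```

The current, still-open hour is deliberately excluded so that a scheduled run only processes partitions that AWS DMS has finished writing.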

The architecture has the following major features:

  • Reliability – The end-to-end architecture is made resilient with the Multi-AZ feature of EMR Serverless and using Multi-AZ deployments for AWS DMS and Amazon Aurora PostgreSQL-Compatible Edition. When you submit jobs to an EMR Serverless application, those jobs are automatically distributed to different Availability Zones in the Region. A job is run in a single Availability Zone to avoid performance implications of network traffic across Availability Zones. In case an Availability Zone is impaired, a job submitted to your EMR Serverless application is automatically run in a different (healthy) Availability Zone. When using resources in a private VPC, EMR Serverless recommends that you specify the private VPC configuration for multiple Availability Zones so that EMR Serverless can automatically select a healthy Availability Zone.
  • Cost optimization – When you run Spark or Hive applications using EMR Serverless, you pay for the amount of vCPU, memory, and storage resources consumed by your applications, leading to optimal utilization of resources. There is no separate charge for Amazon Elastic Compute Cloud (Amazon EC2) instances or Amazon Elastic Block Store (Amazon EBS) volumes. For additional details on cost, refer to Amazon EMR Serverless cost estimator.
  • Performance efficiency – You can run analytics workloads at any scale with automatic on-demand scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless includes the Amazon EMR performance-optimized runtime for Apache Spark and Hive. The Amazon EMR runtime for Spark is 100% API-compatible with OSS Spark and is over 3.5 times as fast as standard open-source Spark, so your jobs run faster and incur lower compute costs. With fast and fine-grained scaling in EMR Serverless, if a pipeline runs daily and needs to process 1 GB of data one day and 100 GB of data another day, EMR Serverless automatically scales to handle that load.
  • Monitoring – EMR Serverless sends metrics to Amazon CloudWatch at the application and job level every 1 minute. You can set up a single-view dashboard in CloudWatch to visualize application-level and job-level metrics using an AWS CloudFormation template provided on the EMR Serverless CloudWatch Dashboard GitHub repository. Also, EMR Serverless can store application logs in a managed storage, Amazon S3, or both based on your configuration settings. After you submit a job to an EMR Serverless application, you can view the real-time Spark UI or the Hive Tez UI for the running job from the EMR Studio console or request a secure URL using the GetDashboardForJobRun API. For completed jobs, you can view the Spark History Server or the Persistent Hive Tez UI from the EMR Studio console.

The following steps are performed to implement this solution:

  1. Connect to the Aurora PostgreSQL instance and generate a sample dataset.
  2. Set up a data pipeline for loading data from Amazon Aurora PostgreSQL-Compatible Edition into Delta Lake on Amazon S3 and query using Athena:
    • Start the AWS DMS task to perform full table load and capture ongoing replication to the S3 raw layer.
    • Run the EMR Serverless Spark application to load data into Delta Lake.
    • Query the Delta tables (native tables) through Athena.
  3. Run the data pipeline to capture incremental data changes into Delta Lake:
    • Generate an incremental (CDC) dataset and insert it into the Aurora PostgreSQL database.
    • Run the EMR Serverless Spark application to merge CDC data in the S3 curated layer (incremental load).
    • Query the Delta Lake tables through Athena to validate the merged data.


Prerequisites

We use a CloudFormation template to provision the AWS resources required for the solution. The CloudFormation template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to connect to the Aurora PostgreSQL instance that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

To walk through this post, we use Delta Lake version 2.0.0 or later. Delta Lake 2.0.x is supported on Apache Spark 3.2.x, whereas the Delta Lake 2.1.0 release that comes with emr-6.9.0 targets Spark 3.3.x. Choose the Delta Lake version compatible with your Spark version by visiting the Delta Lake releases page. We use an EMR Serverless application with version emr-6.9.0, which supports Spark version 3.3.0.

Deploy your resources

To provision the resources needed for the solution, complete the following steps:

  1. Choose Launch Stack:

  1. For Stack name, enter emr-serverless-deltalake-blog.
  2. For DatabaseUserName, enter the user name for logging in to Amazon Aurora PostgreSQL-Compatible Edition. Keep the default value if you don’t want to change it.
  3. For DatabasePassword, enter the password for logging in to Amazon Aurora PostgreSQL-Compatible Edition.
  4. For ClientIPCIDR, enter the IP address of your SQL client that will be used to connect to the EC2 instance. We use this EC2 instance to connect to the Aurora PostgreSQL database.
  5. For KeyName, enter the key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your SQL client to the Aurora PostgreSQL source database.
  6. For EC2ImageId, PrivateSubnet1CIDR, PrivateSubnet2CIDR, PublicSubnetCIDR, and VpcCIDR, keep the default values or choose appropriate values for the VPC and EC2 image for your specific environment.
  7. Choose Next.
  8. Choose Next again.
  9. On the review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the information shown in the following screenshot.

The CloudFormation template creates all the resources needed for the solution workflow:

  • S3 raw and curated buckets
  • Aurora PostgreSQL database
  • AWS DMS migration task, replication instance, and other resources
  • EC2 instance for running data ingestion scripts
  • AWS Identity and Access Management (IAM) roles and policies needed to perform the necessary activities as part of this solution
  • VPC, subnets, security groups, and relevant network components
  • AWS Lambda functions that perform setup activities required for this workflow
  • Additional components needed for running the EMR Serverless workflow

You can find the PySpark script in the raw S3 bucket on the Amazon S3 console as shown in the following screenshot. The bucket will have the naming structure <CloudFormation template name>-rawS3bucket-<random string>. Make a note of the S3 path to the emr_delta_cdc.py script; you need this information while submitting the Spark job via the EMR Serverless application.

The preceding task for creating the resources via CloudFormation assumes that AWS Lake Formation is not enabled in the Region (which we enable later in this post). If you already have Lake Formation enabled in the Region, make sure the IAM user or role used in the CloudFormation template has the necessary permissions to create a database in the AWS Glue Data Catalog.

Connect to the Aurora PostgreSQL instance and generate a sample dataset

Connect to the Aurora PostgreSQL endpoint using your preferred client. For this post, we use the PSQL command line tool. Note that the IP address of the client machine from which you’re connecting to the database must be updated in the Aurora PostgreSQL security group. This is done by the CloudFormation template based on the input parameter value for ClientIPCIDR. If you’re accessing the database from another machine, update the security group accordingly.

  1. Connect to your EC2 instance from the command line using the public DNS of the EC2 instance from the CloudFormation template output.
  2. Log in to the EC2 instance and connect to the Aurora PostgreSQL instance using the following command (the Aurora PostgreSQL endpoint is available on the Outputs tab of the CloudFormation stack):
    psql -h << Aurora PostgreSQL endpoint >> -p 5432 -U <<username>> -d emrdelta_source_db

  1. Run the following commands to create a schema and table for the fictional trip dataset:
    create schema delta_emr_source;
    create table delta_emr_source.travel_details (trip_id int PRIMARY KEY,tstamp timestamp, route_id varchar(2),destination varchar(50),source_location varchar(50));

  1. Create the following stored procedure to generate the records for the trip dataset and insert the records into the table.
    create or replace procedure delta_emr_source.insert_records(records int)
    language plpgsql
    as $$
    declare
      max_trip_id integer;
    begin
      --get max trip_id
      select coalesce(max(trip_id),1) into max_trip_id from delta_emr_source.travel_details;
      --insert records with a random destination and source location
      for i in max_trip_id+1..max_trip_id+records loop
        INSERT INTO delta_emr_source.travel_details (trip_id, tstamp, route_id,destination,source_location) values (i, current_timestamp, chr(65 + (i % 10)),(array['Seattle', 'New York', 'New Jersey', 'Los Angeles', 'Las Vegas',
        'Tucson', 'Washington DC', 'Philadelphia', 'Miami', 'San Francisco'])[(floor(random() * 10))+1],(array['Seattle', 'New York', 'New Jersey', 'Los Angeles', 'Las Vegas',
        'Tucson', 'Washington DC', 'Philadelphia', 'Miami', 'San Francisco'])[(floor(random() * 10))+1]);
      end loop;
      raise notice 'Inserted record count - %', records;
    end; $$;

  2. Call the preceding stored procedure to insert 20,000 records into the Aurora PostgreSQL database:
    call delta_emr_source.insert_records(20000);

  3. After the stored procedure is complete, verify that the records have been inserted successfully:
    select count(*) from delta_emr_source.travel_details;

Set up a data pipeline for loading data into Delta tables on Amazon S3 and query using Athena

In this section, we walk through the steps to set up a data pipeline that loads data from Amazon Aurora PostgreSQL-Compatible Edition into Delta tables on Amazon S3 and then query the data using Athena.

Start the AWS DMS task to perform full table load to the S3 raw layer

To perform the full table load, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the task that was created by the CloudFormation template (emrdelta-postgres-s3-migration).
  3. On the Actions menu, choose Restart/Resume.

The task starts the full load and ongoing replication of data from the source database to Amazon S3.

  1. Wait for the job to complete.

You can validate that the data has been migrated successfully by checking the Load state column for the AWS DMS task.

  1. Navigate to the S3 bucket created from the CloudFormation template to store raw data from AWS DMS. The bucket will have the naming structure <CloudFormation template name>-rawS3bucket-<random string>.
  2. Navigate to the folder delta_emr_source/travel_details in the raw S3 bucket. You can verify the S3 folder has Parquet data populated from the AWS DMS task.

Run the EMR Serverless Spark application to load data into Delta tables

We use EMR Studio to manage and submit jobs in an EMR Serverless application.

  1. Launch EMR Studio and create an EMR Serverless application.
  2. For Name, enter emr-delta-blog.
  3. For Type, choose Spark.
  4. For Release version, choose your release version.
  5. For Architecture, select x86_64.
  6. For Application setup options, select Choose default settings.

  1. Choose Create application and verify that the EMR application has been created successfully on the Amazon EMR console.

  1. Choose emr-delta-blog and then choose Start application. You can verify that the EMR application has started successfully on the Amazon EMR console, as shown in the following screenshot.

The application will move to Stopped status after a period of inactivity. When you submit the job to the application, it will start again and start the job. This provides cost savings because the jobs are run on demand as opposed to maintaining a running EMR cluster.

  1. While the application is in Started status, choose Submit job to submit the job to the application.

Create a new job in the Job details page

  1. For Name, enter emr-delta-load-job.
  2. For Runtime role, choose emrserverless-execution-role.
  3. For S3 URI, enter the S3 (raw bucket) path where the script emr_delta_cdc.py is uploaded.
  4. For Script arguments, enter ["I","delta_emr_source","9999-12-31-01","travel_details","route_id"].

The script arguments provide the following details to the EMR Serverless application:

  • I – The first argument represents the data load type. The allowed values are I for full load and U for incremental data load.
  • delta_emr_source – The second argument represents the source database schema from which data is being migrated through the AWS DMS task.
  • 9999-12-31-01 – The third argument represents the partition from which data needs to be loaded in an incremental fashion. This argument is used only during CDC data load; for full load, we have provided a default value (9999-12-31-01).
  • travel_details – The fourth argument represents the source database table from which data is being migrated through the AWS DMS task. Use a semicolon as a delimiter when entering multiple tables.
  • route_id – The fifth argument represents the partition keys on which the table data should be partitioned when stored in the S3 curated bucket. Use a semicolon as a delimiter when entering comma-separated partition keys for multiple tables.

With arguments, you can group a set of tables and submit the job to an EMR Serverless application. You can provide multiple table names separated by semicolons and enter the partition keys for those tables also separated by semicolon. If a particular table doesn’t have a partition key, simply enter a semicolon alone. The number of semicolon-separated values should match the table and partition key arguments for the script to run successfully.
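The argument conventions described above can be captured in a small helper that builds the script argument list and keeps table names and partition keys aligned. This is a sketch only; the function name and the table names `t1`, `t2`, and `t3` are hypothetical.

```python
def build_script_arguments(load_type, schema, cdc_partition, tables):
    """Build the five script arguments for the data load job.

    `tables` is a list of (table_name, [partition_keys]) pairs. Tables are
    joined with semicolons; keys of one table are joined with commas.
    """
    if load_type not in ("I", "U"):
        raise ValueError("load_type must be 'I' (full) or 'U' (incremental)")
    names = ";".join(name for name, _ in tables)
    # A table without partition keys contributes an empty segment
    keys = ";".join(",".join(partition_keys) for _, partition_keys in tables)
    return [load_type, schema, cdc_partition, names, keys]


# The single-table full load used in this post
print(build_script_arguments(
    "I", "delta_emr_source", "9999-12-31-01",
    [("travel_details", ["route_id"])],
))

# Three hypothetical tables, the second one unpartitioned
print(build_script_arguments(
    "I", "delta_emr_source", "9999-12-31-01",
    [("t1", ["a", "b"]), ("t2", []), ("t3", ["c"])],
))
```

Deriving both semicolon-separated strings from the same list makes it impossible for the number of tables and the number of partition key entries to drift apart.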

Also, if you want to capture additional tables as part of an existing EMR Serverless job, you need to create a new EMR Serverless job to capture full load separately (set the first argument as I along with the new table names) and then change the argument list of the existing EMR Serverless job to add those new tables to capture incremental data load going forward.

EMR Serverless version 6.9.0 comes pre-installed with Delta version 2.1.0. Refer to About Amazon EMR Releases for more details about pre-installed libraries and applications for a specific Amazon EMR release. Before this release, you had to upload the Delta JAR files to an S3 bucket in your account and provide the JAR file path in the application configurations using the spark.jars option. In this walkthrough, we create an EMR Serverless 6.9.0 application and use the pre-installed Delta JARs from Amazon EMR.

  1. Under Spark properties, choose Edit in text and enter the following configurations:
--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.submit.pyFiles=/usr/share/aws/delta/lib/delta-core.jar --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

If you want to use a different version of Delta JAR files, you can replace the S3 path of the JAR files in these configuration options.

  1. Leave the rest of the configurations at their default and choose Submit job.
  2. Wait for the job to complete successfully. You can verify this on the EMR Serverless console.

  1. Additionally, go to the S3 location (the curated bucket created by AWS CloudFormation) and verify that the Delta files are created along with the manifest file.

  1. Select a job run and then choose Spark History Server (Completed jobs) on the View Application UIs menu.

You can now use the Spark History Server UI to navigate to various tabs and analyze the job run in a detailed manner. For Spark error and output logs, you can navigate to the Executors tab and explore the driver or executor logs as required. This can help you to debug the job in case of failures by looking at the Spark logs. You can also choose Spark UI (Running jobs) to track the progress of the EMR Serverless Spark jobs while they are running.

The data load script is the same for initial and incremental data load because it can handle both the workflows through script arguments:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
import boto3
import sys
from delta.tables import DeltaTable

# S3 bucket locations, auto-populated for this post. Replace for other jobs
raw_bucket = "<<raw_bucket_name>>"
curated_bucket = "<<curated_bucket_name>>"

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

# Check the argument list; if it doesn't match the expected argument count, exit the program
if len(sys.argv) != 6:
    print("This script requires 5 arguments for successful execution - Load_type,database_schema,CDC_path,source_table,Partition_keys")
    sys.exit(1)

s3 = boto3.client('s3')

# Split table names into a list if there is more than one table separated by semicolons
tables = sys.argv[4].split(";")

schema = sys.argv[2]
load_type = sys.argv[1]
cdc_partition = sys.argv[3]
deltaHivePath = "s3://" + curated_bucket + "/" + schema + "/"
columns_to_drop = ["Op", "schema_name", "table_name", "update_ts_dms", "tstamp"]
db_name = "emrserverless_delta"

# Split table partition keys into a list if there is more than one table separated by semicolons
partition_keys = sys.argv[5].split(";")
# Exit if the number of table names and partition keys differ, to ensure data is provided for all tables
if len(tables) != len(partition_keys):
    print("Please enter partition keys for all tables. if partition key is not present enter empty semicolon - T1_PK;;T3PK")
    sys.exit(1)

for table, keys in zip(tables, partition_keys):
    partition_key = keys.split(",")
    if load_type == 'I':
        print("Moving to Full-load logic for the table", table)

        # Read the data from the raw bucket
        source_df1 = spark.read.format("parquet").load(
            "s3://" + raw_bucket + "/" + schema + "/" + table + "/")

        # There is no target table in Delta format. Loading for the first time.
        # The following code segment populates the Delta table in S3 and also
        # updates the Glue catalog for querying with Athena.
        additional_options = {"path": deltaHivePath + table + "/"}
        if columns_to_drop:
            source_df1 = source_df1.drop(*columns_to_drop)

        # Partition the data only when a partition key is present before
        # writing to the curated bucket (the append write mode is an assumption)
        writer = source_df1.write.mode("append").format("delta").options(**additional_options)
        if partition_key[0]:
            writer = writer.partitionBy(*partition_key)
        writer.saveAsTable(db_name + ".spark_" + table)

        # Generate symlink manifest for Amazon Redshift Spectrum to read data
        deltaTable = DeltaTable.forPath(spark, deltaHivePath + table + "/")
        deltaTable.generate("symlink_format_manifest")
    else:
        print("Moving to upsert logic, Reading data from partition - ", cdc_partition)
        # Verify that the CDC path has data before proceeding with the
        # incremental load. If the CDC path is not available for a specific
        # table, the load is skipped to avoid a Spark read error.
        resp = s3.list_objects_v2(
            Bucket=raw_bucket,
            Prefix=schema + "/" + table + "/" + cdc_partition,
            Delimiter="/",
            MaxKeys=1)
        if 'CommonPrefixes' in resp:
            update_df = spark.read.format("parquet").load(
                "s3://" + raw_bucket + "/" + schema + "/" + table + "/" + cdc_partition + "/")

            # Keep the most recent record for each primary key so that only the latest
            # transaction is applied to the Delta table. This de-duplicates transactions
            # like inserts and deletes within the same batch. Ordering by the
            # update_ts_dms column is an assumption based on the columns listed above.
            sort_order = Window.partitionBy(col("trip_id")).orderBy(col("update_ts_dms").desc())
            update_df = update_df.withColumn("rec_val", row_number().over(
                sort_order)).filter("rec_val < 2")

            # Upsert using the MERGE operation. The script below updates/inserts data
            # on all columns. In case you need to insert/update specific columns,
            # use whenNotMatchedInsert/whenMatchedUpdate and parameterize the input for each table
            deltaTable = DeltaTable.forPath(spark, deltaHivePath + table + "/")
            deltaTable.alias('trg') \
                .merge(update_df.alias('src'), 'trg.trip_id = src.trip_id') \
                .whenNotMatchedInsertAll(condition="src.Op = 'I'") \
                .whenMatchedUpdateAll(condition="src.Op='U'") \
                .whenMatchedDelete(condition="src.Op = 'D'") \
                .execute()

            # Generate symlink manifest for Amazon Redshift Spectrum to read data
            deltaTable.generate("symlink_format_manifest")
        else:
            print("The path is empty for table -", table)
print("The Job has completed execution...")
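To make the de-duplication and merge semantics of the script above concrete, the following is a minimal pure-Python sketch that mimics them on in-memory dictionaries instead of Delta tables. The ordering field ts and the helper names latest_per_key and apply_cdc are assumptions made for illustration; they don't come from the job script, and the sketch ignores everything Delta Lake does for transactions and storage.

```python
from typing import Dict, Iterable, List

ORDER_FIELD = "ts"   # hypothetical change-timestamp column used for ordering
OP_FIELD = "Op"      # operation flag written by AWS DMS: I, U, or D


def latest_per_key(changes: Iterable[dict], key: str = "trip_id") -> List[dict]:
    """Keep only the most recent change record for each primary key."""
    latest: Dict[object, dict] = {}
    for rec in changes:
        k = rec[key]
        if k not in latest or rec[ORDER_FIELD] > latest[k][ORDER_FIELD]:
            latest[k] = rec
    return list(latest.values())


def apply_cdc(target: Dict[object, dict], changes: Iterable[dict],
              key: str = "trip_id") -> Dict[object, dict]:
    """Apply de-duplicated CDC records to a copy of the target rows."""
    merged = dict(target)
    for rec in latest_per_key(changes, key):
        k, op = rec[key], rec[OP_FIELD]
        # Simplification: drop the bookkeeping fields from the stored row.
        row = {c: v for c, v in rec.items() if c not in (OP_FIELD, ORDER_FIELD)}
        if k in merged:
            if op == "U":        # matched and updated
                merged[k] = row
            elif op == "D":      # matched and deleted
                del merged[k]
        elif op == "I":          # not matched and inserted
            merged[k] = row
    return merged
```

A record that is inserted and then deleted within the same batch never reaches the target, because only its latest change (the delete) survives de-duplication and there is no matching row to delete.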

Monitor EMR Serverless application using CloudWatch dashboards

We can optionally monitor the EMR Serverless application using CloudWatch dashboards by installing the CloudFormation template from the EMR Serverless CloudWatch Dashboard GitHub repository. Follow the instructions in the Getting started section of the GitHub repository and deploy the CloudFormation template in your account.

You need to provide the EMR Serverless application ID as a parameter while deploying the CloudFormation stack, which can be obtained on the EMR Studio Applications page as shown in the following screenshot.

After the CloudFormation template is successfully deployed, navigate to the CloudWatch console to see a custom dashboard created for the EMR Serverless application ID that was provided to the CloudFormation template.

Choose the dashboard to see the different metrics for the EMR Serverless application in a single dashboard view.

You can see the available workers (one driver and two executors that were pre-initialized in the default configuration) and also the spike under successful job count that indicates the initial data load job that was completed successfully.

You could also monitor the CPU, memory, and storage allocated for the application, driver, and executor nodes separately.

The following image shows application metrics for three workers with 12 vCPUs (both driver and executor initialized with 4 vCPUs) and also the memory and storage usage. You can monitor the metrics from this dashboard and pre-initialize application capacity to suit your specific workloads.

We can see the number of executors that were utilized for this job execution from the executor metrics section within the CloudWatch dashboard. We have used two executors and a driver for running this job.

Query the Delta tables through Athena

Previously, Delta tables were accessed through Athena by generating the manifest files (which maintain the list of data files to read for querying a Delta table). With the newly launched support in Athena for reading native Delta tables, it’s no longer required to generate and update manifest files. The Athena SQL engine version 3 can directly query native Delta tables. If you’re using an older engine version, change the engine version.

Navigate to the Athena console and start querying the data. Run a SELECT query and fetch the first 10 records to verify the data:

SELECT * FROM "AwsDataCatalog"."emrserverless_delta"."spark_travel_details" limit 10;
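The same verification query can also be started from a script. The following is a minimal sketch, assuming a boto3 Athena client is passed in; start_query_execution and its QueryString, QueryExecutionContext, and ResultConfiguration parameters are part of the Athena API, while the function name and the output location are placeholders introduced here.

```python
def start_verification_query(athena_client, output_location, limit=10):
    """Start the SELECT used above and return the Athena query execution ID.

    athena_client is expected to behave like boto3.client("athena").
    output_location is an S3 URI of your choice for query results.
    """
    sql = (
        'SELECT * FROM "AwsDataCatalog"."emrserverless_delta"."spark_travel_details" '
        f"LIMIT {int(limit)}"
    )
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={
            "Catalog": "AwsDataCatalog",
            "Database": "emrserverless_delta",
        },
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

Passing the client as an argument keeps the function easy to exercise with a stub. In an AWS environment you would call it with boto3.client("athena") and your own results location, then poll the returned execution ID for completion.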

The table (native Delta table) has been created and updated to the AWS Glue Data Catalog from the EMR Serverless application code. You can successfully query and explore the data through Athena or Spark applications, but the schema definitions for individual columns aren’t updated in Data Catalog with this approach.

The following screenshot shows the Delta table created through code has a single array column. Athena supports reading native Delta tables and therefore we can read the data successfully even though the Data Catalog shows only a single array column.

If you need the individual column-level metadata to be available in the Data Catalog, run an AWS Glue crawler periodically to keep the AWS Glue metadata updated. For more information, refer to Introducing native Delta Lake table support with AWS Glue crawlers.

Run the data pipeline to load incremental data changes into the Delta tables

In this section, we walk through the steps to run the data pipeline.

Generate an incremental (CDC) dataset and insert it into the Aurora PostgreSQL database

  1. Log in to the EC2 instance via SSH and using the PSQL CLI, run the following SQL commands to generate CDC data on the source database:

update delta_emr_source.travel_details set destination='Tucson' where destination='Miami';
call delta_emr_source.insert_records(200);
delete from delta_emr_source.travel_details where destination='Los Angeles';

  2. Navigate to the AWS DMS console and verify whether the incremental records are populated to the S3 raw bucket by the replication task.

You can also verify in the S3 raw bucket location that the files are created under hourly partitioned folders.

Run the EMR Serverless Spark application to merge CDC data in the S3 curated layer (incremental load)

After the AWS DMS task has successfully loaded the incremental data, submit the Spark job on the EMR Serverless application to load the incremental data (CDC) with the following script arguments:

["U", "delta_emr_source", "2022-10-25-21", "travel_details","route_id"]

The partition path given here as 2022-10-25-21 should be changed as applicable in your use case. We use an example use case where the EMR Serverless job runs every hour, and the input data folder is partitioned on an hourly basis from AWS DMS. You can choose an appropriate partitioning strategy on the S3 raw bucket for your use case.
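If the job runs on an hourly schedule, the partition value can be derived from the hour being processed rather than typed by hand. The following is a small sketch under that assumption; the function names are hypothetical, and whether your AWS DMS folders follow UTC or another time zone is something to confirm for your own setup.

```python
from datetime import datetime


def cdc_partition(hour_to_process: datetime) -> str:
    """Format a timestamp as the hourly folder name, for example 2022-10-25-21."""
    return hour_to_process.strftime("%Y-%m-%d-%H")


def incremental_job_args(hour_to_process: datetime,
                         schema: str = "delta_emr_source",
                         table: str = "travel_details",
                         partition_key: str = "route_id") -> list:
    """Build the script arguments in the same order as the example above."""
    return ["U", schema, cdc_partition(hour_to_process), table, partition_key]
```

For example, incremental_job_args(datetime(2022, 10, 25, 21, 30)) produces the argument list shown above.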

  1. Under Spark properties, choose Edit in text and enter the following configurations:
--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.submit.pyFiles=/usr/share/aws/delta/lib/delta-core.jar --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  1. When the job is successful, verify in Amazon S3 that more files are created in the _delta_log folder, capturing the changes from the current run.

Query the Delta tables through Athena to validate the merged data

Go to the Athena console to query the data and validate count to ensure that the table contains the most recent data:

SELECT destination, count(*) FROM "AwsDataCatalog"."emrserverless_delta"."spark_travel_details" group by destination;

If you also want to query this data from Amazon Redshift, you can create external tables in Redshift Spectrum for Delta tables. For more information, refer to Creating external tables for data managed in Delta Lake. Redshift Spectrum currently supports querying Delta tables through the manifest file option. A Delta table manifest contains a list of files that make up a consistent snapshot of the Delta table. The code snippet given in this post updates the manifest files every time new data is loaded in the Delta tables to ensure only the latest data is read from the Delta tables.

Clean up

To avoid incurring ongoing charges, clean up your infrastructure by deleting the stack from the AWS CloudFormation console. Delete the EMR Serverless application and any other resources you created during this exercise.


Conclusion

In this post, we demonstrated how to create a transactional data lake with Delta table format using EMR Serverless and AWS DMS. With the flexibility provided by EMR Serverless, you can use the latest version of the open-source Delta framework on EMR Serverless (with the latest version of Spark) to support a wider range of transactional data lake needs based on various use cases.

Now you can build a transactional data lake for your organization with Delta table format and access data using Athena and Redshift Spectrum for various analytical workloads. You could use this high-level architecture for any other use cases where you need to use the latest version of Spark on EMR Serverless.

About the Authors

Sankar Sundaram is a Data Lab Architect at AWS, where he helps customers build and modernize data architectures, including secure, scalable, and performant data lake, database, and data warehouse solutions.

Monjumi Sarma is a Data Lab Solutions Architect at AWS. She helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database-like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query is reading an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer points to the current table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates the writing operations from reading operations. You can write the data files at any time, but only commit the change explicitly, which creates a new version of the snapshot and metadata files.
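The relationship between the catalog pointer, snapshots, and data files can be sketched with a toy model. This is only an illustration of the idea, not how Iceberg is implemented: the class ToyTable and its methods are invented for this example, and each snapshot is reduced to a set of data file names.

```python
class ToyTable:
    """Toy model: immutable snapshots plus a single catalog pointer."""

    def __init__(self):
        # Each entry stands in for a table metadata file describing one snapshot.
        self._snapshots = [frozenset()]
        # The catalog stores only a pointer to the current metadata file.
        self._current = 0

    def current_snapshot_id(self) -> int:
        return self._current

    def files(self, snapshot_id=None) -> frozenset:
        """Data files visible in a snapshot (the current one by default)."""
        sid = self._current if snapshot_id is None else snapshot_id
        return self._snapshots[sid]

    def commit(self, added=(), removed=()) -> int:
        """Create a new snapshot and move the catalog pointer to it."""
        new_files = (self.files() - frozenset(removed)) | frozenset(added)
        self._snapshots.append(new_files)
        self._current = len(self._snapshots) - 1
        return self._current
```

Because earlier snapshots are never modified, a reader that started on an older snapshot keeps seeing a consistent set of files, and reading by snapshot ID gives a simple form of time travel.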

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, this would be raw data stored in your S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

# Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.


Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

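Choose the Region in which you want to create the S3 bucket and provide a unique name. This post uses iceberg-curated-blog-data, the bucket that appears in the Spark session configuration later in the walkthrough.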


Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":true}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  4. Choose Create Studio in the new tab.
  5. Enter a name. We use iceberg-studio.
  6. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  7. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  8. Choose an S3 path for Workspaces backup.
  9. Choose Create Studio.
  10. After the Studio is created, choose the Studio access URL.
  11. On the EMR Studio dashboard, choose Create workspace.
  12. Enter a name for your Workspace. We use iceberg-workspace.
  13. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  14. Choose the EMR cluster you created earlier.
  15. Choose Create Workspace.
  16. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  17. Open the notebook.
  18. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

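%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.demo.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
  }
}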

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.
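Outside a notebook, the same five properties can be assembled programmatically, for example to pass them to spark-submit. The following is a sketch under that assumption; the helper names are hypothetical, and the class names are the standard Iceberg classes for the Spark catalog, the AWS Glue catalog, S3FileIO, and the Spark SQL extensions.

```python
def iceberg_glue_catalog_conf(catalog: str, warehouse: str) -> dict:
    """Return the Spark properties described above for a named catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


def as_spark_submit_flags(conf: dict) -> list:
    """Render each property as a --conf key=value flag."""
    return [f"--conf {key}={value}" for key, value in conf.items()]
```

Calling iceberg_glue_catalog_conf("demo", "s3://iceberg-curated-blog-data") yields the settings used in this post; substitute your own catalog name and bucket.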

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this loads all the Parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can configure the target file size to write fewer, larger files, which helps improve query performance.

Property | Default | Description
write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes

Use the following code to set the table property:

//Example code to set the table property in EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')")
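To get a feel for what a target file size means for a given dataset, you can estimate a lower bound on the number of data files. The following is a rough arithmetic sketch; the function name is hypothetical, and real file counts are usually higher because files don't span partitions and each write task produces at least one file.

```python
import math


def estimated_file_count(total_bytes: int,
                         target_file_size_bytes: int = 536870912) -> int:
    """Lower bound on the number of data files for a given target file size."""
    return math.ceil(total_bytes / target_file_size_bytes)


# The source dataset in this post is about 47.4 GiB.
dataset_bytes = int(47.4 * 1024 ** 3)
files_at_512_mb = estimated_file_count(dataset_bytes)
files_at_128_mb = estimated_file_count(dataset_bytes, 128 * 1024 ** 2)
```

With the default 512 MB target, the estimate for this dataset is 95 files; with a 128 MB target it is 380 files. The estimate uses the source Parquet size, so the size after conversion to Iceberg may differ.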

Partitioning and sorting

To make a query run fast, the less data read the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don’t contain matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process to find the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg’s rich metadata can be used to prune metadata files that aren’t needed, in addition to filtering data files that don’t contain matching data. In our tests, we observed Athena scanned 50% or less data for a given query on an Iceberg table compared to original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.
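The two filtering levels can be sketched in a few lines. This is a simplified illustration, not Iceberg's planner: the DataFile and Manifest classes, the single numeric column with lower and upper bounds, and the example file names are all assumptions made for this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DataFile:
    path: str
    partition: str   # partition value, for example a product category
    lower: int       # lower bound of one column, for example review year
    upper: int       # upper bound of the same column


@dataclass
class Manifest:
    partition_min: str   # partition value range kept in the manifest list
    partition_max: str
    files: List[DataFile]


def plan_scan(manifests: List[Manifest], partition: str,
              low: int, high: int) -> List[str]:
    """Return the paths of data files that may contain matching rows."""
    selected = []
    for manifest in manifests:
        # Metadata filtering: skip whole manifests using the partition range.
        if not (manifest.partition_min <= partition <= manifest.partition_max):
            continue
        for data_file in manifest.files:
            # Data filtering, step 1: partition value of each data file.
            if data_file.partition != partition:
                continue
            # Data filtering, step 2: column bounds versus the predicate range.
            if data_file.upper < low or data_file.lower > high:
                continue
            selected.append(data_file.path)
    return selected


example_manifests = [
    Manifest("Apparel", "Books", [DataFile("apparel-1.parquet", "Apparel", 2000, 2010)]),
    Manifest("Toys", "Watches", [
        DataFile("toys-1.parquet", "Toys", 2004, 2006),
        DataFile("watches-1.parquet", "Watches", 1999, 2003),
        DataFile("watches-2.parquet", "Watches", 2004, 2006),
    ]),
]
```

For a query on the Watches partition restricted to the year 2005, plan_scan(example_manifests, "Watches", 2005, 2005) skips the first manifest entirely and keeps only watches-2.parquet from the second.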

Partitioning and sorting

Partitioning groups records with the same key column values together at write time. The benefit is faster queries that access only part of the data, as explained under data filtering in the query scan planning section. Iceberg makes partitioning simple by supporting hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it.
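As an illustration of what "taking a column value and optionally transforming it" means, the following sketch reimplements a few partition transforms in plain Python. The function names are chosen for this example; the arithmetic follows the definitions in the Iceberg table specification (years, months, and days counted from 1970-01-01, and string truncation to a fixed width), but this is not Iceberg's own code.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def identity_transform(value):
    """identity: the partition value is the column value itself."""
    return value


def year_transform(d: date) -> int:
    """year: number of years since 1970."""
    return d.year - 1970


def month_transform(d: date) -> int:
    """month: number of months since 1970-01."""
    return (d.year - 1970) * 12 + (d.month - 1)


def day_transform(d: date) -> int:
    """day: number of days since 1970-01-01."""
    return (d - EPOCH).days


def truncate_transform(width: int, s: str) -> str:
    """truncate[W] on strings: keep the first W characters."""
    return s[:width]
```

Two reviews dated 2005-01-15 and 2005-01-31 get the same month partition value, so a table partitioned by month of review_date stores them together without the writer or the query author handling a separate partition column.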

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which is used in the query WHERE condition to filter records. With partitioning, the query can scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

-- Run this query on the all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

-- Run the same select query on the partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the performance improvement of data partitioning, with about 50% performance improvement and 70% less data scanned.

Dataset Name | Non-Partitioned Dataset | Partitioned Dataset
Runtime (seconds) | 8.20 | 4.25
Data Scanned (MB) | 131.55 | 33.79

Note that the runtime is the average runtime with multiple runs in our test.
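The percentages quoted for the partitioning test follow directly from the table values. The short calculation below shows the arithmetic; the helper name is chosen for this example.

```python
def pct_reduction(before: float, after: float) -> float:
    """Percentage reduction from a before value to an after value."""
    return (before - after) / before * 100


runtime_reduction = pct_reduction(8.20, 4.25)       # runtime in seconds
scanned_reduction = pct_reduction(131.55, 33.79)    # data scanned in MB
print(f"Runtime reduced by {runtime_reduction:.1f}%")
print(f"Data scanned reduced by {scanned_reduction:.1f}%")
```

This works out to roughly a 48% shorter runtime and 74% less data scanned, in line with the approximate figures stated above.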

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
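The effect of sorting on file skipping can be demonstrated with synthetic data. In this sketch, 100 integer values are written into 10 files of 10 rows each, once in arrival order and once sorted, and a range predicate is checked against each file's min/max bounds. The data and helper names are made up for illustration.

```python
def split_into_files(values, rows_per_file):
    """Split a list of values into fixed-size chunks that stand in for data files."""
    return [values[i:i + rows_per_file] for i in range(0, len(values), rows_per_file)]


def files_to_read(files, low, high):
    """Count files whose [min, max] bounds overlap the predicate range."""
    return sum(1 for f in files if min(f) <= high and max(f) >= low)


# A shuffled-looking permutation of 0..99 (7 and 100 are coprime).
values = [(i * 7) % 100 for i in range(100)]

unsorted_files = split_into_files(values, 10)
sorted_files = split_into_files(sorted(values), 10)

unsorted_count = files_to_read(unsorted_files, 20, 29)
sorted_count = files_to_read(sorted_files, 20, 29)
```

In the unsorted layout, almost every file has wide bounds that overlap the range 20 to 29, so nearly all files must be read. After sorting, each file covers a narrow, non-overlapping range and only one file qualifies.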

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when there are updates or deletes on the Iceberg table, the data files that contain the impacted records are duplicated, and the records are updated or deleted in the duplicated files. A new snapshot of the Iceberg table is created and points to the newer version of the data files. This makes overall writes slower. When concurrent writes conflict, they must be retried, which increases the write time even more. On the other hand, no extra processing is needed when reading the data: the query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files will not be rewritten; instead new delete files will be created to track the changes. For deletes, a new delete file will be created with the deleted records. When reading the Iceberg table, the delete file will be applied to the retrieved data to filter out the delete records. For updates, a new delete file will be created to mark the updated records as deleted. Then a new file will be created for those records but with updated values. When reading the Iceberg table, both the delete and new files will be applied to the retrieved data to reflect the latest changes and produce the correct results. So, for any subsequent queries, an extra step to merge the data files with the delete and new files will happen, which will usually increase the query time. On the other hand, the writes might be faster because there is no need to rewrite the existing data files.
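The difference between the two approaches can be sketched with two toy tables that handle deletes differently. This is a conceptual model only: the class names are invented, each data file is a Python list, and the merge-on-read delete file records row positions, which is one of the delete file kinds Iceberg supports. Updates are omitted for brevity.

```python
class CopyOnWriteTable:
    """Deletes rewrite every data file that contains an affected row."""

    def __init__(self, rows):
        self.data_files = [list(rows)]
        self.files_rewritten = 0

    def delete(self, predicate):
        new_files = []
        for data_file in self.data_files:
            if any(predicate(row) for row in data_file):
                # Write a new copy of the file without the deleted rows.
                new_files.append([row for row in data_file if not predicate(row)])
                self.files_rewritten += 1
            else:
                new_files.append(data_file)
        self.data_files = new_files

    def read(self):
        # No extra work at read time.
        return [row for data_file in self.data_files for row in data_file]


class MergeOnReadTable:
    """Deletes leave data files untouched and record positions in delete files."""

    def __init__(self, rows):
        self.data_files = [list(rows)]
        self.delete_files = []

    def delete(self, predicate):
        positions = {
            (file_idx, row_idx)
            for file_idx, data_file in enumerate(self.data_files)
            for row_idx, row in enumerate(data_file)
            if predicate(row)
        }
        if positions:
            self.delete_files.append(positions)

    def read(self):
        # Extra work at read time: apply all delete files to the data files.
        deleted = set().union(*self.delete_files)
        return [
            row
            for file_idx, data_file in enumerate(self.data_files)
            for row_idx, row in enumerate(data_file)
            if (file_idx, row_idx) not in deleted
        ]
```

Both tables return the same rows after a delete. The copy-on-write table pays at write time by rewriting a data file, while the merge-on-read table writes only a small delete file and pays on every read until the files are compacted.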

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Run code to alter Iceberg table property to set copy-on-write and merge-on-read in EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')")

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

-- Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

-- Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

-- Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

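Query | Copy-on-Write: UPDATE | Copy-on-Write: DELETE | Copy-on-Write: SELECT | Merge-on-Read: UPDATE | Merge-on-Read: DELETE | Merge-on-Read: SELECT
Runtime (seconds) | 66.251 | 116.174 | 97.75 | 10.788 | 54.941 | 113.44
Data scanned (MB) | 494.06 | 3.07 | 137.16 | 494.06 | 3.07 | 137.16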

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs in the two approaches. Which approach to use depends on your use cases. In summary, the considerations come down to latency on the read vs. write. You can reference the following table and make the right choice.

Consideration | Copy-on-Write | Merge-on-Read
Pros | Faster reads | Faster writes
Cons | Expensive writes | Higher latency on reads
When to use | Good for frequent reads, infrequent updates and deletes or large batch updates | Good for tables with frequent updates and deletes

Data compaction

If your data files are small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the number of I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset, so more data files lead to more metadata. This in turn increases the overhead and I/O operations for reading metadata files. To improve query performance, it’s recommended to compact small data files into larger data files.

When updating and deleting records in an Iceberg table, if the merge-on-read approach is used, you might end up with many small delete files or new data files. Running compaction combines all these files and creates a newer version of the data file. This eliminates the need to reconcile them during reads. It’s recommended to run regular compaction jobs to impact reads as little as possible while still maintaining faster write speed.
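The idea behind bin-pack compaction is to group small files so that each group can be rewritten as one file close to the target size. The following sketch uses a generic first-fit-decreasing heuristic to show the idea; it is not the algorithm Iceberg or Athena actually uses, and the function name and the sizes are made up for illustration.

```python
def bin_pack(file_sizes, target_size):
    """Group file sizes so that each group's total stays within target_size.

    Files larger than target_size end up alone in their own group.
    """
    groups = []
    for size in sorted(file_sizes, reverse=True):
        for group in groups:
            if sum(group) + size <= target_size:
                group.append(size)
                break
        else:
            # No existing group has room, so start a new one.
            groups.append([size])
    return groups


# Six files (sizes in MB) packed toward a 512 MB target.
compaction_plan = bin_pack([100, 200, 300, 60, 40, 500], 512)
```

Here six input files collapse into three groups, so a reader opens three files instead of six after compaction while the total amount of data stays the same.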

Run the following data compaction command, then run the select query from Athena:

-- Data compaction
optimize reviews.all_reviews REWRITE DATA USING BIN_PACK

-- Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. In our test, the runtime dropped from 97.75 seconds to 32.676 seconds, a reduction of about 67%.

Query | Before Data Compaction | After Data Compaction
Runtime (seconds) | 97.75 | 32.676
Data scanned (MB) | 137.16 | 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the EMR Studio console, choose Workspaces in the navigation pane.
  3. Select the Workspace you created and choose Delete.
  4. On the EMR console, navigate to the Studios page.
  5. Select the Studio you created and choose Delete.
  6. On the EMR console, choose Clusters in the navigation pane.
  7. Select the cluster and choose Terminate.
  8. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.


Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we took a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.

About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their business outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Reduce Amazon EMR cluster costs by up to 19% with new enhancements in Amazon EMR Managed Scaling

Post Syndicated from Sushant Majithia original https://aws.amazon.com/blogs/big-data/reduce-amazon-emr-cluster-costs-by-up-to-19-with-new-enhancements-in-amazon-emr-managed-scaling/

In June 2020, AWS announced the general availability of Amazon EMR Managed Scaling. With EMR Managed Scaling, you specify the minimum and maximum compute limits for your clusters, and Amazon EMR automatically resizes your cluster for optimal performance and resource utilization. EMR Managed Scaling constantly monitors key workload-related metrics and uses an algorithm that optimizes the cluster size for best resource utilization. Given that the feature is completely managed, improvements to the algorithm are immediately realized without needing a version upgrade. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs and optimizing cluster capacity for the best performance.

Throughout 2022, we made multiple enhancements to the EMR Managed Scaling algorithm. With these improvements, we observed that for clusters enabled with EMR Managed Scaling, utilization improved by up to 15 percent, and total costs were reduced further by up to 19 percent. Starting mid-December 2022, EMR Managed Scaling enhancements were enabled by default for clusters using Amazon EMR versions 5.34.0 and later and Amazon EMR versions 6.4.0 and later for both new and existing clusters. Further, given that the feature is completely managed, you will get the new optimized Managed Scaling algorithm by default, and no action is needed on your end.

Listed below are some of the key enhancements we enabled for EMR Managed Scaling:

  • Improved cluster utilization with targeted scale-down of your EMR cluster
  • Reduced costs by preventing scale-down of instances that store intermediate shuffle data using Spark Shuffle data awareness
  • Improved cluster utilization and reduced costs with gradual scale-up of your EMR cluster

Customer success stories

How the enhanced EMR Managed Scaling algorithm helped a technology enterprise reduce costs:

To illustrate the cost savings with an example, we looked at an EMR cluster for a technology enterprise that heavily uses Amazon EMR to process real-time billing data between Kafka and S3 using Spark. They run a persistent EMR cluster with EMR version 5.35 and have EMR Managed Scaling turned on. The following Amazon CloudWatch dashboard shows how, starting December 21, the enhanced Managed Scaling algorithm provisioned (total nodes requested) only 70 nodes vs. the previous Managed Scaling algorithm, which provisioned 179 nodes for a similar job profile. The lower the number of resources provisioned to run your jobs, the lower the total cost of your EMR cluster.

How the enhanced EMR Managed Scaling algorithm helped an advertising enterprise reduce costs:

We also looked at an EMR cluster for an advertising enterprise, which uses Amazon EMR for its data analytics strategy and runs its batch ETL jobs using Spark. They run their clusters on EMR version 6.5 and have EMR Managed Scaling turned on. The following Amazon CloudWatch dashboard shows how, starting December 15, the enhanced Managed Scaling algorithm provisioned (total units requested) only 41 nodes vs. the previous Managed Scaling algorithm, which provisioned 86 nodes for a similar job profile.

Estimating the cost savings and utilization improvements for your EMR clusters:

Cluster cost savings:

To view estimated cost savings for your EMR cluster with the EMR Managed Scaling enhancements, please follow the steps below:

  • Open the CloudWatch metrics console and, under EMR, search by your ClusterId.
  • From the list of metrics available for EMR, select the following two metrics:
    • Running capacity – Based on the unit type you specified in your Managed Scaling policy, this will be available as “TotalUnitsRunning”, “TotalNodesRunning”, or “TotalVCPURunning”
    • Capacity requested by Managed Scaling – Based on the unit type you specified in your Managed Scaling policy, this will be available as “TotalUnitsRequested”, “TotalNodesRequested”, or “TotalVCPURequested”
  • Plot both of the metrics to your CloudWatch dashboard.
  • Select the time frame as the 3 months between November 2022 and January 2023 to view the improvements with the enhanced Managed Scaling algorithm when compared to the previous Managed Scaling algorithm.

Cluster utilization improvements:

To estimate the improvements in your EMR cluster utilization with the EMR Managed Scaling enhancements, please follow the steps below:

  • Open the CloudWatch metrics console and, under EMR, search by your ClusterId.
  • From the list of metrics available for EMR, select the “YARNMemoryAvailablePercentage” metric.
  • To derive the memory utilized by YARN, add a math expression by choosing “Add Math → Start with empty expression”
    • For the new math expression, set Label=Yarn Utilization and set Details=100-YARNMemoryAvailablePercentage.
  • Plot the cluster utilization metric to your CloudWatch dashboard.
  • Select the time frame as the 3 months between November 2022 and January 2023 to view the improvements with the enhanced Managed Scaling algorithm when compared to the previous Managed Scaling algorithm.
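
The utilization expression from the steps above can also be written as a CloudWatch metric math query. The following is a minimal sketch, not an official procedure: it only builds the query structure, the cluster ID `j-EXAMPLE12345` is a hypothetical placeholder, and the hourly period is an assumption chosen for a 3-month window.

```python
from datetime import datetime, timezone


def yarn_utilization_queries(cluster_id, period_seconds=3600):
    """Build CloudWatch MetricDataQueries that derive YARN memory utilization."""
    return [
        {
            # Raw metric published by Amazon EMR; hidden from the result.
            "Id": "available",
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/ElasticMapReduce",
                    "MetricName": "YARNMemoryAvailablePercentage",
                    "Dimensions": [{"Name": "JobFlowId", "Value": cluster_id}],
                },
                "Period": period_seconds,
                "Stat": "Average",
            },
            "ReturnData": False,
        },
        {
            # Metric math: utilization = 100 - YARNMemoryAvailablePercentage.
            "Id": "utilization",
            "Expression": "100 - available",
            "Label": "Yarn Utilization",
            "ReturnData": True,
        },
    ]


# Hypothetical cluster ID; replace with your own ClusterId.
queries = yarn_utilization_queries("j-EXAMPLE12345")

# The 3 months between November 2022 and January 2023.
start = datetime(2022, 11, 1, tzinfo=timezone.utc)
end = datetime(2023, 2, 1, tzinfo=timezone.utc)

# With boto3 installed and AWS credentials configured, the query could be run as:
#   import boto3
#   response = boto3.client("cloudwatch").get_metric_data(
#       MetricDataQueries=queries, StartTime=start, EndTime=end)
```

Keeping the raw metric hidden (`ReturnData` set to `False`) means only the derived utilization series is returned, which mirrors plotting a single "Yarn Utilization" line on the dashboard.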

What’s next

We will continue to tune the Managed Scaling algorithm with every new EMR release and thereby improve the customer experience when scaling clusters with EMR Managed Scaling.


Conclusion

In this post, we provided an overview of the key enhancements we launched in EMR Managed Scaling. With these enhancements, we observed that the cluster utilization improved by up to 15 percent, and cluster cost was reduced by up to 19 percent. Starting mid-December 2022, these enhancements were enabled by default for EMR clusters using Amazon EMR versions 5.34.0 and later, and Amazon EMR versions 6.4.0 and later. Given that EMR Managed Scaling is a completely managed feature, you will get the new, optimized EMR Managed Scaling algorithm by default, and no action is needed from your end.

To learn more and get started with EMR Managed Scaling, visit the EMR Managed Scaling documentation page.

About the Authors

Sushant Majithia is a Principal Product Manager for EMR at Amazon Web Services.

Vishal Vyas is a Senior Software Engineer for EMR at Amazon Web Services.

Matthew Liem is a Senior Solution Architecture Manager at AWS.

Achieve up to 27% better price-performance for Spark workloads with AWS Graviton2 on Amazon EMR Serverless

Post Syndicated from Karthik Prabhakar original https://aws.amazon.com/blogs/big-data/achieve-up-to-27-better-price-performance-for-spark-workloads-with-aws-graviton2-on-amazon-emr-serverless/

Amazon EMR Serverless is a serverless option in Amazon EMR that makes it simple to run applications using open-source analytics frameworks such as Apache Spark and Hive without configuring, managing, or scaling clusters.

At AWS re:Invent 2022, we announced support for running serverless Spark and Hive workloads with AWS Graviton2 (Arm64) on Amazon EMR Serverless. AWS Graviton2 processors are custom-built by AWS using 64-bit Arm Neoverse cores, delivering a significant leap in price-performance for your cloud workloads.

This post discusses the performance improvements observed while running Apache Spark jobs using AWS Graviton2 on EMR Serverless. We found that Graviton2 on EMR Serverless achieved 10% performance improvement for Spark workloads based on runtime. AWS Graviton2 is offered at a 20% lower cost than the x86 architecture option (see the Amazon EMR pricing page for details), resulting in a 27% overall better price-performance for workloads.

Spark performance test results

The following charts compare the benchmark runtime with and without Graviton2 for an EMR Serverless Spark application (note that the charts are not drawn to scale). We observed up to 10% improvement in total runtime and 8% improvement in geometric mean for the queries compared to x86.

The following table summarizes our results.

Metric | Graviton2 | x86 | % Gain
Total Execution Time (in seconds) | 2,670 | 2,959 | 10%
Geometric Mean (in seconds) | 22.06 | 24.07 | 8%

Testing configuration

To evaluate the performance improvements, we use benchmark tests derived from TPC-DS 3 TB scale performance benchmarks. The benchmark consists of 104 queries, and each query is submitted sequentially to an EMR Serverless application. EMR Serverless has automatic and fine-grained scaling enabled by default. Spark provides Dynamic Resource Allocation (DRA) to dynamically adjust the application resources based on the workload, and EMR Serverless uses the signals from DRA to elastically scale workers as needed. For our tests, we chose a predefined pre-initialized capacity that allows the application to scale to default limits. Each application has 1 driver and 100 workers configured as pre-initialized capacity, allowing it to scale to a maximum of 8000 vCPU/60000 GB capacity. When launching the applications, as default we use x86_64 to get baseline numbers and Arm64 for AWS Graviton2, and the application had VPC networking enabled.

The following table summarizes the Spark application configuration.

Number of Drivers | Driver Size | Number of Executors | Executor Size | Ephemeral Storage | Amazon EMR release label
1 | 4 vCPUs, 16 GB Memory | 100 | 4 vCPUs, 16 GB Memory | 200 GB | 6.9

Performance test results and cost comparison

Let’s do a cost comparison of the benchmark tests. Because we used 1 driver [4 vCPUs, 16 GB memory] and 100 executors [4 vCPUs, 16 GB memory] for each run, the total capacity used is 4*101=404 vCPUs, 16*101=1616 GB memory, 200*100=20000 GB storage. The following table summarizes the cost.

Test | Total time (seconds) | vCPUs | Memory (GB) | Ephemeral Storage (GB) | Cost
x86_64 | 2,958.82 | 404 | 1616 | 18000 | $26.73
Graviton2 | 2,670.38 | 404 | 1616 | 18000 | $19.59

The calculations are as follows:

  • Total vCPU cost = (number of vCPUs * per vCPU-hour rate * job runtime in hours)
  • Total memory cost = (total GB of memory configured * per GB-hour rate * job runtime in hours)
  • Storage cost = (additional storage GB configured * per storage GB-hour rate * job runtime in hours); 20 GB of ephemeral storage is available for all workers by default, so you pay only for any additional storage that you configure per worker

Cost breakdown

Let’s look at the cost breakdown for x86:

  • Job runtime – 49.3 minutes = 0.82 hours
  • Total vCPU cost – 404 vCPUs x 0.82 hours job runtime x 0.052624 USD per vCPU = 17.4333 USD
  • Total GB cost – 1,616 memory-GBs x 0.82 hours job runtime x 0.0057785 USD per memory GB = 7.6572 USD
  • Storage cost – 18,000 storage-GBs x 0.82 hours job runtime x 0.000111 USD per storage GB = 1.6386 USD
  • Additional storage – 20,000 GB – 20 GB free tier * 100 workers = 18,000 additional storage GB
  • EMR Serverless total cost (x86): 17.4333 USD + 7.6572 USD + 1.6386 USD = 26.7291 USD

Let’s compare to the cost breakdown for Graviton 2:

  • Job runtime – 44.5 minutes = 0.74 hours
  • Total vCPU cost – 404 vCPUs x 0.74 hours job runtime x 0.042094 USD per vCPU = 12.5844 USD
  • Total GB cost – 1,616 memory-GBs x 0.74 hours job runtime x 0.004628 USD per memory GB = 5.5343 USD
  • Storage cost – 18,000 storage-GBs x 0.74 hours job runtime x 0.000111 USD per storage GB = 1.4785 USD
  • Additional storage – 20,000 GB – 20 GB free tier * 100 workers = 18,000 additional storage GB
  • EMR Serverless total cost (Graviton2): 12.5844 USD + 5.5343 USD + 1.4785 USD = 19.5972 USD

The tests indicate that for the benchmark run, AWS Graviton2 led to an overall cost savings of 27%.
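
The cost breakdown can be reproduced with a few lines of arithmetic. The following is a minimal sketch that uses only the rates, rounded runtimes, and capacities quoted in this post; the function and variable names are illustrative, and current rates on the Amazon EMR pricing page may differ.

```python
def emr_serverless_cost(runtime_hours, vcpus, memory_gb, extra_storage_gb,
                        vcpu_rate, memory_rate, storage_rate):
    """Return the total job cost in USD as vCPU + memory + additional storage."""
    vcpu_cost = vcpus * runtime_hours * vcpu_rate
    memory_cost = memory_gb * runtime_hours * memory_rate
    storage_cost = extra_storage_gb * runtime_hours * storage_rate
    return vcpu_cost + memory_cost + storage_cost


# Capacity: 1 driver + 100 executors, each with 4 vCPUs and 16 GB of memory.
workers = 1 + 100
vcpus = 4 * workers                   # 404 vCPUs
memory_gb = 16 * workers              # 1,616 GB
# 200 GB per executor, of which the first 20 GB is free, for 100 executors.
extra_storage_gb = (200 - 20) * 100   # 18,000 GB

x86_total = emr_serverless_cost(
    runtime_hours=0.82, vcpus=vcpus, memory_gb=memory_gb,
    extra_storage_gb=extra_storage_gb,
    vcpu_rate=0.052624, memory_rate=0.0057785, storage_rate=0.000111)

graviton_total = emr_serverless_cost(
    runtime_hours=0.74, vcpus=vcpus, memory_gb=memory_gb,
    extra_storage_gb=extra_storage_gb,
    vcpu_rate=0.042094, memory_rate=0.004628, storage_rate=0.000111)

savings = 1 - graviton_total / x86_total

print(f"x86: {x86_total:.2f} USD, Graviton2: {graviton_total:.2f} USD, "
      f"savings: {savings:.0%}")
```

The savings combine two effects: the lower per-unit Graviton2 rates and the shorter runtime, which reduces every cost component including storage.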

Individual query improvements and observations

The following chart shows the relative speedup of individual queries with Graviton2 compared to x86.

We see some regression in a few shorter queries, which had little impact on the overall benchmark runtime. We observed better performance gains for long running queries, for example:

  • q67 averaged 86 seconds for x86 and 74 seconds for Graviton2, a 14% runtime performance gain
  • q23a and q23b gained 14% and 16%, respectively
  • q32 regressed by 7%; the difference between the average runtimes is about 0.7 seconds (11.09 seconds for Graviton2 vs. 10.39 seconds for x86)
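
To compare your own query timings in the same way, the relative gain can be computed as the runtime saved divided by the x86 runtime. The following is a small sketch using the averages quoted in this post; the helper name is illustrative.

```python
def runtime_gain_percent(baseline_seconds, candidate_seconds):
    """Percentage of runtime saved relative to the baseline (negative = regression)."""
    return (baseline_seconds - candidate_seconds) / baseline_seconds * 100


# Benchmark-level numbers from the results table (x86 first, Graviton2 second).
total_gain = runtime_gain_percent(2959, 2670)
geomean_gain = runtime_gain_percent(24.07, 22.06)

# Per-query averages quoted above.
q67_gain = runtime_gain_percent(86, 74)
q32_gain = runtime_gain_percent(10.39, 11.09)

for name, gain in [("total", total_gain), ("geomean", geomean_gain),
                   ("q67", q67_gain), ("q32", q32_gain)]:
    print(f"{name}: {gain:+.1f}%")
```

A short query such as q32 can show a noticeable percentage regression even though the absolute difference is well under a second, which is why it has little effect on the total benchmark runtime.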

To quantify performance, we use benchmark SQL derived from TPC-DS 3 TB scale performance benchmarks.

If you’re evaluating migrating your workloads to Graviton2 architecture on EMR Serverless, we recommend testing the Spark workloads based on your real-world use cases. The outcome might vary based on the pre-initialized capacity and number of workers chosen. If you want to run workloads across multiple processor architectures (for example, to test the performance on x86 and Arm vCPUs), follow the walkthrough in the GitHub repo to get started with some concrete ideas.


Conclusion

As demonstrated in this post, Graviton2 on EMR Serverless applications consistently yielded better performance for Spark workloads. Graviton2 is available in all Regions where EMR Serverless is available. To see a list of Regions where EMR Serverless is available, see the EMR Serverless FAQs. To learn more, visit the Amazon EMR Serverless User Guide and sample code with Apache Spark and Apache Hive.

If you’re wondering how much performance gain you can achieve with your use case, try out the steps outlined in this post and replace with your queries.

To launch your first Spark or Hive application using a Graviton2-based architecture on EMR Serverless, see Getting started with Amazon EMR Serverless.

About the authors

Karthik Prabhakar is a Senior Big Data Solutions Architect for Amazon EMR at AWS. He is an experienced analytics engineer working with AWS customers to provide best practices and technical advice in order to assist their success in their data journey.

Nithish Kumar Murcherla is a Senior Systems Development Engineer on the Amazon EMR Serverless team. He is passionate about distributed computing, containers, and everything and anything about the data.

Amazon EMR Serverless supports larger worker sizes to run more compute and memory-intensive workloads

Post Syndicated from Veena Vasudevan original https://aws.amazon.com/blogs/big-data/amazon-emr-serverless-supports-larger-worker-sizes-to-run-more-compute-and-memory-intensive-workloads/

Amazon EMR Serverless allows you to run open-source big data frameworks such as Apache Spark and Apache Hive without managing clusters and servers. With EMR Serverless, you can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide just the right amount of capacity for your application.

We are excited to announce that EMR Serverless now offers worker configurations of 8 vCPUs with up to 60 GB memory and 16 vCPUs with up to 120 GB memory, allowing you to run more compute and memory-intensive workloads on EMR Serverless. An EMR Serverless application internally uses workers to execute workloads, and you can configure different worker configurations based on your workload requirements. Previously, the largest worker configuration available on EMR Serverless was 4 vCPUs with up to 30 GB memory. This capability is especially beneficial for the following common scenarios:

  • Shuffle-heavy workloads
  • Memory-intensive workloads

Let’s look at each of these use cases and the benefits of having larger worker sizes.

Benefits of using large workers for shuffle-intensive workloads

In Spark and Hive, shuffle occurs when data needs to be redistributed across the cluster during a computation. When your application performs wide transformations or reduce operations such as join, groupBy, sortBy, or repartition, Spark and Hive trigger a shuffle. Also, every Spark stage and Tez vertex is bounded by a shuffle operation. Taking Spark as an example, by default, there are 200 partitions for every Spark job defined by spark.sql.shuffle.partitions. However, Spark will compute the number of tasks on the fly based on the data size and the operation being performed. When a wide transformation is performed on top of a large dataset, there could be GBs or even TBs of data that need to be fetched by all the tasks.

Shuffles are typically expensive in terms of both time and resources, and can lead to performance bottlenecks. Therefore, optimizing shuffles can have a significant impact on the performance and cost of a Spark job. With large workers, more data can be allocated to each executor’s memory, which minimizes the data shuffled across executors. This in turn leads to increased shuffle read performance because more data will be fetched locally from the same worker and less data will be fetched remotely from other workers.


To demonstrate the benefits of using large workers for shuffle-intensive queries, let’s use q78 from TPC-DS, which is a shuffle-heavy Spark query that shuffles 167 GB of data over 12 Spark stages. Let’s perform two iterations of the same query with different configurations.

The configurations for Test 1 are as follows:

  • Size of executor requested while creating EMR Serverless application = 4 vCPUs, 8 GB memory, 200 GB disk
  • Spark job config:
    • spark.executor.cores = 4
    • spark.executor.memory = 8
    • spark.executor.instances = 48
    • Parallelism = 192 (spark.executor.instances * spark.executor.cores)

The configurations for Test 2 are as follows:

  • Size of executor requested while creating EMR Serverless application = 8 vCPUs, 16 GB memory, 200 GB disk
  • Spark job config:
    • spark.executor.cores = 8
    • spark.executor.memory = 16
    • spark.executor.instances = 24
    • Parallelism = 192 (spark.executor.instances * spark.executor.cores)

Let’s also disable dynamic allocation by setting spark.dynamicAllocation.enabled to false for both tests to avoid any potential noise due to variable executor launch times and keep the resource utilization consistent for both tests. We use Spark Measure, which is an open-source tool that simplifies the collection and analysis of Spark performance metrics. Because we’re using a fixed number of executors, the total number of vCPUs and memory requested are the same for both the tests. The following table summarizes the observations from the metrics collected with Spark Measure.
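
The two configurations can be expressed as Spark submit parameters to confirm that they request the same parallelism. The following is a minimal sketch: the helper names are illustrative, and the `g` unit suffix on spark.executor.memory is an assumption, because the lists above give the value without a unit.

```python
def spark_submit_parameters(cores, memory_gb, instances):
    """Render the Spark properties for one test as a string of --conf flags."""
    conf = {
        "spark.executor.cores": cores,
        "spark.executor.memory": f"{memory_gb}g",  # unit suffix is an assumption
        "spark.executor.instances": instances,
        # Fixed executor count: dynamic allocation is disabled for both tests.
        "spark.dynamicAllocation.enabled": "false",
    }
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def parallelism(cores, instances):
    """Number of tasks that can run concurrently."""
    return cores * instances


test1_params = spark_submit_parameters(cores=4, memory_gb=8, instances=48)
test2_params = spark_submit_parameters(cores=8, memory_gb=16, instances=24)

print(test1_params)
print(test2_params)
print(parallelism(4, 48), parallelism(8, 24))
```

A string in this form is what you would typically supply as the Spark submit parameters of an EMR Serverless job run; only the executor shape differs between the two tests.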

Test | Total Time Taken for Query (milliseconds) | shuffleLocalBlocksFetched | shuffleRemoteBlocksFetched | shuffleLocalBytesRead | shuffleRemoteBytesRead | shuffleFetchWaitTime | shuffleWriteTime
Test 1 | 153244 | 114175 | 5291825 | 3.5 GB | 163.1 GB | 1.9 hr | 4.7 min
Test 2 | 108136 | 225448 | 5185552 | 6.9 GB | 159.7 GB | 3.2 min | 5.2 min

As seen from the table, there is a significant difference in performance due to shuffle improvements. Test 2, with half the number of executors that are twice as large as Test 1, ran 29.44% faster, with 1.97 times more shuffle data fetched locally compared to Test 1 for the same query, same parallelism, and same aggregate vCPU and memory resources. Therefore, you can benefit from improved performance without compromising on cost or job parallelism with the help of large executors. We have observed similar performance benefits for other shuffle-intensive TPC-DS queries such as q23a and q23b.
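
The headline figures can be derived directly from the table. The following sketch recomputes the runtime reduction and the increase in locally fetched shuffle blocks; the dictionary keys are illustrative names, and the values are copied from the table.

```python
test1 = {"query_ms": 153244, "local_blocks": 114175, "remote_blocks": 5291825}
test2 = {"query_ms": 108136, "local_blocks": 225448, "remote_blocks": 5185552}

# Share of the Test 1 runtime that Test 2 saved.
runtime_reduction = (test1["query_ms"] - test2["query_ms"]) / test1["query_ms"] * 100

# How many times more shuffle blocks were fetched locally in Test 2.
local_blocks_ratio = test2["local_blocks"] / test1["local_blocks"]


def local_share(test):
    """Fraction of all shuffle blocks that were fetched from the same worker."""
    return test["local_blocks"] / (test["local_blocks"] + test["remote_blocks"])


print(f"Runtime reduction: {runtime_reduction:.2f}%")
print(f"Local blocks ratio: {local_blocks_ratio:.2f}x")
print(f"Local share: {local_share(test1):.1%} -> {local_share(test2):.1%}")
```

Most blocks are still fetched remotely in both tests, so the gain comes from roughly doubling a small local share rather than from eliminating remote fetches.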


To determine if the large workers will benefit your shuffle-intensive Spark applications, consider the following:

  • Check the Stages tab from the Spark History Server UI of your EMR Serverless application. For example, from the following screenshot of Spark History Server, we can determine that this Spark job wrote and read 167 GB of shuffle data aggregated across 12 stages, looking at the Shuffle Read and Shuffle Write columns. If your jobs shuffle over 50 GB of data, you may potentially benefit from using larger workers with 8 or 16 vCPUs or spark.executor.cores.

  • Check the SQL / DataFrame tab from the Spark History Server UI of your EMR Serverless application (only for DataFrame and Dataset APIs). When you choose the Spark action performed, such as collect, take, showString, or save, you will see an aggregated DAG for all stages separated by the exchanges. Every exchange in the DAG corresponds to a shuffle operation, and it will contain the local and remote bytes and blocks shuffled, as seen in the following screenshot. If the local shuffle blocks or bytes fetched are much lower than the remote blocks or bytes fetched, you can rerun your application with larger workers (with 8 or 16 vCPUs or spark.executor.cores) and review these exchange metrics in a DAG to see if there is any improvement.

  • Use the Spark Measure tool with your Spark query to obtain the shuffle metrics in the Spark driver’s stdout logs, as shown in the following log for a Spark job. Review the time taken for shuffle reads (shuffleFetchWaitTime) and shuffle writes (shuffleWriteTime), and the ratio of the local bytes fetched to the remote bytes fetched. If the shuffle operation takes more than 2 minutes, rerun your application with larger workers (with 8 or 16 vCPUs or spark.executor.cores) with Spark Measure to track the improvement in shuffle performance and the overall job runtime.
Time taken: 177647 ms

Scheduling mode = FIFO
Spark Context default degree of parallelism = 192

Aggregated Spark stage metrics:
numStages => 22
numTasks => 10156
elapsedTime => 159894 (2.7 min)
stageDuration => 456893 (7.6 min)
executorRunTime => 28418517 (7.9 h)
executorCpuTime => 20276736 (5.6 h)
executorDeserializeTime => 326486 (5.4 min)
executorDeserializeCpuTime => 124323 (2.1 min)
resultSerializationTime => 534 (0.5 s)
jvmGCTime => 648809 (11 min)
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
resultSize => 23199434 (22.1 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1794288453176
recordsRead => 18696929278
bytesRead => 77354154397 (72.0 GB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 14124240761
shuffleTotalBlocksFetched => 5571316
shuffleLocalBlocksFetched => 117321
shuffleRemoteBlocksFetched => 5453995
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 156913371886 (146.1 GB)
shuffleRecordsWritten => 13867102620
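
A log in this form is easy to parse if you want to apply the checks above automatically. The following is a minimal sketch that assumes the `name => value` line format shown in the log, with times in milliseconds; the 2-minute threshold comes from the recommendation above, and the function name is illustrative.

```python
import re

# A few lines copied from the log above.
SAMPLE_LOG = """\
shuffleFetchWaitTime => 340880 (5.7 min)
shuffleWriteTime => 245918 (4.1 min)
shuffleTotalBytesRead => 158582120627 (147.7 GB)
shuffleLocalBytesRead => 3337930126 (3.1 GB)
shuffleRemoteBytesRead => 155244190501 (144.6 GB)
"""

# Metric name, the "=>" separator, then the raw integer value.
METRIC_LINE = re.compile(r"^(\w+) => (\d+)")


def parse_stage_metrics(text):
    """Return a dict of metric name -> raw integer value."""
    metrics = {}
    for line in text.splitlines():
        match = METRIC_LINE.match(line.strip())
        if match:
            metrics[match.group(1)] = int(match.group(2))
    return metrics


metrics = parse_stage_metrics(SAMPLE_LOG)

# Share of shuffle bytes that were read from the same worker.
local_share = metrics["shuffleLocalBytesRead"] / metrics["shuffleTotalBytesRead"]

# Convert milliseconds to minutes and apply the 2-minute guideline.
fetch_wait_minutes = metrics["shuffleFetchWaitTime"] / 60_000
worth_testing_larger_workers = fetch_wait_minutes > 2

print(f"Local share of shuffle bytes: {local_share:.1%}")
print(f"Shuffle fetch wait: {fetch_wait_minutes:.1f} min")
```

Lines that don't match the pattern, such as the scheduling mode, are skipped, so the whole driver stdout section can be passed in unchanged.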

Benefits of using large workers for memory-intensive workloads

Certain types of workloads are memory-intensive and may benefit from more memory configured per worker. In this section, we discuss common scenarios where large workers could be beneficial for running memory-intensive workloads.

Data skew

Data skews commonly occur in several types of datasets. Some common examples are fraud detection, population analysis, and income distribution. For example, when you want to detect anomalies in your data, it’s expected that less than 1% of the data is abnormal. If you want to perform some aggregation on top of normal vs. abnormal records, 99% of the data will be processed by a single worker, which may lead to that worker running out of memory. Data skews may be observed for memory-intensive transformations like groupBy, orderBy, join, window functions, collect_list, collect_set, and so on. Join types such as BroadcastNestedLoopJoin and Cartesian product are also inherently memory-intensive and susceptible to data skews. Similarly, if your input data is Gzip compressed, a single Gzip file can’t be read by more than one task because the Gzip compression type is unsplittable. When there are a few very large Gzip files in the input, your job may run out of memory because a single task may have to read a huge Gzip file that doesn’t fit in the executor memory.

Failures due to data skew can be mitigated by applying strategies such as salting. However, this often requires extensive changes to the code, which may not be feasible for a production workload that failed due to an unprecedented data skew caused by a sudden surge in incoming data volume. For a simpler workaround, you may just want to increase the worker memory. Using larger workers with more spark.executor.memory allows you to handle data skew without making any changes to your application code.
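
For readers unfamiliar with salting, the idea is to append a small artificial key (the salt) so that one hot key is split into several smaller groups, aggregate each group, and then merge the partial results. The following is a plain-Python illustration of that idea, not Spark code; the record counts and the number of salts are hypothetical.

```python
from collections import Counter

NUM_SALTS = 8  # hypothetical number of salt values

# Skewed input: 99% of the records share the key "normal".
records = ["normal"] * 990 + ["abnormal"] * 10

# Direct aggregation: a single group holds 990 records.
direct_counts = Counter(records)

# Stage 1: aggregate by (key, salt); the hot key is split into NUM_SALTS groups.
salted_counts = Counter(
    (key, index % NUM_SALTS) for index, key in enumerate(records))

# Stage 2: drop the salt and merge the partial results per original key.
final_counts = Counter()
for (key, _salt), count in salted_counts.items():
    final_counts[key] += count

largest_group_before = max(direct_counts.values())
largest_group_after = max(salted_counts.values())

print(largest_group_before, largest_group_after, dict(final_counts))
```

The final counts are identical to the direct aggregation, but no single group in the first stage holds more than about one-eighth of the hot key's records. The cost is the extra aggregation stage and the code change, which is why increasing worker memory can be the simpler workaround.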


Caching

To improve performance, Spark allows you to cache the data frames, datasets, and RDDs in memory. This enables you to reuse a data frame multiple times in your application without having to recompute it. By default, up to 50% of your executor’s JVM is used to cache the data frames based on the property spark.memory.storageFraction. For example, if your spark.executor.memory is set to 30 GB, then 15 GB is used for cache storage that is immune to eviction.

The default storage level of the cache operation for data frames and datasets is MEMORY_AND_DISK. If the size of the data frame you are trying to cache doesn’t fit in the executor’s memory, a portion of the cache spills to disk. If there isn’t enough space to write the cached data to disk, the blocks are evicted and you don’t get the benefits of caching. Using larger workers allows you to cache more data in memory, boosting job performance by retrieving cached blocks from memory rather than the underlying storage.


For example, the following PySpark job leads to a skew, with one executor processing 99.95% of the data with memory-intensive aggregates like collect_list. The job also caches a very large data frame (2.2 TB). Let’s run two iterations of the same job on EMR Serverless with the following vCPU and memory configurations.

Let’s run Test 3 with the previously largest possible worker configurations:

  • Size of executor set while creating EMR Serverless application = 4 vCPUs, 30 GB memory, 200 GB disk
  • Spark job config:
    • spark.executor.cores = 4
    • spark.executor.memory = 27 G

Let’s run Test 4 with the newly released large worker configurations:

  • Size of executor set while creating EMR Serverless application = 8 vCPUs, 60 GB memory, 200 GB disk
  • Spark job config:
    • spark.executor.cores = 8
    • spark.executor.memory = 54 G

Test 3 failed with FetchFailedException because the executor memory was not sufficient for the job.

Also, from the Spark UI of Test 3, we see that the reserved storage memory of the executors was fully utilized for caching the data frames.

The remaining blocks to cache were spilled to disk, as seen in the executor’s stderr logs:

23/02/06 16:06:58 INFO MemoryStore: Will not store rdd_4_1810
23/02/06 16:06:58 WARN MemoryStore: Not enough space to cache rdd_4_1810 in memory! (computed 134.1 MiB so far)
23/02/06 16:06:58 INFO MemoryStore: Memory use = 14.8 GiB (blocks) + 507.5 MiB (scratch space shared across 4 tasks(s)) = 15.3 GiB. Storage limit = 15.3 GiB.
23/02/06 16:06:58 WARN BlockManager: Persisting block rdd_4_1810 to disk instead.

Around 33% of the persisted data frame was cached on disk, as seen on the Storage tab of the Spark UI.

Test 4 with larger executors and vCores ran successfully without throwing any memory-related errors. Also, only about 2.2% of the data frame was cached to disk. Therefore, cached blocks of a data frame will be retrieved from memory rather than from disk, offering better performance.


To determine if the large workers will benefit your memory-intensive Spark applications, consider the following:

  • Determine if your Spark application has any data skews by looking at the Spark UI. The following screenshot of the Spark UI shows an example data skew scenario where one task processes most of the data (145.2 GB), looking at the Shuffle Read size. If one or a few tasks process significantly more data than other tasks, rerun your application with larger workers with 60–120 GB of memory (spark.executor.memory set anywhere from 54–109 GB factoring in 10% of spark.executor.memoryOverhead).

  • Check the Storage tab of the Spark History Server to review the ratio of data cached in memory to disk from the Size in memory and Size in disk columns. If more than 10% of your data is cached to disk, rerun your application with larger workers to increase the amount of data cached in memory.
  • Another way to preemptively determine if your job needs more memory is by monitoring Peak JVM Memory on the Spark UI Executors tab. If the peak JVM memory used is close to the executor or driver memory, you can create an application with a larger worker and configure a higher value for spark.executor.memory or spark.driver.memory. For example, in the following screenshot, the maximum value of peak JVM memory usage is 26 GB and spark.executor.memory is set to 27 G. In this case, it may be beneficial to use larger workers with 60 GB memory and spark.executor.memory set to 54 G.
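
The spark.executor.memory values used in this post follow from leaving room for the memory overhead inside the worker. The following is a small sketch of that arithmetic, assuming the 10% overhead mentioned above and rounding down to whole GB; the function name is illustrative.

```python
import math

OVERHEAD_FRACTION = 0.10  # 10% of spark.executor.memory, as noted above


def max_executor_memory_gb(worker_memory_gb, overhead_fraction=OVERHEAD_FRACTION):
    """Largest whole-GB spark.executor.memory that fits in the worker with overhead."""
    return math.floor(worker_memory_gb / (1 + overhead_fraction))


for worker_memory_gb in (30, 60, 120):
    print(worker_memory_gb, "GB worker ->",
          max_executor_memory_gb(worker_memory_gb), "GB spark.executor.memory")
```

This reproduces the 27 GB setting used with 30 GB workers in Test 3, the 54 GB setting used with 60 GB workers in Test 4, and the 109 GB upper bound for 120 GB workers.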


Although large vCPUs help increase the locality of the shuffle blocks, there are other factors involved such as disk throughput, disk IOPS (input/output operations per second), and network bandwidth. In some cases, more small workers with more disks could offer higher disk IOPS, throughput, and network bandwidth overall compared to fewer large workers. We encourage you to benchmark your workloads against suitable vCPU configurations to choose the best configuration for your workload.

For shuffle-heavy jobs, it’s recommended to use large disks. You can attach up to 200 GB disk to each worker when you create your application. Using large vCPUs (spark.executor.cores) per executor may increase the disk utilization on each worker. If your application fails with “No space left on device” due to the inability to fit shuffle data in the disk, use a larger number of smaller workers, each with a 200 GB disk.


Conclusion

In this post, you learned about the benefits of using large executors for your EMR Serverless jobs. For more information about different worker configurations, refer to Worker configurations. Large worker configurations are available in all Regions where EMR Serverless is available.

About the Author

Veena Vasudevan is a Senior Partner Solutions Architect and an Amazon EMR specialist at AWS focusing on big data and analytics. She helps customers and partners build highly optimized, scalable, and secure solutions; modernize their architectures; and migrate their big data workloads to AWS.

Monitor Apache HBase on Amazon EMR using Amazon Managed Service for Prometheus and Amazon Managed Grafana

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/monitor-apache-hbase-on-amazon-emr-using-amazon-managed-service-for-prometheus-and-amazon-managed-grafana/

Amazon EMR provides a managed Apache Hadoop framework that makes it straightforward, fast, and cost-effective to run Apache HBase. Apache HBase is a massively scalable, distributed big data store in the Apache Hadoop ecosystem. It is an open-source, non-relational, versioned database that runs on top of the Apache Hadoop Distributed File System (HDFS). It’s built for random, strictly consistent, real-time access for tables with billions of rows and millions of columns. Monitoring HBase clusters is critical in order to identify stability and performance bottlenecks and proactively preempt them. In this post, we discuss how you can use Amazon Managed Service for Prometheus and Amazon Managed Grafana to monitor, alert, and visualize HBase metrics.

HBase has built-in support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia or via JMX. You can either use AWS Distro for OpenTelemetry or Prometheus JMX exporters to collect metrics exposed by HBase. In this post, we show how to use Prometheus exporters. These exporters behave like small web servers that convert internal application metrics to Prometheus format and serve them at the /metrics path. A Prometheus server running on an Amazon Elastic Compute Cloud (Amazon EC2) instance collects these metrics and remote writes them to an Amazon Managed Service for Prometheus workspace. We then use Amazon Managed Grafana to create dashboards and view these metrics using an Amazon Managed Service for Prometheus workspace as its data source.
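
To make the exporter idea concrete, the following is a toy sketch of a web server that renders metrics in the Prometheus text format and serves them at /metrics. It is an illustration only, not the JMX exporter used in this post (which runs as a Java agent); the metric names, values, and port are hypothetical.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


def render_metrics(metrics):
    """Render a dict of gauge values in the Prometheus text exposition format."""
    lines = []
    for name, value in sorted(metrics.items()):
        lines.append(f"# TYPE {name} gauge")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"


class MetricsHandler(BaseHTTPRequestHandler):
    # Hypothetical metric names and values, for illustration only.
    metrics = {"example_region_count": 42, "example_store_file_count": 7}

    def do_GET(self):
        # Only the /metrics path is served; everything else is a 404.
        if self.path != "/metrics":
            self.send_error(404)
            return
        body = render_metrics(self.metrics).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=8000):
    """Start the toy exporter; the port is an arbitrary choice."""
    HTTPServer(("0.0.0.0", port), MetricsHandler).serve_forever()


print(render_metrics(MetricsHandler.metrics), end="")
```

Calling `serve()` starts the server, after which a Prometheus server could scrape `http://<host>:8000/metrics` on its configured interval.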

This solution can be extended to other big data platforms such as Apache Spark and Apache Presto that also use JMX to expose their metrics.

Solution overview

The following diagram illustrates our solution architecture.

Solution Architecture

This post uses an AWS CloudFormation template to perform the following actions:

  1. Install an open-source Prometheus server on an EC2 instance.
  2. Create appropriate AWS Identity and Access Management (IAM) roles and security group for the EC2 instance running the Prometheus server.
  3. Create an EMR cluster with an HBase on Amazon S3 configuration.
  4. Install JMX exporters on all EMR nodes.
  5. Create additional security groups for the EMR master and worker nodes to connect with the Prometheus server running on the EC2 instance.
  6. Create a workspace in Amazon Managed Service for Prometheus.


To implement this solution, make sure you have the following prerequisites:

aws emr create-default-roles

Deploy the CloudFormation template

Deploy the CloudFormation template in the us-east-1 Region:

Launch Stack

It will take 15–20 minutes for the template to complete. The template requires the following fields:

  • Stack Name – Enter a name for the stack
  • VPC – Choose an existing VPC
  • Subnet – Choose an existing subnet
  • EMRClusterName – Use EMRHBase
  • HBaseRootDir – Provide a new HBase root directory (for example, s3://hbase-root-dir/)
  • MasterInstanceType – Use m5.xlarge
  • CoreInstanceType – Use m5.xlarge
  • CoreInstanceCount – Enter 2
  • SSHIPRange – Use <your ip address>/32 (you can go to https://checkip.amazonaws.com/ to check your IP address)
  • EMRKeyName – Choose a key pair for the EMR cluster
  • EMRRleaseLabel – Use emr-6.9.0
  • InstanceType – Use the EC2 instance type for installing the Prometheus server


Enable remote writes on the Prometheus server

The Prometheus server is running on an EC2 instance. You can find the instance hostname in the CloudFormation stack’s Outputs tab for key PrometheusServerPublicDNSName.

  1. SSH into the EC2 instance using the key pair:
    ssh -i <sshKey.pem> ec2-user@<Public IPv4 DNS of EC2 instance running Prometheus server>

  2. Copy the value for Endpoint – remote write URL from the Amazon Managed Service for Prometheus workspace console.

  3. Edit remote_write url in /etc/prometheus/conf/prometheus.yml:
sudo vi /etc/prometheus/conf/prometheus.yml

It should look like the following code:

  4. Now we need to restart the Prometheus server to pick up the changes:
sudo systemctl restart prometheus
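
As a rough illustration of the remote_write edit described above, the following Python sketch renders a minimal remote_write fragment for prometheus.yml. It assumes the Prometheus server signs requests with SigV4 through the standard sigv4 setting of remote_write. The function name and the example URL are hypothetical, and the file created by the CloudFormation template may contain additional settings that are not shown here.

```python
def render_remote_write(remote_write_url: str, region: str) -> str:
    """Return a minimal remote_write YAML fragment that signs requests with SigV4.

    remote_write_url is the "Endpoint - remote write URL" value copied from the
    Amazon Managed Service for Prometheus workspace console.
    """
    return (
        "remote_write:\n"
        f"  - url: {remote_write_url}\n"
        "    sigv4:\n"
        f"      region: {region}\n"
    )


# Hypothetical placeholder URL; replace it with the value from your own workspace.
example = render_remote_write(
    "https://example.invalid/workspaces/ws-EXAMPLE/api/v1/remote_write",
    "us-east-1",
)
print(example)
```

Compare the printed fragment with the remote_write section of /etc/prometheus/conf/prometheus.yml before restarting the server.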

Enable Amazon Managed Grafana to read from an Amazon Managed Service for Prometheus workspace

We need to add the Amazon Managed Service for Prometheus workspace as a data source in Amazon Managed Grafana. You can skip directly to step 3 if you already have an existing Amazon Managed Grafana workspace and want to use it for HBase metrics.

  1. First, let’s create a workspace on Amazon Managed Grafana. You can follow the appendix to create a workspace using the Amazon Managed Grafana console or run the following AWS CLI command from your terminal (provide your role ARN):
aws grafana create-workspace \
--account-access-type CURRENT_ACCOUNT \
--authentication-providers AWS_SSO \
--permission-type CUSTOMER_MANAGED \
--workspace-data-sources PROMETHEUS \
--workspace-name emr-metrics \
--workspace-role-arn <role-ARN> \
--workspace-notification-destinations SNS
  2. On the Amazon Managed Grafana console, choose Configure users and select a user you want to allow to log in to Grafana dashboards.

Make sure your IAM Identity Center user type is admin. We need this to create dashboards. You can assign the viewer role to all the other users.

  3. Log in to the Amazon Managed Grafana workspace URL using your admin credentials.
  4. Choose AWS Data Sources in the navigation pane.

  5. For Service, choose Amazon Managed Service for Prometheus.

  6. For Regions, choose US East (N. Virginia).

Create an HBase dashboard

Grafana Labs has an open-source dashboard that you can use. For example, you can follow the guidance from the following HBase dashboard. Start creating your dashboard and choose the import option. Provide the URL of the dashboard or enter 12722 and choose Load. Make sure your Prometheus workspace is selected on the next page. You should see HBase metrics showing up on the dashboard.

Key HBase metrics to monitor

HBase has a wide range of metrics for HMaster and RegionServer. The following are a few important metrics to keep in mind.

HMaster

  • hadoop_HBase_numregionservers – Number of live region servers
  • hadoop_HBase_numdeadregionservers – Number of dead region servers
  • hadoop_HBase_ritcount – Number of regions in transition
  • hadoop_HBase_ritcountoverthreshold – Number of regions that have been in transition longer than a threshold time (default: 60 seconds)
  • hadoop_HBase_ritduration_99th_percentile – Maximum time taken by 99% of the regions to remain in transition state

RegionServer

  • hadoop_HBase_regioncount – Number of regions hosted by the region server
  • hadoop_HBase_storefilecount – Number of store files currently managed by the region server
  • hadoop_HBase_storefilesize – Aggregate size of the store files
  • hadoop_HBase_hlogfilecount – Number of write-ahead logs not yet archived
  • hadoop_HBase_hlogfilesize – Size of all write-ahead log files
  • hadoop_HBase_totalrequestcount – Total number of requests received
  • hadoop_HBase_readrequestcount – Number of read requests received
  • hadoop_HBase_writerequestcount – Number of write requests received
  • hadoop_HBase_numopenconnections – Number of open connections at the RPC layer
  • hadoop_HBase_numactivehandler – Number of RPC handlers actively servicing requests

Memstore

  • hadoop_HBase_memstoresize – Total memstore memory size of the region server
  • hadoop_HBase_flushqueuelength – Current depth of the memstore flush queue (if increasing, we are falling behind with clearing memstores out to Amazon S3)
  • hadoop_HBase_flushtime_99th_percentile – 99th percentile latency for flush operation
  • hadoop_HBase_updatesblockedtime – Number of milliseconds updates have been blocked so the memstore can be flushed

Block Cache

  • hadoop_HBase_blockcachesize – Block cache size
  • hadoop_HBase_blockcachefreesize – Block cache free size
  • hadoop_HBase_blockcachehitcount – Number of block cache hits
  • hadoop_HBase_blockcachemisscount – Number of block cache misses
  • hadoop_HBase_blockcacheexpresshitpercent – Percentage of the time that requests with the cache turned on hit the cache
  • hadoop_HBase_blockcachecounthitpercent – Percentage of block cache hits
  • hadoop_HBase_blockcacheevictioncount – Number of block cache evictions in the region server
  • hadoop_HBase_l2cachehitratio – Local disk-based bucket cache hit ratio
  • hadoop_HBase_l2cachemissratio – Bucket cache miss ratio

Compaction

  • hadoop_HBase_majorcompactiontime_99th_percentile – Time in milliseconds taken for major compaction
  • hadoop_HBase_compactiontime_99th_percentile – Time in milliseconds taken for minor compaction
  • hadoop_HBase_compactionqueuelength – Current depth of the compaction request queue (if increasing, we are falling behind with storefile compaction)
  • flush queue length – Number of flush operations waiting to be processed in the region server (a higher number indicates flush operations are slow)

IPC Queues

  • hadoop_HBase_queuesize – Total data size of all RPC calls in the RPC queues in the region server
  • hadoop_HBase_numcallsingeneralqueue – Number of RPC calls in the general processing queue in the region server
  • hadoop_HBase_processcalltime_99th_percentile – 99th percentile latency for RPC calls to be processed in the region server
  • hadoop_HBase_queuecalltime_99th_percentile – 99th percentile latency for RPC calls to stay in the RPC queue in the region server

JVM and GC

  • hadoop_HBase_memheapusedm – Heap used
  • hadoop_HBase_memheapmaxm – Total heap
  • hadoop_HBase_pausetimewithgc_99th_percentile – Pause time in milliseconds
  • hadoop_HBase_gccount – Garbage collection count
  • hadoop_HBase_gctimemillis – Time spent in garbage collection, in milliseconds

Latencies

  • HBase.regionserver.<op>_<measure> – Operation latencies, where <op> is Append, Delete, Mutate, Get, Replay, or Increment, and <measure> is min, max, mean, median, 75th_percentile, 95th_percentile, or 99th_percentile
  • HBase.regionserver.slow<op>Count – Number of operations we thought were slow, where <op> is one of the preceding list

Bulk Load

  • Bulkload_99th_percentile – Exported as hadoop_HBase_bulkload_99th_percentile

I/O

  • FsWriteTime_99th_percentile – Exported as hadoop_HBase_fswritetime_99th_percentile
  • FsReadTime_99th_percentile – Exported as hadoop_HBase_fsreadtime_99th_percentile

Exceptions

  • exceptions.RegionTooBusyException
  • exceptions.callQueueTooBig
  • exceptions.NotServingRegionException
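
Some of these metrics are easier to interpret when combined or tracked over time. The following Python sketch shows two such checks: a block cache hit ratio derived from the hit and miss counts, and a simple test for whether a queue length (for example, the flush or compaction queue) keeps growing across samples. The function names are hypothetical, and the sketch assumes you have already read the raw values from your Prometheus workspace. If the counts are cumulative counters, pass the differences between two scrapes to get the ratio for that window.

```python
from typing import Optional, Sequence


def block_cache_hit_ratio(hit_count: float, miss_count: float) -> Optional[float]:
    """Return hits / (hits + misses), or None when there were no lookups."""
    total = hit_count + miss_count
    return hit_count / total if total else None


def is_steadily_increasing(samples: Sequence[float]) -> bool:
    """Return True if every sample is larger than the one before it.

    A steadily growing flush or compaction queue length suggests that the
    region server is falling behind.
    """
    return len(samples) >= 2 and all(b > a for a, b in zip(samples, samples[1:]))
```

For example, block_cache_hit_ratio(90, 10) gives 0.9, and is_steadily_increasing([1, 2, 5]) is True.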

Considerations and limitations

Note the following when using this solution:

  • You can set up alerts on Amazon Managed Service for Prometheus and visualize them in Amazon Managed Grafana.
  • This architecture can be easily extended to include other open-source frameworks such as Apache Spark, Apache Presto, and Apache Hive.
  • Refer to the pricing details for Amazon Managed Service for Prometheus and Amazon Managed Grafana.
  • These scripts are for guidance purposes only and aren’t ready for production deployments. Make sure to perform thorough testing.

Clean up

To avoid ongoing charges, delete the CloudFormation stack and workspaces created in Amazon Managed Grafana and Amazon Managed Service for Prometheus.


In this post, you learned how to monitor EMR HBase clusters and set up dashboards to visualize key metrics. This solution can serve as a unified monitoring platform for multiple EMR clusters and other applications. For more information on EMR HBase, see the Release Guide and the HBase Migration whitepaper.


Complete the following steps to create a workspace on Amazon Managed Grafana:

  1. Log in to the Amazon Managed Grafana console and choose Create workspace.

  2. For Authentication access, select AWS IAM Identity Center.

If you don’t have IAM Identity Center enabled, refer to Enable IAM Identity Center.

  3. Optionally, to view Prometheus alerts in your Grafana workspace, select Turn Grafana alerting on.

  4. On the next page, select Amazon Managed Service for Prometheus as the data source.

  5. After the workspace is created, assign users to access Amazon Managed Grafana.

  6. For a first-time setup, assign admin privileges to the user.

You can add other users with only viewer access.

Make sure you are able to log in to the Grafana workspace URL using your IAM Identity Center user credentials.

About the Author

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Deep dive into the AWS ProServe Hadoop Migration Delivery Kit TCO tool

Post Syndicated from Jiseong Kim original https://aws.amazon.com/blogs/big-data/deep-dive-into-the-aws-proserve-hadoop-migration-delivery-kit-tco-tool/

In the post Introducing the AWS ProServe Hadoop Migration Delivery Kit TCO tool, we introduced the AWS ProServe Hadoop Migration Delivery Kit (HMDK) TCO tool and the benefits of migrating on-premises Hadoop workloads to Amazon EMR. In this post, we dive deep into the tool, walking through every step, from log ingestion, transformation, and visualization to architecture design and TCO calculation.

Solution overview

Let’s briefly revisit the HMDK TCO tool’s key features. The tool provides a YARN log collector that connects to the Hadoop Resource Manager to collect YARN logs. A Python-based Hadoop workload analyzer, called the YARN log analyzer, scrutinizes Hadoop applications. Amazon QuickSight dashboards showcase the results from the analyzer. The same results also accelerate the design of future EMR instances. Additionally, a TCO calculator generates the TCO estimation of an optimized EMR cluster to facilitate the migration.

Now let’s look at how the tool works. The following diagram illustrates the end-to-end workflow.

In the next sections, we walk through the five main steps of the tool:

  1. Collect YARN job history logs.
  2. Transform the job history logs from JSON to CSV.
  3. Analyze the job history logs.
  4. Design an EMR cluster for migration.
  5. Calculate the TCO.


Before getting started, make sure to complete the following prerequisites:

  1. Clone the hadoop-migration-assessment-tco repository.
  2. Install Python 3 on your local machine.
  3. Have an AWS account with permission on AWS Lambda, QuickSight (Enterprise edition), and AWS CloudFormation.

Collect YARN job history logs

First, you run a YARN log collector, start-collector.sh, on your local machine. This step collects Hadoop YARN logs and places the logs on your local machine. The script connects your local machine with the Hadoop primary node and communicates with Resource Manager. Then it retrieves the job history information (YARN logs from application managers) by calling the YARN ResourceManager application API.
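
To make the collection step more concrete, here is a minimal Python sketch of a request to the YARN ResourceManager Cluster Applications REST API (/ws/v1/cluster/apps). It is not the implementation of start-collector.sh. The function names, the chosen application states, and the timeout are assumptions for illustration, and the sketch ignores authentication (for example, Kerberos), which your cluster may require.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_apps_url(rm_base_url, states=("FINISHED", "FAILED", "KILLED"), limit=None):
    """Build the Cluster Applications API URL for completed applications."""
    query = {"states": ",".join(states)}
    if limit is not None:
        query["limit"] = str(limit)
    return f"{rm_base_url.rstrip('/')}/ws/v1/cluster/apps?{urlencode(query)}"


def parse_apps(payload):
    """Extract the list of application records from an API response body."""
    body = json.loads(payload)
    # The API returns {"apps": null} when no application matches.
    apps = body.get("apps") or {}
    return apps.get("app", [])


def fetch_apps(rm_base_url):
    """Call the ResourceManager and return the application records."""
    with urlopen(build_apps_url(rm_base_url), timeout=30) as response:
        return parse_apps(response.read().decode("utf-8"))
```

Calling fetch_apps with the base URL of your ResourceManager (for example, the HTTPS endpoint on port 8090) returns one dictionary per application, with fields such as id, user, queue, applicationType, startedTime, and elapsedTime.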

Prior to running the YARN log collector, you need to configure and establish the connection (HTTP: 8088 or HTTPS: 8090; the latter is recommended), verify that YARN ResourceManager is accessible, and confirm that YARN Timeline Server is enabled (Timeline Server v1 or later is supported). You may need to define the YARN logs’ collection interval and retention policy. To ensure that you collect consecutive YARN logs, you can use a cron job to schedule the log collector at a proper time interval. For example, for a Hadoop cluster with 2,000 daily applications and the setting yarn.resourcemanager.max-completed-applications set to 1,000, theoretically, you have to run the log collector at least twice a day to get all the YARN logs. In addition, we recommend collecting at least 7 days of YARN logs for analyzing holistic workloads.
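
The arithmetic behind this example can be sketched as follows. The function names are hypothetical, and the calculation assumes that applications arrive evenly over the day. With bursty workloads, you need to run the collector more often around peak hours.

```python
import math


def min_collector_runs_per_day(daily_apps, max_completed_apps):
    """Minimum number of daily runs so that no completed application is evicted
    from the ResourceManager before it is collected (assumes even arrival)."""
    if max_completed_apps <= 0:
        raise ValueError("max_completed_apps must be positive")
    return max(1, math.ceil(daily_apps / max_completed_apps))


def max_interval_hours(daily_apps, max_completed_apps):
    """Longest interval between runs, in hours, under the same assumption."""
    return 24 / min_collector_runs_per_day(daily_apps, max_completed_apps)
```

For the cluster in the example, min_collector_runs_per_day(2000, 1000) returns 2, which corresponds to a cron schedule of at most 12 hours between runs.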

For more details on how to configure and schedule the log collector, refer to the yarn-log-collector GitHub repo.

Transform the YARN job history logs from JSON to CSV

After obtaining YARN logs, you run a YARN log organizer, yarn-log-organizer.py, which is a parser to transform JSON-based logs to CSV files. These output CSV files are the inputs for the YARN log analyzer. The parser also has other capabilities, including sorting events by time, removing duplicates, and merging multiple logs.
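
The following Python sketch illustrates the kind of transformation described here: it merges several JSON logs, removes duplicate applications, sorts them by start time, and writes CSV. It is not the code of yarn-log-organizer.py. The function name, the selected columns, and the choice to deduplicate on the application id are assumptions for illustration.

```python
import csv
import io
import json

# Assumed subset of the fields returned by the ResourceManager applications API.
FIELDS = ["id", "user", "queue", "applicationType",
          "startedTime", "finishedTime", "elapsedTime"]


def organize_logs(json_payloads):
    """Merge JSON logs, drop duplicate applications, sort by start time, return CSV text."""
    unique = {}
    for payload in json_payloads:
        apps = (json.loads(payload).get("apps") or {}).get("app", [])
        for app in apps:
            # A later log overwrites an earlier record with the same application id.
            unique[app["id"]] = app
    rows = sorted(unique.values(), key=lambda app: app.get("startedTime", 0))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=FIELDS, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()
```

The returned text starts with a header row, so it matches a QuickSight manifest that sets containsHeader to true.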

For more information on how to use the YARN log organizer, refer to the yarn-log-organizer GitHub repo.

Analyze the YARN job history logs

Next, you launch the YARN log analyzer to analyze the YARN logs in CSV format.

With QuickSight, you can visualize YARN log data and conduct analysis against the datasets generated by pre-built dashboard templates and a widget. The widget automatically creates QuickSight dashboards in the target AWS account, which is configured in a CloudFormation template.

The following diagram illustrates the HMDK TCO architecture.

The YARN log analyzer provides four key functionalities:

  1. Upload transformed YARN job history logs in CSV format (for example, cluster_yarn_logs_*.csv) to Amazon Simple Storage Service (Amazon S3) buckets. These CSV files are the outputs from the YARN log organizer.
  2. Create a manifest JSON file (for example, yarn-log-manifest.json) for QuickSight and upload it to the S3 bucket:
    {
        "fileLocations": [ {
            "URIPrefixes": [
                "<S3 URI prefix of the uploaded CSV logs>"
            ]
        } ],
        "globalUploadSettings": {
            "format": "CSV",
            "delimiter": ",",
            "textqualifier": "'",
            "containsHeader": "true"
        }
    }

  3. Deploy QuickSight dashboards using a CloudFormation template, which is in YAML format. After deploying, choose the refresh icon until you see the stack’s status as CREATE_COMPLETE. This step creates datasets on QuickSight dashboards in your AWS target account.
  4. On the QuickSight dashboard, you can find insights into the analyzed Hadoop workloads from various charts. These insights help you design future EMR instances for migration acceleration, as demonstrated in the next step.

Design an EMR cluster for migration

The results of the YARN log analyzer help you understand the actual Hadoop workloads on the existing system. This step accelerates designing future EMR instances for migration by using an Excel template. The template contains a checklist for conducting workload analysis and capacity planning:

  • Are the applications running on the cluster being used appropriately with their current capacity?
  • Is the cluster under load at a certain time or not? If so, when is the time?
  • What types of applications and engines (such as MR, Tez, or Spark) are running on the cluster, and what is the resource usage for each type?
  • Are different jobs’ run cycles (real-time, batch, ad hoc) running in one cluster?
  • Are any jobs running in regular batches, and if so, what are these schedule intervals? (For example, every 10 minutes, 1 hour, 1 day.) Do you have jobs that use a lot of resources during a long time period?
  • Do any jobs need performance improvement?
  • Are any specific organizations or individuals monopolizing the cluster?
  • Are any mixed development and operation jobs operating in one cluster?

After you complete the checklist, you’ll have a better understanding of how to design the future architecture. To optimize EMR cluster cost-effectiveness, the following table provides general guidelines for choosing the proper type of EMR cluster and Amazon Elastic Compute Cloud (Amazon EC2) family.

To choose the proper cluster type and instance family, you need to perform several rounds of analysis against YARN logs based on various criteria. Let’s look at some key metrics.


You can find workload patterns based on the number of Hadoop applications run in a time window. For example, the daily or hourly charts “Count of Records by Startedtime” provide the following insights:

  • In daily time series charts, you compare the number of application runs between working days and holidays, and among calendar days. If the numbers are similar, the daily utilization of the cluster is comparable. On the other hand, if the deviation is large, the proportion of ad hoc jobs is significant. You can also identify possible weekly or monthly jobs that run on particular days. In that situation, you can easily see specific days in a week or a month with high workload concentration.
  • In hourly time series charts, you further understand how applications are run in hourly windows. You can find peak and off-peak hours in a day.
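
Outside QuickSight, the same daily and hourly counts can be approximated with a few lines of Python. The sketch below assumes that startedTime is expressed in milliseconds since the epoch, as in the ResourceManager applications API, and it buckets the values in UTC. The function name is hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone


def count_by_window(started_times_ms, window="hour"):
    """Count application starts per hour or per day (UTC)."""
    fmt = "%Y-%m-%d %H:00" if window == "hour" else "%Y-%m-%d"
    counts = Counter()
    for ms in started_times_ms:
        started = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
        counts[started.strftime(fmt)] += 1
    # Sort by window label so that the result reads as a time series.
    return dict(sorted(counts.items()))
```

Hours with the highest counts are candidates for peak hours, and days whose counts deviate strongly from the others point to ad hoc, weekly, or monthly jobs.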


The YARN logs contain the user ID of each application. This information helps you understand who submits an application to a queue. Based on the statistics of individual and aggregated application runs per queue and per user, you can determine the existing workload distribution by user. Usually, users on the same team share queues. Sometimes, multiple teams share queues. When designing queues for users, you now have insights to help you distribute application workloads more evenly across queues than before.
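
As a small illustration of these per-queue and per-user statistics, the following Python sketch counts application runs by (queue, user) pair and computes the share of runs per queue. The function names are hypothetical, and the input is assumed to be a list of application records with queue and user fields, such as the rows of the CSV logs.

```python
from collections import Counter


def runs_per_queue_and_user(apps):
    """Count application runs for each (queue, user) pair."""
    return Counter((app["queue"], app["user"]) for app in apps)


def share_by_queue(apps):
    """Return the fraction of all application runs submitted to each queue."""
    per_queue = Counter(app["queue"] for app in apps)
    total = sum(per_queue.values())
    return {queue: count / total for queue, count in per_queue.items()} if total else {}
```

A queue that receives most of the runs, or a single user who dominates a shared queue, is a signal to revisit the queue design.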

Application types

You can segment workloads based on various application types (such as Hive, Spark, Presto, or HBase) and run engines (such as MR, Spark, or Tez). For compute-heavy workloads such as MapReduce or Hive-on-MR jobs, use CPU-optimized instances. For memory-intensive workloads such as Hive-on-Tez, Presto, and Spark jobs, use memory-optimized instances.


You can categorize applications by runtime. The embedded CloudFormation template automatically creates an elapsedGroup field in a QuickSight dashboard. This enables a key feature to allow you to observe long-running jobs in one of four charts on QuickSight dashboards. Therefore, you can design tailored future architectures for these large jobs.

The corresponding QuickSight dashboards include four charts. You can drill down into each chart, which is associated with one group.

Group  Runtime/Elapsed Time of a Job
1      Less than 10 minutes
2      Between 10 minutes and 30 minutes
3      Between 30 minutes and 1 hour
4      Greater than 1 hour

In the chart of Group 4, you can concentrate on scrutinizing large jobs based on various metrics, including user, queue, application type, timeline, resource usage, and so on. Based on this consideration, you may have dedicated queues on a cluster or a dedicated EMR cluster for large jobs. Meanwhile, you may submit small jobs to shared queues.
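
The grouping by elapsed time can be expressed as a small function. This is a sketch rather than the calculated field that the CloudFormation template creates. It assumes that elapsed time is given in milliseconds, and the handling of the exact boundaries (10, 30, and 60 minutes) is an assumption because the groups above don’t specify it.

```python
def elapsed_group(elapsed_ms):
    """Map an elapsed time in milliseconds to one of the four runtime groups."""
    minutes = elapsed_ms / 60_000
    if minutes < 10:
        return 1  # less than 10 minutes
    if minutes < 30:
        return 2  # between 10 and 30 minutes
    if minutes <= 60:
        return 3  # between 30 minutes and 1 hour
    return 4      # greater than 1 hour
```

Filtering the logs to records where elapsed_group returns 4 gives the list of long-running jobs to review for dedicated queues or a dedicated EMR cluster.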


Based on resource (CPU, memory) consumption patterns, you choose the right size and family of EC2 instances for performance and cost effectiveness. For compute-intensive applications, we recommend instances of CPU-optimized families. For memory-intensive applications, the memory-optimized instance families are recommended.

In addition, based on the nature of the application workloads and resource utilization over time, you may choose a persistent or transient EMR cluster, Amazon EMR on EKS, or Amazon EMR Serverless.

After analyzing YARN logs by various metrics, you’re ready to design future EMR architectures. The following table lists examples of proposed EMR clusters. You can find more details in the optimized-tco-calculator GitHub repo.

Calculate TCO

Finally, on your local machine, run tco-input-generator.py to aggregate YARN job history logs on an hourly basis prior to using an Excel template to calculate the optimized TCO. This step is crucial because the results simulate the Hadoop workloads in future EMR instances.
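
To give a sense of what hourly aggregation involves, here is a simplified Python sketch. It is not the logic of tco-input-generator.py. It assumes that each record carries the startedTime, vcoreSeconds, and memorySeconds fields of the ResourceManager applications API, and it attributes all of an application’s resource usage to the hour in which the application started, which is a deliberate simplification for jobs that span several hours.

```python
from collections import defaultdict
from datetime import datetime, timezone


def aggregate_hourly(apps):
    """Sum application count and resource usage per start hour (UTC)."""
    buckets = defaultdict(lambda: {"apps": 0, "vcoreSeconds": 0, "memorySeconds": 0})
    for app in apps:
        started = datetime.fromtimestamp(app["startedTime"] / 1000, tz=timezone.utc)
        bucket = buckets[started.strftime("%Y-%m-%d %H:00")]
        bucket["apps"] += 1
        bucket["vcoreSeconds"] += app.get("vcoreSeconds", 0)
        bucket["memorySeconds"] += app.get("memorySeconds", 0)
    return dict(sorted(buckets.items()))
```

Each entry of the result describes the load of one hour, which is the granularity at which the TCO simulation works.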

The TCO simulation requires the hourly aggregated logs that tco-input-generator.py generates. Next, open the Excel template file, enable macros, and provide your inputs in the green cells for calculating the TCO. For the input data, enter the actual data size without replication and the hardware specifications (vCore, mem) of the Hadoop primary node and data nodes. You also need to select and upload the previously generated hourly aggregated logs. After you set the TCO simulation variables, such as Region, EC2 type, Amazon EMR high availability, engine effect, Amazon EC2 and Amazon EBS discount (EDP), Amazon S3 volume discount, local currency rate, and EMR EC2 task/core pricing ratio and price/hour, the TCO simulator automatically calculates the optimum cost of future EMR instances on Amazon EC2. The following screenshots show an example of HMDK TCO results.

For additional information and instructions of HMDK TCO calculations, refer to the optimized-tco-calculator GitHub repo.

Clean up

After you complete all the steps and finish testing, complete the following steps to delete resources to avoid incurring costs:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose Delete.
  3. Choose Delete stack.
  4. Refresh the page until you see the status DELETE_COMPLETE.
  5. On the Amazon S3 console, delete the S3 bucket you created.


The AWS ProServe HMDK TCO tool significantly reduces migration planning effort, in particular the time-consuming and challenging task of assessing your Hadoop workloads. With the HMDK TCO tool, the assessment usually takes 2–3 weeks. You can also determine the calculated TCO of future EMR architectures. With the HMDK TCO tool, you are able to quickly understand your workloads and resource usage patterns. With the insights generated by the tool, you are equipped to design optimal future EMR architectures. In many use cases, a 1-year TCO of the optimized refactored architecture provides significant cost savings (64–80% reduction) on compute and storage, compared to lift-and-shift Hadoop migrations.

To learn more about accelerating your Hadoop migrations to Amazon EMR and the HMDK TCO tool, refer to the Hadoop Migration Delivery Kit TCO GitHub repo, or reach out to [email protected].

About the authors

Sungyoul Park is a Senior Practice Manager at AWS ProServe. He helps customers innovate their business with AWS Analytics, IoT, and AI/ML services. He has a specialty in big data services and technologies and an interest in building customer business outcomes together.

Jiseong Kim is a Senior Data Architect at AWS ProServe. He mainly works with enterprise customers to help data lake migration and modernization, and provides guidance and technical assistance on big data projects such as Hadoop, Spark, data warehousing, real-time data processing, and large-scale machine learning. He also understands how to apply technologies to solve big data problems and build a well-designed data architecture.

George Zhao is a Senior Data Architect at AWS ProServe. He is an experienced analytics leader working with AWS customers to deliver modern data solutions. He is also a ProServe Amazon EMR domain specialist who enables ProServe consultants on best practices and delivery kits for Hadoop to Amazon EMR migrations. His areas of interest are data lakes and cloud modern data architecture delivery.

Kalen Zhang was the Global Segment Tech Lead of Partner Data and Analytics at AWS. As a trusted advisor of data and analytics, she curated strategic initiatives for data transformation, led data and analytics workload migration and modernization programs, and accelerated customer migration journeys with partners at scale. She specializes in distributed systems, enterprise data management, advanced analytics, and large-scale strategic initiatives.

Introducing the AWS ProServe Hadoop Migration Delivery Kit TCO tool

Post Syndicated from George Zhao original https://aws.amazon.com/blogs/big-data/introducing-the-aws-proserve-hadoop-migration-delivery-kit-tco-tool/

When migrating Hadoop workloads to Amazon EMR, it’s often difficult to identify the optimal cluster configuration without analyzing existing workloads by hand. To solve this, we’re introducing the Hadoop migration assessment Total Cost of Ownership (TCO) tool, which is part of the AWS ProServe Hadoop Migration Delivery Kit (HMDK). The self-serve HMDK TCO tool accelerates the design of new cost-effective Amazon EMR clusters by analyzing the existing Hadoop workload and calculating the total cost of ownership (TCO) of running it on the future Amazon EMR system. The Amazon EMR TCO report, together with the new Amazon EMR design, can make the case for the Amazon EMR migration with detailed cost savings and business benefits.

In this post, we introduce a use case and the functions and components of the tool. We also share case studies to show you the benefits of using the tool. Finally, we show you the technical information to use the tool.

Use case overview

Migrating Hadoop workloads to Amazon EMR accelerates big data analytics modernization, increases productivity, and reduces operational cost. Refactoring coupled compute and storage into a decoupled architecture is a modern data solution. It enables compute such as EMR instances and storage such as Amazon Simple Storage Service (Amazon S3) data lakes to scale independently. For various Hadoop jobs, customers have bespoke deployment options of fully managed Amazon EMR, Amazon EMR on Amazon EKS, and EMR Serverless. The optimized future EMR cluster yields the same results and value with a much lower TCO compared to the source Hadoop cluster. But we need a TCO report to showcase the cost saving details, as shown in the following figure.

Typically, starting a Hadoop migration requires Hadoop experts to spend weeks or even months assessing current Hadoop cluster workloads to plan the subsequent migration. Without a good TCO report, this could delay acceptance of the project.

To accelerate Hadoop migrations and mitigate the workload assessment efforts by SMEs, AWS ProServe created the Hadoop migration assessment TCO tool within the AWS ProServe Hadoop Migration Delivery Kit.

Introduction to the HMDK TCO tool

As a Hadoop migration accelerator, the HMDK TCO tool has three components:

  • YARN log collector – Retrieves the existing workload logs from YARN Resource Manager
  • YARN log analyzer – Provides a deep time-based insight on different aspects of the jobs
  • TCO calculator – Generates a 3-year or 1-year TCO calculated automatically

The self-serve HMDK TCO tool is available for download on GitHub.

Using the tool consists of three steps:

  1. First, the YARN Log collector communicates with the current Hadoop system to retrieve YARN logs.
  2. With the collected YARN logs, the next step is to use the YARN log analyzer and set up the log analyzer stack using AWS CloudFormation. The results of the log analyzer reveal Hadoop workload insights with various views and metrics of the Hadoop applications shown in Amazon QuickSight dashboards, which leads to the design of a future EMR cluster.
  3. Lastly, the TCO calculator generates the TCO report by simulating hourly resource usage of a future EMR cluster. To accelerate Hadoop migration assessment, the TCO report provides crucial information and values for your business stakeholders to make a buy-in decision.

The following diagram illustrates this architecture.

The Hadoop workload insights enable you to design a well-architected EMR cluster to achieve performance and cost-effectiveness in an agile way. For conducting well-architected designs, you need to deliberate between various system specifications of an EMR cluster and multiple cost considerations.

The system specifications are as follows:

  • Number of EMR clusters – Amazon EMR enables you to run multiple elastic clusters in the AWS Cloud to serve the same purpose of a shared static Hadoop cluster on premises
  • Types of EMR cluster (persistent or transient) – Design your system to keep minimum persistent clusters to save cost
  • Instance types and configuration (memory, vCore, and so on) – Choose the right instance for your job
  • Resource allocation for applications and cluster utilization – Based on the on-premises workload analysis, design effective resource allocation and efficient resource utilization in future EMR clusters

The cost considerations are as follows:

  • Latest price list (from thousands of available EC2 instances) – The HMDK TCO tool calculates the price from Amazon Elastic Compute Cloud (Amazon EC2) instance types, configurations, and their prices.
  • Amazon S3 storage cost (standard, Glacier, and so on) – Data replication is no longer required for reliability. You can use tiered storage in Amazon S3 for cost savings.

YARN log collector

The HMDK TCO tool enables a simple way to capture Hadoop YARN logs, which include the Hadoop job runs statistics and the corresponding resource usages. The following screenshot is an example of a YARN log.

The tool supports HTTPS protocol to communicate with YARN Resource Manager. The tool transports the JSON YARN logs as the inputs to a Python parser, which converts the YARN logs from JSON to CSV format. The new CSV formatted logs are the standard input files for the YARN log analyzer.

For more information, see the GitHub repo.

YARN log analyzer and optimized design use cases

With the logs, we can follow the steps in the TCO yarn-log-analysis README file to use AWS CloudFormation to set up QuickSight resources.

The HMDK TCO log analyzer generates a QuickSight dashboard on various metrics:

  • Job timeline – How many jobs are running at one time
  • Job user – Breakdown of users and queues
  • Application type and engine type – Breakdown by application types (Spark, Hive, Presto) and run engine type (MapReduce, Spark, Tez)
  • Elapsed time – The time span of completing an application
  • Resources – Memory and CPU

The following screenshot shows an example dashboard.

The QuickSight dashboards exhibit insights based on consecutive YARN logs collected in a long-enough period of time (for example, a 2-week window). The insights from the logs reveal the application types, users, queues, running cadence, time spans, and resource usages. The data also helps you discover daily batch jobs or ad hoc jobs, long-running jobs, and resource consumption. These insights help you design the right clusters, such as transient clusters or baseline permanent clusters, and choose the right EC2 instance for memory- or compute-intensive jobs. With the log analyzer results, the TCO tool automatically calculates the TCO of a future EMR cluster.

Let’s see some real customer use cases in the following sections.

Case 1: Use transient and persistent clusters wisely

For this use case, a customer in the financial sector has an 11-node Hadoop cluster.

The QuickSight timeline dashboard shows that job runs peak at a specific time of day because of the daily batch job. This guides us to design two clusters to fulfill the existing workloads: a persistent cluster kept at a minimal size, and a transient EMR cluster that handles the batch-style jobs around the peak time.

Therefore, we designed the clusters to have a persistent cluster with 2 data nodes, while transient nodes can scale from 0–10 between the hours of 1:00 AM and 4:00 AM.

The following figure illustrates this design.

This balanced design using transient and persistent clusters resulted in a cost savings of about 80% compared to a lift-and-shift design.
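A back-of-the-envelope comparison of node-hours shows why this split pays off. The sketch below assumes the lift-and-shift baseline keeps all 11 nodes running around the clock and that the transient cluster runs at its maximum of 10 nodes for the full 3-hour window. Node-hours are only a rough proxy for cost: primary nodes, instance types, and pricing options are ignored, so the result is not expected to reproduce the savings figure reported above. The function name `daily_node_hours` is hypothetical.

```python
HOURS_PER_DAY = 24


def daily_node_hours(persistent_nodes, transient_nodes, transient_hours):
    """Node-hours per day for a persistent cluster plus a transient burst."""
    return persistent_nodes * HOURS_PER_DAY + transient_nodes * transient_hours


# Assumed lift-and-shift baseline: all 11 nodes run 24 hours a day.
baseline = daily_node_hours(persistent_nodes=11, transient_nodes=0, transient_hours=0)

# Redesign: 2 persistent data nodes, plus up to 10 transient nodes
# between 1:00 AM and 4:00 AM (3 hours), taken here at the maximum.
redesign = daily_node_hours(persistent_nodes=2, transient_nodes=10, transient_hours=3)

reduction = 1 - redesign / baseline
print(baseline, redesign, f"{reduction:.0%}")  # prints: 264 78 70%
```

Because the transient nodes are counted at their upper bound, the actual node-hour reduction would be at least this large whenever the cluster scales below 10 nodes during the window.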

Case 2: Identify Hadoop queue usage and long-running jobs to design multiple clusters and optimized runs

For our next use case, a company runs 196 nodes using Hadoop 3.1 with jobs like Hive, Spark, and Kafka. The Hadoop default queue and four other queues were used to group various workloads. As illustrated in the following figure, some very long-running jobs are seen in the shared cluster, resulting in queued jobs that have resource competition and unbalanced resource allocation.

The QuickSight user dashboard guides us through the queue usage, the elapsed time dashboard guides us through the long-running jobs, and the resource dashboard guides us through the memory and vCore usage for the jobs.

Therefore, we designed a solution that transfers the queue jobs to run in separate clusters and splits the default queue jobs across different clusters. By identifying the long-running jobs and understanding their resource needs, we could design a cluster to run such jobs more efficiently.

This design allows the job to run faster and the clusters to be used more efficiently with a cost savings benefit.

Cluster design

The HMDK TCO tool provides a cluster design template like the following example.

Here we have two clusters, one transient and one persistent, to handle the Spark and Tez jobs accordingly. The starting and ending hour for each cluster can be determined from the log analysis. With this cluster design, we can get the hourly workload resource usage forecast. Then the TCO calculator gets all the information needed to generate costs based on the TCO simulation variables you choose.

TCO calculator

The HMDK TCO calculator is the component that guides the EMR cluster design by using the EMR design template. It then generates the hourly aggregated resource usage forecast using a Python program. The component provides guidelines and an Excel template for entering system and cost specification parameters, and it includes a built-in Amazon EMR price list. The macro-enabled Excel TCO template automatically generates the 1-year and 3-year TCO.

The following figure shows the details of our HMDK TCO simulation.

The following figures show the TCO report.

TCO tool engagement outcomes

In this section, we share some of the engagement outcomes from customers after using the TCO tool for 1–2 weeks. Additionally, with the TCO tool, we can refactor on-premises Hadoop clusters to EMR clusters utilizing Amazon S3 as a data lake. The modern data solution of migrating to Amazon EMR provides unlimited scalability with operational efficiency and cost savings.

The following table illustrates four case studies of some engagements using the tool.

Case # | Case Description | Engagement Outcome
1 | Pressured by the Hadoop License, they migrated to AWS using Amazon EMR and used Spark for replacing Hive. They designed the new EMR clusters using a balanced design of transient and persistent clusters. | They can get job insights through the tool and design the new EMR clusters to fulfill the existing workloads, and expect to achieve 80% cost savings and six times performance enhancement.
2 | Their goal was to migrate a Hadoop cluster with over 1,000 nodes from HDFS to Amazon S3 and Hive to Spark, and redesign the cluster using a balanced design of transient and persistent clusters. | They can get job insights and redesign the cluster with a 1-year TCO of the optimized redesign architecture expected to have 64% cost savings.
3 | Their goal was to migrate to Hadoop 3.1. They transferred the Hadoop queue-based job, which shared the same cluster, to two transient clusters and five persistent clusters with optimized resource usage for each job run, and handled long-running jobs faster. | They can get Amazon EMR TCO results quickly in 2 weeks. Customers get insights on their workloads and long-running jobs and get the job done faster and cheaper.
4 | Their goal was to migrate from Hive 1 to Spark and design an auto scaling EMR cluster. | They can get Amazon EMR TCO results in 1 week. They’re expecting to see 75% cost savings on the redesigned EMR clusters and 10 times on performance improvement.


This post introduced use cases, functions, and components of the HMDK TCO tool. Through the case studies discussed in this post, you learned about real examples of the tool’s usage and its benefits. The HMDK TCO tool automates the workload assessment of a source Hadoop cluster and the TCO calculation, so the assessment can be done in 2–3 weeks instead of months.

More and more customers are adopting the HMDK TCO tool to accelerate their migration to Amazon EMR.

To dive deep into the HMDK TCO tool, refer to the next post in this series, How AWS ProServe Hadoop TCO tool accelerate Hadoop workload migrations to Amazon EMR.

About the authors

Sungyoul Park is a Senior Practice Manager at AWS ProServe. He helps customers innovate their business with AWS Analytics, IoT, and AI/ML services. He has a specialty in big data services and technologies and an interest in building customer business outcomes together.

Jiseong Kim is a Senior Data Architect at AWS ProServe. He mainly works with enterprise customers to help data lake migration and modernization, and provides guidance and technical assistance on big data projects such as Hadoop, Spark, data warehousing, real-time data processing, and large-scale machine learning. He also understands how to apply technologies to solve big data problems and build a well-designed data architecture.

George Zhao is a Senior Data Architect at AWS ProServe. He is an experienced analytics leader working with AWS customers to deliver modern data solutions. He is also a ProServe Amazon EMR domain specialist who enables ProServe consultants on best practices and delivery kits for Hadoop to Amazon EMR migrations. His areas of interest are data lakes and cloud modern data architecture delivery.

Kalen Zhang was the Global Segment Tech Lead of Partner Data and Analytics at AWS. As a trusted advisor of data and analytics, she curated strategic initiatives for data transformation, led data and analytics workload migration and modernization programs, and accelerated customer migration journeys with partners at scale. She specializes in distributed systems, enterprise data management, advanced analytics, and large-scale strategic initiatives.

Improve observability across Amazon MWAA tasks

Post Syndicated from Payal Singh original https://aws.amazon.com/blogs/big-data/improve-observability-across-amazon-mwaa-tasks/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. A data pipeline is a set of tasks and processes used to automate the movement and transformation of data between different systems. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines. The Amazon provider package for Apache Airflow comes with integrations for over 31 AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon EMR, AWS Glue, Amazon SageMaker, and more.

The most common use case for Airflow is ETL (extract, transform, and load). Nearly all Airflow users implement ETL pipelines ranging from simple to complex. Operationalizing machine learning (ML) is another growing use case, where data has to be transformed and normalized before it can be loaded into an ML model. In both use cases, the data pipeline is preparing the data for consumption by ingesting data from different sources and transforming it through a series of steps.

Observability across the different processes within the data pipeline is a key component to monitor the success or failure of the pipeline. Although scheduling the runs of tasks within the data pipeline is controlled by Airflow, the run of the task itself (transforming, normalizing, and aggregating data) is done by different services based on the use case. Having an end-to-end view of the data flow is a challenge due to multiple touch points in the data pipeline.

In this post, we provide an overview of logging enhancements when working with Amazon MWAA; logging is one of the pillars of observability. We then discuss a solution to further enhance end-to-end observability by modifying the task definitions that make up the data pipeline. For this post, we focus on task definitions for two services: AWS Glue and Amazon EMR. However, the same method can be applied across different services.


Many customers’ data pipelines start simple, orchestrating a few tasks, and over time grow to be more complex, consisting of a large number of tasks and dependencies between them. As the complexity increases, it becomes increasingly hard to operate and debug in case of failure, which creates a need for a single pane of glass to provide end-to-end data pipeline orchestration and health management. For data pipeline orchestration, the Apache Airflow UI is a user-friendly tool that provides detailed views into your data pipeline. When it comes to pipeline health management, each service that your tasks are interacting with could be storing or publishing logs to different locations, such as an S3 bucket or Amazon CloudWatch logs. As the number of integration touch points increases, stitching the distributed logs generated by different services in various locations can be challenging.

One solution provided by Amazon MWAA to consolidate the Airflow and task logs within the directed acyclic graph (DAG) is to forward the logs to CloudWatch log groups. A separate log group is created for each enabled Airflow logging option (for example, DAGProcessing, Scheduler, Task, WebServer, and Worker). These logs can be queried across log groups using CloudWatch Logs Insights.

A common approach in distributed tracing is to use a correlation ID to stitch and query distributed logs. A correlation ID is a unique identifier that is passed through a request flow for tracking a sequence of activities throughout the lifetime of the workflow. When each service in the workflow needs to log information, it can include this correlation ID, thereby ensuring you can track a full request from start to finish.
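The following sketch illustrates the correlation ID idea in isolation, using only Python's standard logging module. Two loggers stand in for two services that write to separate destinations, and both stamp every line with the same ID so the lines can be stitched back together later. The class `CorrelationIdFilter`, the helper `make_logger`, the logger names, and the ID value are all hypothetical and are not part of the solution described in this post.

```python
import io
import logging


class CorrelationIdFilter(logging.Filter):
    """Attach a fixed correlation ID to every record that passes through a logger."""

    def __init__(self, correlation_id):
        super().__init__()
        self.correlation_id = correlation_id

    def filter(self, record):
        record.correlation_id = self.correlation_id
        return True  # never drop records; only annotate them


def make_logger(name, correlation_id, stream):
    """Create a logger that writes '<name> <correlation_id> <message>' to a stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep output confined to the given stream
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(name)s %(correlation_id)s %(message)s"))
    logger.addHandler(handler)
    logger.addFilter(CorrelationIdFilter(correlation_id))
    return logger


# Two "services" log to separate destinations but share one correlation ID.
correlation_id = "req-12345"  # hypothetical value
service_a_out, service_b_out = io.StringIO(), io.StringIO()
make_logger("service_a", correlation_id, service_a_out).info("transform started")
make_logger("service_b", correlation_id, service_b_out).info("aggregation started")

# Stitching the distributed logs together is then a filter on the ID.
all_lines = (service_a_out.getvalue() + service_b_out.getvalue()).splitlines()
matched = [line for line in all_lines if correlation_id in line]
print("\n".join(matched))
```

The same principle applies when the destinations are separate CloudWatch log groups: as long as every service includes the ID in its output, a single search term retrieves the whole request flow.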

The Airflow engine passes a few variables by default that are accessible to all templates. run_id is one such variable, which is a unique identifier for a DAG run. The run_id can be used as the correlation ID to query against different log groups within CloudWatch to capture all the logs for a particular DAG run.

However, be aware that services that your tasks are interacting with will use a separate log group and won’t log the run_id as part of their output. This will prevent you from getting an end-to-end view across the DAG run.

For example, if your data pipeline consists of an AWS Glue task running a Spark job as part of the pipeline, then the Airflow task logs will be available in one CloudWatch log group and the AWS Glue job logs will be in a different CloudWatch log group. However, the Spark job that is run as part of the AWS Glue job doesn’t have access to the correlation ID and can’t be tied back to a particular DAG run. So even if you use the correlation ID to query the different CloudWatch log groups, you won’t get any information about the run of the Spark job.

Solution overview

As you now know, run_id is a variable that is a unique identifier for a DAG run. The run_id is present as part of the Airflow task logs. To use the run_id effectively and increase the observability across the DAG run, we use run_id as the correlation ID and pass it to different tasks within the DAG. The correlation ID is then consumed by the scripts used within the tasks.

The following diagram illustrates the solution architecture.

Architecture Diagram

The data pipeline that we focus on consists of the following components:

  • An S3 bucket that contains the source data
  • An AWS Glue crawler that creates the table metadata in the Data Catalog from the source data
  • An AWS Glue job that transforms the raw data into a processed data format while performing file format conversions
  • An EMR job that generates reporting datasets

For details on the architecture and complete steps on how to run the DAG, refer to the Amazon MWAA for Analytics Workshop.

In the next sections, we explore the following topics:

  • The DAG file, in order to understand how to define and then pass the correlation ID in the AWS Glue and EMR tasks
  • The code needed in the Python scripts to output information based on the correlation ID

Refer to the GitHub repo for the detailed DAG definition and Spark scripts. To run the scripts, refer to the Amazon MWAA analytics workshop.

DAG definitions

In this section, we look at snippets of the additions needed to the DAG file. We also discuss how to pass the correlation ID to the AWS Glue and EMR jobs. Refer to the GitHub repo for the complete DAG code.

The DAG file begins by defining the variables:

# Variables

correlation_id = "{{ run_id }}"
dag_name = "data_pipeline"
S3_BUCKET_NAME = "airflow_data_pipeline_bucket"

Next, let’s look at how to pass the correlation ID to the AWS Glue job using the AWS Glue operator. Operators are the building blocks of Airflow DAGs. They contain the logic of how data is processed in the data pipeline. Each task in a DAG is defined by instantiating an operator.

Airflow provides operators for different tasks. For this post, we use the AWS Glue operator.

The AWS Glue task definition contains the following:

  • The Python Spark job script (raw_to_tranform.py) to run the job
  • The DAG name, task ID, and correlation ID, which are passed as arguments
  • The AWS Glue service role assigned, which has permissions to run the crawler and the jobs

See the following code:

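# Glue Task definition

glue_task = AwsGlueJobOperator(
    task_id='glue_task',  # matches the task ID passed in script_args
    # The job name, the AWS Glue service role, and other arguments are omitted
    # in this snippet; refer to the GitHub repo for the complete definition
    script_args={'--dag_name': dag_name,
                 '--task_id': 'glue_task',
                 '--correlation_id': correlation_id},
)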

Next, we pass the correlation ID to the EMR job using the EMR operator. This includes the following steps:

  1. Define the configuration of an EMR cluster.
  2. Create the EMR cluster.
  3. Define the steps to be run by the EMR job.
  4. Run the EMR job:
    1. We use the Python Spark job script aggregations.py.
    2. We pass the DAG name, task ID, and correlation ID as arguments to the steps for the EMR task.

Let’s start with defining the configuration for the EMR cluster. The correlation_id is passed in the name of the cluster to easily identify the cluster corresponding to a DAG run. The logs generated by EMR jobs are published to an S3 bucket; the correlation_id is part of the LogUri as well. See the following code:

# Define the EMR cluster configuration

JOB_FLOW_OVERRIDES = {  # variable name assumed; the snippet omits the assignment
    "Name": dag_name + "." + emr_task_id + "-" + correlation_id,
    "ReleaseLabel": "emr-5.29.0",
    "LogUri": "s3://{}/logs/emr/{}/{}/{}".format(S3_BUCKET_NAME, dag_name, emr_task_id, correlation_id),
    "Instances": {
        "InstanceGroups": [{
            "Name": "Master nodes",
            "Market": "ON_DEMAND",
            "InstanceRole": "MASTER",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 1
        }, {
            "Name": "Slave nodes",
            "Market": "ON_DEMAND",
            "InstanceRole": "CORE",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 2
        }],
        "TerminationProtected": False,
        "KeepJobFlowAliveWhenNoSteps": True
    }
}

Now let’s define the task to create the EMR cluster based on the configuration:

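# Create the EMR cluster

cluster_creator = EmrCreateJobFlowOperator(
    task_id=emr_task_id,
    # The cluster configuration and other arguments are omitted in this
    # snippet; refer to the GitHub repo for the complete definition
)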

Next, let’s define the steps to run as part of the EMR job. The locations of the input and output data processed by the EMR job, which are stored in an S3 bucket, are passed as arguments. The dag_name, task_id, and correlation_id are also passed in as arguments. The task_id can be a name of your choice; here we use add_steps:

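# EMR steps to be executed by EMR cluster

SPARK_STEPS = [{  # variable name assumed; the snippet omits the assignment
    'Name': 'Run Spark',
    'ActionOnFailure': 'CANCEL_AND_WAIT',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit',
                 # The script location, the input and output S3 paths, dag_name,
                 # task_id, and correlation_id follow; they are omitted in this
                 # snippet (refer to the GitHub repo)
                 ],
    },
}]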

Next, let’s add a task to run the steps on the EMR cluster. The job_flow_id is the ID of the JobFlow, which is passed down from the EMR create task described earlier using Airflow XComs. See the following code:

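# Run the EMR job

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    # The steps and other arguments are omitted in this snippet;
    # refer to the GitHub repo for the complete definition
)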

This completes the steps needed to pass the correlation ID within the DAG task definition.

In the next section, we use this ID within the script run to log details.

Job script definitions

In this section, we review the changes required to log information based on the correlation_id. Let’s start with the AWS Glue job script (for the complete code, refer to the following file in GitHub):

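# Script changes to file 'raw_to_transform'

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

## @params: [JOB_NAME]
# Resolve the job arguments, including the ones passed from the DAG
args = getResolvedOptions(sys.argv, ['JOB_NAME','dag_name','task_id','correlation_id'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
# Build the string "<dag_name>.<task_id> <run_id>" and log it
correlation_id = args['dag_name'] + "." + args['task_id'] + " " + args['correlation_id']
logger.info("Correlation ID from GLUE job: " + correlation_id)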

Next, we focus on the EMR job script (for the complete code, refer to the file in GitHub):

# Script changes to file 'nyc_aggregations'

from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("""
        Usage: nyc_aggregations.py <s3_input_path> <s3_output_path> <dag_name> <task_id> <correlation_id>
        """, file=sys.stderr)
        sys.exit(-1)  # stop here instead of failing later on a missing argument
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    dag_task_name = sys.argv[3] + "." + sys.argv[4]
    # Build the string "<dag_name>.<task_id> <run_id>"
    correlation_id = dag_task_name + " " + sys.argv[5]
    spark = SparkSession\
        .builder\
        .getOrCreate()  # the snippet is truncated here; other builder options are omitted
    sc = spark.sparkContext
    # Use the JVM log4j logger so that the message lands in the Spark logs
    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger(dag_task_name)
    logger.info("Spark session started: " + correlation_id)

This completes the steps for passing the correlation ID to the script run.

After we complete the DAG definitions and script additions, we can run the DAG. Logs for a particular DAG run can be queried using the correlation ID. The correlation ID for a DAG run can be found via the Airflow UI. An example of a correlation ID is manual__2022-07-12T00:22:36.111190+00:00. With this unique string, we can run queries on the relevant CloudWatch log groups using CloudWatch Logs Insights. The result of the query includes the logging provided by the AWS Glue and EMR scripts, along with other logs associated with the correlation ID.

Example query for DAG-level logs: manual__2022-07-12T00:22:36.111190+00:00

We can also obtain task-level logs by using the format <dag_name.task_id correlation_id>:

Example query: data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00
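If you prefer to run such searches programmatically, the following sketch builds a CloudWatch Logs Insights query string for either search term and shows how it could be submitted with the boto3 start_query call. The helper names `build_insights_query` and `run_insights_query`, the selected fields, and the result limit are assumptions for this example. The log group names depend on your environment and must be supplied by the caller.

```python
def build_insights_query(search_text, limit=100):
    """Build a Logs Insights query that matches log lines containing `search_text`."""
    if '"' in search_text:
        # Keep the sketch simple: reject text that would need escaping.
        raise ValueError("search_text must not contain double quotes")
    return (
        "fields @timestamp, @log, @message\n"
        f'| filter @message like "{search_text}"\n'
        "| sort @timestamp asc\n"
        f"| limit {int(limit)}"
    )


def run_insights_query(log_group_names, search_text, start_time, end_time):
    """Start the query with boto3 and return its query ID (requires AWS credentials)."""
    import boto3  # imported lazily so the query builder works without boto3 installed

    logs = boto3.client("logs")
    response = logs.start_query(
        logGroupNames=log_group_names,
        startTime=start_time,  # epoch seconds
        endTime=end_time,      # epoch seconds
        queryString=build_insights_query(search_text),
    )
    # Pass this ID to get_query_results to poll for the results.
    return response["queryId"]


run_id = "manual__2022-07-12T00:22:36.111190+00:00"
dag_level = build_insights_query(run_id)
task_level = build_insights_query("data_pipeline.glue_task " + run_id)
print(task_level)
```

The task-level search term works because the job scripts log the DAG name, task ID, and run ID as one string separated by a period and a space.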

Clean up

If you created the setup to run and test the scripts using the Amazon MWAA analytics workshop, perform the cleanup steps to avoid incurring charges.


In this post, we showed how to send Amazon MWAA logs to CloudWatch log groups. We then discussed how to tie in logs from different tasks within a DAG using the unique correlation ID. The correlation ID can be output with as much or as little information as your job needs to provide more detail across your entire DAG run. You can then use CloudWatch Logs Insights to query the logs.

With this solution, you can use Amazon MWAA as a single pane of glass for data pipeline orchestration and CloudWatch logs for data pipeline health management. The unique identifier improves the end-to-end observability for a DAG run and helps reduce the time needed for troubleshooting.

To learn more and get hands-on experience, start with the Amazon MWAA analytics workshop and then use the scripts in the GitHub repo to gain more observability of your DAG run.

About the Author

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.