Tag Archives: Amazon EMR

Configure Hadoop YARN CapacityScheduler on Amazon EMR on Amazon EC2 for multi-tenant heterogeneous workloads

Post Syndicated from Suvojit Dasgupta original https://aws.amazon.com/blogs/big-data/configure-hadoop-yarn-capacityscheduler-on-amazon-emr-on-amazon-ec2-for-multi-tenant-heterogeneous-workloads/

Apache Hadoop YARN (Yet Another Resource Negotiator) is a cluster resource manager responsible for assigning computational resources (CPU, memory, I/O) and for scheduling and monitoring jobs submitted to a Hadoop cluster. This generic framework allows for effective management of cluster resources for distributed data processing frameworks, such as Apache Spark, Apache MapReduce, and Apache Hive. When supported by the framework, Amazon EMR uses Hadoop YARN by default. Note that not all frameworks offered by Amazon EMR use Hadoop YARN; Trino/Presto and Apache HBase, for example, do not.

In this post, we discuss the components of Hadoop YARN and how they interact to allocate resources, schedule applications, and monitor applications. We dive deep into the specific configurations for customizing Hadoop YARN’s CapacityScheduler to increase cluster efficiency by allocating resources in a timely and secure manner in a multi-tenant cluster. We take an opinionated look at the CapacityScheduler configurations and apply them on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) to address common resource allocation, resource contention, and job scheduling challenges in a multi-tenant cluster.

We dive deep into CapacityScheduler because Amazon EMR uses CapacityScheduler by default, and CapacityScheduler has benefits over other schedulers for running workloads with heterogeneous resource consumption.

Solution overview

Modern data platforms often run applications on Amazon EMR with the following characteristics:

  • Heterogeneous resource consumption patterns by jobs, such as computation-bound jobs, I/O-bound jobs, or memory-bound jobs
  • Multiple teams running jobs with an expectation to receive an agreed-upon share of cluster resources and complete jobs in a timely manner
  • Cluster admins often have to cater to one-time requests for running jobs without impacting scheduled jobs
  • Cluster admins want to ensure users are using their assigned capacity and not capacity assigned to others
  • Cluster admins want to utilize the resources efficiently and allocate all available resources to currently running jobs, but want to retain the ability to reclaim resources automatically should there be a claim for the agreed-upon cluster resources from other jobs

To illustrate these use cases, let’s consider the following scenario:

  • user1 and user2 don’t belong to any team and use cluster resources periodically on an ad hoc basis
  • A data platform and analytics program has two teams:
    • A data_engineering team, containing user3
    • A data_science team, containing user4
  • user5 and user6 (and many other users) sporadically use cluster resources to run jobs

Based on this scenario, the scheduler queue may look like the following diagram. Take note of the common configurations applied to all queues, the overrides, and the user/group-to-queue mappings.

Capacity Scheduler Queue Setup
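As a rough illustration of how a queue layout like this turns into configuration, the following Python sketch assembles capacity-scheduler properties for the scenario above. The queue names (default, adhoc, data_engineering, data_science), the capacity percentages, the user/group mappings, and the helper function are hypothetical choices for illustration, not the values in the sample repository. The property names themselves (yarn.scheduler.capacity.root.queues, yarn.scheduler.capacity.root.<queue>.capacity and .maximum-capacity, and yarn.scheduler.capacity.queue-mappings) are standard CapacityScheduler settings.

```python
# Hypothetical queue layout for the scenario above; names and percentages
# are illustrative assumptions, not the values used in the sample repository.
QUEUES = {
    # queue name: (guaranteed capacity %, maximum capacity %)
    "default": (10, 100),
    "adhoc": (20, 100),
    "data_engineering": (35, 100),
    "data_science": (35, 100),
}

# Hypothetical user/group-to-queue mappings ("u" = user, "g" = group).
MAPPINGS = [
    ("u", "user1", "adhoc"),
    ("u", "user2", "adhoc"),
    ("g", "data_engineering", "data_engineering"),
    ("g", "data_science", "data_science"),
]


def build_capacity_scheduler_properties(queues, mappings):
    """Return capacity-scheduler.xml properties for leaf queues under root."""
    total = sum(capacity for capacity, _ in queues.values())
    # Guaranteed capacities of sibling queues must add up to 100 percent.
    if total != 100:
        raise ValueError(f"queue capacities sum to {total}, expected 100")

    props = {"yarn.scheduler.capacity.root.queues": ",".join(queues)}
    for name, (capacity, maximum) in queues.items():
        prefix = f"yarn.scheduler.capacity.root.{name}"
        props[f"{prefix}.capacity"] = str(capacity)
        props[f"{prefix}.maximum-capacity"] = str(maximum)

    # Mappings are a comma-separated list of kind:name:queue entries.
    props["yarn.scheduler.capacity.queue-mappings"] = ",".join(
        f"{kind}:{name}:{queue}" for kind, name, queue in mappings
    )
    return props


if __name__ == "__main__":
    for key, value in build_capacity_scheduler_properties(QUEUES, MAPPINGS).items():
        print(f"{key}={value}")
```

Setting maximum-capacity above capacity lets a queue borrow idle resources from its siblings. Getting those resources back quickly when another queue claims its guaranteed share additionally depends on ResourceManager preemption being enabled, which is configured separately in yarn-site.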

In the following sections, we introduce the high-level components of Hadoop YARN, discuss the types of schedulers available in Hadoop YARN, review the core concepts of CapacityScheduler, and show how to implement this CapacityScheduler queue setup on Amazon EMR (on Amazon EC2). You can skip to the Code walkthrough section if you are already familiar with Hadoop YARN and CapacityScheduler.

Overview of Hadoop YARN

At a high level, Hadoop YARN consists of three main components:

  • ResourceManager (one per primary node)
  • ApplicationMaster (one per application)
  • NodeManager (one per node)

The following diagram shows the main components and their interaction with each other.

Apache Hadoop YARN architecture diagram

Before diving further, let’s clarify what Hadoop YARN’s ResourceContainer (or container) is. A ResourceContainer represents a collection of physical computational resources. It’s an abstraction used to bundle resources into distinct, allocatable units.

ResourceManager

The ResourceManager is responsible for resource management and making allocation decisions. It’s the ResourceManager’s responsibility to identify and allocate resources to a job upon submission to Hadoop YARN. The ResourceManager has two main components:

  • ApplicationsManager (not to be confused with ApplicationMaster)
  • Scheduler

ApplicationsManager

The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for running ApplicationMaster, and providing the service for restarting the ApplicationMaster on failure.

Scheduler

The Scheduler is responsible for scheduling allocation of resources to the jobs. The Scheduler performs its scheduling function based on the resource requirements of the jobs. The Scheduler is a pluggable interface. Hadoop YARN currently provides three implementations:

  • CapacityScheduler – A pluggable scheduler for Hadoop that allows for multiple tenants to securely share a cluster such that jobs are allocated resources in a timely manner under constraints of allocated capacities. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler. In this post, we primarily focus on CapacityScheduler, which is the default scheduler on Amazon EMR (on Amazon EC2).
  • FairScheduler – A pluggable scheduler for Hadoop that allows Hadoop YARN applications to share resources in clusters fairly. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
  • FifoScheduler – A pluggable scheduler for Hadoop that allows Hadoop YARN applications to share resources in clusters on a first-in-first-out basis. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler.

ApplicationMaster

After the ApplicationsManager negotiates the first container, the per-application ApplicationMaster is responsible for negotiating the remaining resources from the Scheduler, tracking their status, and monitoring progress.

NodeManager

The NodeManager is responsible for launching and managing containers on a node.

Hadoop YARN on Amazon EMR

By default, Amazon EMR (on Amazon EC2) uses Hadoop YARN for cluster management for the distributed data processing frameworks that support Hadoop YARN as a resource manager, such as Apache Spark, Apache MapReduce, and Apache Hive. Amazon EMR provides sensible default settings that work for most scenarios. However, every data platform is different and has specific needs, so Amazon EMR lets you customize these settings at cluster creation using configuration classifications. You can also reconfigure Amazon EMR cluster applications and specify additional configuration classifications for each instance group in a running cluster using the AWS Command Line Interface (AWS CLI) or the AWS SDK.
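For reference, Amazon EMR accepts configuration classifications as a JSON list of objects with Classification and Properties keys, which you can pass to aws emr create-cluster with --configurations file://<path>. For a running cluster, aws emr modify-instance-groups accepts a list of instance groups, each with an InstanceGroupId and Configurations. The Python sketch below builds both documents for a single capacity-scheduler property. The function names and the instance group ID (ig-EXAMPLE) are hypothetical placeholders; the property value is the DominantResourceCalculator class discussed later in this post.

```python
import json

# Fully qualified class name of the calculator discussed later in this post.
DOMINANT = "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"


def capacity_scheduler_classification(properties):
    """Wrap properties in the Classification/Properties shape EMR expects."""
    return {"Classification": "capacity-scheduler", "Properties": dict(properties)}


def cluster_configurations(resource_calculator=DOMINANT):
    """Document for `aws emr create-cluster --configurations file://...`."""
    return [
        capacity_scheduler_classification(
            {"yarn.scheduler.capacity.resource-calculator": resource_calculator}
        )
    ]


def reconfigure_payload(instance_group_id, configurations):
    """Document for `aws emr modify-instance-groups --instance-groups file://...`.

    instance_group_id is a placeholder; look up the real ID of your cluster's
    instance group before using this.
    """
    return [{"InstanceGroupId": instance_group_id, "Configurations": configurations}]


if __name__ == "__main__":
    configs = cluster_configurations()
    print(json.dumps(configs, indent=2))
    print(json.dumps(reconfigure_payload("ig-EXAMPLE", configs), indent=2))
```

Save either printed document to a file and reference it with file://; modify-instance-groups also needs the --cluster-id of the running cluster.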

CapacityScheduler

CapacityScheduler depends on ResourceCalculator to identify the available resources and calculate the allocation of the resources to ApplicationMaster. The ResourceCalculator is an abstract Java class. Hadoop YARN currently provides two implementations:

  • DefaultResourceCalculator – In DefaultResourceCalculator, resources are calculated based on memory alone.
  • DominantResourceCalculator – DominantResourceCalculator is based on the Dominant Resource Fairness (DRF) model of resource allocation. The paper Dominant Resource Fairness: Fair Allocation of Multiple Resource Types, Ghodsi et al. [2011] describes DRF as follows: “DRF computes the share of each resource allocated to that user. The maximum among all shares of a user is called that user’s dominant share, and the resource corresponding to the dominant share is called the dominant resource. Different users may have different dominant resources. For example, the dominant resource of a user running a computation-bound job is CPU, while the dominant resource of a user running an I/O-bound job is bandwidth. DRF simply applies max-min fairness across users’ dominant shares. That is, DRF seeks to maximize the smallest dominant share in the system, then the second-smallest, and so on.”
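To make the quoted definition concrete, here is a small Python sketch of DRF’s progressive-filling idea, using the two-user example from the Ghodsi et al. paper: 9 CPUs and 18 GB in total, user A’s tasks need <1 CPU, 4 GB>, and user B’s tasks need <3 CPUs, 1 GB>. It repeatedly grants one task to the user with the smallest dominant share whose next task still fits. This is a simplified model of the fairness rule, not the source code of YARN’s DominantResourceCalculator; the function and user names are hypothetical.

```python
from fractions import Fraction


def drf_allocate(capacity, demands):
    """Progressive filling under Dominant Resource Fairness (simplified model).

    capacity: {resource: total amount}
    demands:  {user: {resource: amount needed per task}}
    Assumes every task demands a positive amount of at least one resource,
    otherwise the loop would never terminate.
    Returns the number of tasks granted to each user.
    """
    consumed = {r: 0 for r in capacity}
    tasks = {user: 0 for user in demands}

    def dominant_share(user):
        # Largest fraction of any single resource the user currently holds.
        return max(
            Fraction(tasks[user] * demands[user][r], capacity[r]) for r in capacity
        )

    def fits(user):
        return all(consumed[r] + demands[user][r] <= capacity[r] for r in capacity)

    while True:
        candidates = [user for user in demands if fits(user)]
        if not candidates:
            return tasks
        # Max-min fairness: serve the user with the smallest dominant share.
        user = min(candidates, key=dominant_share)
        tasks[user] += 1
        for r in capacity:
            consumed[r] += demands[user][r]


if __name__ == "__main__":
    cluster = {"cpus": 9, "memory_gb": 18}
    per_task = {
        "user_a": {"cpus": 1, "memory_gb": 4},  # memory is A's dominant resource
        "user_b": {"cpus": 3, "memory_gb": 1},  # CPU is B's dominant resource
    }
    print(drf_allocate(cluster, per_task))  # {'user_a': 3, 'user_b': 2}
```

User A ends with 3 CPUs and 12 GB (dominant share 12/18), and user B ends with 6 CPUs and 2 GB (dominant share 6/9), so both dominant shares settle at two-thirds, which is the equalization DRF aims for.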

Because it implements DRF, DominantResourceCalculator is a better ResourceCalculator for data processing environments running heterogeneous workloads. By default, Amazon EMR uses DefaultResourceCalculator for CapacityScheduler. You can verify this by checking the value of the yarn.scheduler.capacity.resource-calculator parameter in /etc/hadoop/conf/capacity-scheduler.xml.
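One way to script that check is to parse the Hadoop-style XML (a configuration element containing property entries with name and value children) using Python’s standard library. This sketch assumes it runs on the primary node, where /etc/hadoop/conf/capacity-scheduler.xml exists; the helper names are hypothetical.

```python
import os
import xml.etree.ElementTree as ET

CAPACITY_SCHEDULER_XML = "/etc/hadoop/conf/capacity-scheduler.xml"
RESOURCE_CALCULATOR = "yarn.scheduler.capacity.resource-calculator"


def parse_hadoop_properties(root):
    """Turn a Hadoop <configuration> element into a {name: value} dict."""
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name is not None:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props


def read_hadoop_properties(path):
    """Parse a Hadoop-style XML configuration file from disk."""
    return parse_hadoop_properties(ET.parse(path).getroot())


if __name__ == "__main__":
    if os.path.exists(CAPACITY_SCHEDULER_XML):
        props = read_hadoop_properties(CAPACITY_SCHEDULER_XML)
        # Per this post, an unmodified EMR cluster reports DefaultResourceCalculator.
        print(props.get(RESOURCE_CALCULATOR, "<not set>"))
    else:
        print(f"{CAPACITY_SCHEDULER_XML} not found; run this on the EMR primary node")
```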

Code walkthrough

CapacityScheduler provides multiple parameters to customize the scheduling behavior to meet specific needs. For a list of available parameters, refer to Hadoop: CapacityScheduler.

Refer to the configurations section in cloudformation/templates/emr.yaml to review all the CapacityScheduler parameters set as part of this post. In this example, we use two Amazon EMR (on Amazon EC2) configuration classifications:

  • yarn-site – The classification to update yarn-site.xml
  • capacity-scheduler – The classification to update capacity-scheduler.xml

For various types of classification available in Amazon EMR, refer to Customizing cluster and application configuration with earlier AMI versions of Amazon EMR.

In the AWS CloudFormation template, we changed the ResourceCalculator of CapacityScheduler from the default, DefaultResourceCalculator, to DominantResourceCalculator. Data processing environments tend to run different kinds of jobs: for example, computation-bound jobs consuming heavy CPU, I/O-bound jobs consuming heavy bandwidth, and memory-bound jobs consuming heavy memory. As previously stated, DominantResourceCalculator is better suited for such environments because of its Dominant Resource Fairness model of resource allocation. If your data processing environment only runs memory-bound jobs, then modifying this parameter isn’t necessary.
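A back-of-the-envelope way to see why the calculator matters: if only memory is counted, a node looks full only when its memory runs out, so CPU-heavy containers can be packed onto it even after its vCPUs are fully spoken for. The following Python sketch counts how many identical containers fit on one node when counting memory only versus memory and vCPUs. The node and container sizes are hypothetical round numbers, and the function is a simplification that ignores YARN details such as minimum allocation increments and memory reserved for the OS and daemons.

```python
def containers_per_node(node, container, resources):
    """How many identical containers fit on a node, counting only `resources`."""
    return min(node[r] // container[r] for r in resources)


# Hypothetical YARN-allocatable node size and a CPU-heavy container request.
NODE = {"memory_mb": 32768, "vcores": 16}
CPU_HEAVY = {"memory_mb": 2048, "vcores": 2}

# Memory-only view, in the spirit of DefaultResourceCalculator:
# 16 containers, which together request 32 vcores on a 16-vcore node.
memory_only = containers_per_node(NODE, CPU_HEAVY, ["memory_mb"])

# Memory-and-vCPU view, in the spirit of DominantResourceCalculator.
both = containers_per_node(NODE, CPU_HEAVY, ["memory_mb", "vcores"])

print(memory_only, both)  # 16 8
```

For purely memory-bound containers the two views agree, which is consistent with the note above that memory-only workloads don’t need the change.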

You can find the codebase in the AWS Samples GitHub repository.

Prerequisites

For deploying the solution, you should have the following prerequisites:

Deploy the solution

To deploy the solution, complete the following steps:

  • Download the source code from the AWS Samples GitHub repository:
    git clone git@github.com:aws-samples/amazon-emr-yarn-capacity-scheduler.git

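  • Create an Amazon Simple Storage Service (Amazon S3) bucket. In Regions other than us-east-1, also add --create-bucket-configuration LocationConstraint=<AWS_REGION> to this command:
    aws s3api create-bucket --bucket emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION> --region <AWS_REGION>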

  • Copy the cloned repository to the Amazon S3 bucket:
    aws s3 cp --recursive amazon-emr-yarn-capacity-scheduler s3://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>/

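  • Update the following parameters in amazon-emr-yarn-capacity-scheduler/cloudformation/parameters/parameters.json:
    1. ArtifactsS3Repository – The S3 bucket name that was created in the previous step (emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>).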
    2. emrKeyName – An existing EC2 key name. If you don’t have an existing key and want to create a new key, refer to Use an Amazon EC2 key pair for SSH credentials.
    3. clientCIDR – The CIDR range of the client machine for accessing the EMR cluster via SSH. You can run the following command to identify the IP of the client machine: echo "$(curl -s http://checkip.amazonaws.com)/32"
  • Deploy the AWS CloudFormation templates:
    aws cloudformation create-stack \
    --stack-name emr-yarn-capacity-scheduler \
    --template-url https://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>.s3.amazonaws.com/cloudformation/templates/main.yaml \
    --parameters file://amazon-emr-yarn-capacity-scheduler/cloudformation/parameters/parameters.json \
    --capabilities CAPABILITY_NAMED_IAM \
    --region <AWS_REGION>

  • On the AWS CloudFormation console, check for the successful deployment of the following stacks.

AWS CloudFormation Stack Deployment

  • On the Amazon EMR console, check for the successful creation of the emr-cluster-capacity-scheduler cluster.
  • Choose the cluster and on the Configurations tab, review the properties under the capacity-scheduler and yarn-site classification labels.

AWS EMR Configurations

  • Access the Hadoop YARN resource manager UI on the emr-cluster-capacity-scheduler cluster to review the CapacityScheduler setup. For instructions on how to access the UI on Amazon EMR, refer to View web interfaces hosted on Amazon EMR clusters.

Apache Hadoop YARN UI

  • SSH into the emr-cluster-capacity-scheduler cluster and review the following files. For instructions on how to SSH into the EMR primary node, refer to Connect to the master node using SSH.
    • /etc/hadoop/conf/yarn-site.xml
    • /etc/hadoop/conf/capacity-scheduler.xml

All the parameters set using the yarn-site and capacity-scheduler classifiers are reflected in these files. If an admin wants to update CapacityScheduler configs, they can directly update capacity-scheduler.xml and run the following command to apply the changes without interrupting any running jobs and services:

yarn rmadmin -refreshQueues

Changes to yarn-site.xml require the ResourceManager service to be restarted, which interrupts the running jobs. As a best practice, refrain from manual modifications and use version control for change management.

The CloudFormation template adds a bootstrap action to create test users (user1, user2, user3, user4, user5 and user6) on all the nodes and adds a step script to create HDFS directories for the test users.

Users can SSH into the primary node, switch to different users with sudo, and submit Spark jobs to verify job submission and CapacityScheduler behavior:

[hadoop@ip-xx-x-xx-xxx ~]$ sudo su - user1
[user1@ip-xx-x-xx-xxx ~]$ spark-submit --master yarn --deploy-mode cluster \
--class org.apache.spark.examples.SparkPi /usr/lib/spark/examples/jars/spark-examples.jar

You can validate the results from the resource manager web UI.

Apache Hadoop YARN Jobs List
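When a job like the one above is submitted, CapacityScheduler consults yarn.scheduler.capacity.queue-mappings, which is evaluated from left to right, with the first matching entry used. The Python sketch below models that lookup for plain u: and g: entries only; it ignores placeholders such as %user and %primary_group, the queue-mappings-override.enable setting, and the check that the target queue exists. The mapping string, the group memberships, and the resolve_queue helper are hypothetical, chosen to mirror the scenario at the start of this post.

```python
def resolve_queue(mappings, user, groups, requested="default"):
    """Return the queue a submission lands in under simple u:/g: mappings."""
    for entry in mappings.split(","):
        kind, name, queue = entry.strip().split(":")
        if kind == "u" and name == user:
            return queue
        if kind == "g" and name in groups:
            return queue
    # No mapping matched: fall back to the queue requested at submission time.
    return requested


# Hypothetical mappings and group memberships for the test users.
MAPPINGS = "u:user1:adhoc,u:user2:adhoc,g:data_engineering:data_engineering,g:data_science:data_science"
GROUPS = {"user3": {"data_engineering"}, "user4": {"data_science"}}

if __name__ == "__main__":
    for user in ["user1", "user3", "user4", "user5"]:
        print(user, "->", resolve_queue(MAPPINGS, user, GROUPS.get(user, set())))
```

A user without a mapping, such as user5 here, can still request a queue explicitly, for example with spark-submit’s --queue option.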

Clean up

To avoid incurring future charges, delete the resources you created.

  • Delete the CloudFormation stack:
    aws cloudformation delete-stack --stack-name emr-yarn-capacity-scheduler

  • Delete the S3 bucket:
    aws s3 rb s3://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION> --force

The command deletes the bucket and all files underneath it. The files may not be recoverable after deletion.

Conclusion

In this post, we discussed Apache Hadoop YARN and its various components. We discussed the types of schedulers available in Hadoop YARN. We dived deep into the specifics of Hadoop YARN CapacityScheduler and the use of Dominant Resource Fairness to efficiently allocate resources to submitted jobs. We also showed how to implement the discussed concepts using AWS CloudFormation.

We encourage you to use this post as a starting point to implement CapacityScheduler on Amazon EMR (on Amazon EC2) and customize the solution to meet your specific data platform goals.


About the authors

Suvojit Dasgupta is a Sr. Lakehouse Architect at Amazon Web Services. He works with customers to design and build data solutions on AWS.

Bharat Gamini is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust, and secure cloud-based analytical solutions on AWS.

Amazon EMR on EKS gets up to 19% performance boost running on AWS Graviton3 Processors vs. Graviton2

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/amazon-emr-on-eks-gets-up-to-19-performance-boost-running-on-aws-graviton3-processors-vs-graviton2/

Amazon EMR on EKS is a deployment option that enables you to run Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS) easily. It allows you to innovate faster with the latest Apache Spark on Kubernetes architecture while benefiting from the performance-optimized Spark runtime powered by Amazon EMR. This deployment option uses Amazon EKS as its underlying compute to orchestrate containerized Spark applications with better price performance.

AWS continually innovates to provide choice and better price performance for our customers, and the third-generation Graviton processor is the next step in that journey. Amazon EMR on EKS now supports Amazon Elastic Compute Cloud (Amazon EC2) C7g, the latest AWS Graviton3 instance family. On a single EKS cluster, we measured the performance of the EMR runtime for Apache Spark by comparing the C7g and C6g families across the 4XL, 8XL, and 12XL instance sizes. We are excited to report up to a 19% performance gain over the sixth-generation C6g Graviton2 instances, which leads to up to a 15% cost reduction.

In this post, we discuss the performance test results that we observed while running the same EMR Spark runtime on different Graviton-based EC2 instance types.

For some use cases, such as benchmark tests, data pipelines that need a mix of CPU types for fine-grained cost efficiency, or migrations of existing applications from Intel to Graviton-based instances, we usually spin up separate clusters for each processor type, such as x86_64 vs. arm64. Amazon EMR on EKS makes this easier. In this post, we also provide guidance on running Spark with multiple CPU architectures in a common EKS cluster, which saves the time and effort of setting up separate clusters to isolate the workloads.

Infrastructure innovation

AWS Graviton3 is the latest generation of AWS-designed Arm-based processors, and C7g is the first Graviton3 instance in AWS. The C family is designed for compute-intensive workloads, including batch processing, distributed analytics, data transformations, log analysis, and more. Additionally, C7g instances are the first in the cloud to feature DDR5 memory, which provides 50% higher memory bandwidth compared to DDR4 memory, to enable high-speed access to data in memory. All these innovations are well-suited for big data workloads, especially the in-memory processing framework Apache Spark.

The following table summarizes the technical specifications for the tested instance types:

| Instance name | vCPUs | Memory (GiB) | EBS-optimized bandwidth (Gbps) | Network bandwidth (Gbps) | On-Demand hourly rate |
| --- | --- | --- | --- | --- | --- |
| c6g.4xlarge | 16 | 32 | 4.75 | Up to 10 | $0.544 |
| c7g.4xlarge | 16 | 32 | Up to 10 | Up to 15 | $0.58 |
| c6g.8xlarge | 32 | 64 | 9 | 12 | $1.088 |
| c7g.8xlarge | 32 | 64 | 10 | 15 | $1.16 |
| c6g.12xlarge | 48 | 96 | 13.5 | 20 | $1.632 |
| c7g.12xlarge | 48 | 96 | 15 | 22.5 | $1.74 |

These instances are all built on the AWS Nitro System, a collection of AWS-designed hardware and software innovations. The Nitro System offloads CPU virtualization, storage, and networking functions to dedicated hardware and software, delivering performance that is nearly indistinguishable from bare metal. In particular, C7g instances include support for Elastic Fabric Adapter (EFA), which is becoming standard on this instance family. EFA allows applications to communicate directly with network interface cards, providing lower and more consistent latency. Additionally, these are all Amazon EBS-optimized instances, and C7g provides higher dedicated bandwidth for EBS volumes, which can result in better I/O performance and faster read/write operations in Spark.

Performance test results

To quantify performance, we ran TPC-DS benchmark queries for Spark at 3 TB scale. These queries are derived from TPC-DS standard SQL scripts, and the test results are not comparable to other published TPC-DS benchmark outcomes. A single Amazon EMR 6.6 Spark runtime (compatible with Apache Spark 3.2.0) was used as the data processing engine across six managed node groups on one EKS cluster: C6g_4, C7g_4, C6g_8, C7g_8, C6g_12, and C7g_12. The groups are named after their instance types to distinguish the underlying compute resources, and each group can automatically scale between 1 and 30 nodes of its instance type. With this layout, we can run and compare our experiments in parallel, each hosted in its own node group, that is, an isolated compute environment on a common EKS cluster. It also makes it possible to run an application across multiple CPU architectures on a single cluster. Check out the sample EKS cluster configuration and benchmark job examples for more details.

We measure the Graviton performance and cost improvements using two calculations: total query runtime and the geometric mean of the individual query runtimes. The following table shows the results for equivalently sized C6g and C7g instances with the same Spark configurations.

| Benchmark attribute | 12XL | 8XL | 4XL |
| --- | --- | --- | --- |
| Task parallelism (spark.executor.cores × spark.executor.instances) | 188 cores (4 × 47) | 188 cores (4 × 47) | 188 cores (4 × 47) |
| spark.executor.memory | 6 GB | 6 GB | 6 GB |
| Number of EC2 instances | 5 | 7 | 16 |
| EBS volume | 4 × 128 GB io1 disk | 4 × 128 GB io1 disk | 4 × 128 GB io1 disk |
| Provisioned IOPS per volume | 6400 | 6400 | 6400 |
| Total query runtime on C6g (sec) | 2099 | 2098 | 2042 |
| Total query runtime on C7g (sec) | 1728 | 1738 | 1660 |
| Total runtime improvement with C7g | 18% | 17% | 19% |
| Geometric mean query time on C6g (sec) | 9.74 | 9.88 | 9.77 |
| Geometric mean query time on C7g (sec) | 8.40 | 8.32 | 8.08 |
| Geometric mean improvement with C7g | 13.8% | 15.8% | 17.3% |
| EMR on EKS memory usage cost on C6g (per run) | $0.28 | $0.28 | $0.28 |
| EMR on EKS vCPU usage cost on C6g (per run) | $1.26 | $1.25 | $1.24 |
| Total cost per benchmark run on C6g (EC2 + EKS cluster + EMR price) | $6.36 | $6.02 | $6.52 |
| EMR on EKS memory usage cost on C7g (per run) | $0.23 | $0.23 | $0.22 |
| EMR on EKS vCPU usage cost on C7g (per run) | $1.04 | $1.03 | $0.99 |
| Total cost per benchmark run on C7g (EC2 + EKS cluster + EMR price) | $5.49 | $5.23 | $5.54 |
| Estimated cost reduction with C7g | 13.7% | 13.2% | 15% |

The total number of Spark executor cores (188) and the executor memory (6 GB each) are identical across all benchmarked instance sizes, and four provisioned IOPS SSD (io1) volumes were attached to each EBS-optimized instance for optimal disk I/O performance. To allow for comparison, these configurations were intentionally chosen to match the settings in other EMR on EKS benchmarks. Check out the previous benchmark blog post Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads for results on C5 instances based on x86_64 Intel CPUs.

The table indicates C7g instances have consistent performance improvement compared to equivalent C6g Graviton2 instances. Our test results showed 17–19% improvement in total query runtime for selected instance sizes, and 13.8–17.3% improvement in geometric mean. On cost, we observed 13.2–15% cost reduction on C7g performance tests compared to C6g while running the 104 TPC-DS benchmark queries.
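The percentages in the table follow from the raw numbers with simple arithmetic. The Python sketch below recomputes the runtime, geometric-mean, and cost improvements for the 4XL column and shows how a geometric mean of per-query times is defined. It also makes a rough, EC2-only estimate by combining the On-Demand price ratio from the earlier specification table (c7g.4xlarge vs. c6g.4xlarge) with the runtime ratio; that estimate is only an illustration that ignores the EKS cluster and EMR charges included in the table’s totals, so it is not expected to match the reported cost reduction. The helper names are hypothetical.

```python
import math


def improvement(before, after):
    """Relative reduction from `before` to `after`, as a percentage."""
    return (before - after) / before * 100


def geometric_mean(values):
    """Geometric mean of positive values, such as individual query runtimes."""
    return math.exp(sum(math.log(v) for v in values) / len(values))


# 4XL column of the benchmark table.
runtime = improvement(2042, 1660)  # total query runtime, seconds
geomean = improvement(9.77, 8.08)  # geometric mean query time, seconds
cost = improvement(6.52, 5.54)     # total cost per benchmark run, USD

# Rough EC2-only estimate: hourly price ratio times runtime ratio.
ec2_only = improvement(1.0, (0.58 / 0.544) * (1660 / 2042))

print(f"runtime {runtime:.1f}%, geomean {geomean:.1f}%, "
      f"cost {cost:.1f}%, EC2-only estimate {ec2_only:.1f}%")
```

The runtime figure comes out at about 18.7%, which the table rounds to 19%. The geometric mean is useful here because it keeps a handful of long-running queries from dominating the comparison the way they dominate the total runtime.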

Data shuffle in a Spark workload

Generally, big data frameworks schedule computation tasks on different nodes in parallel to achieve optimal performance. To proceed with its computation, a node must have the results of upstream computations. This requires moving intermediate data from multiple servers to the nodes where it is needed, which is called shuffling. In many Spark workloads, data shuffle is unavoidable, so it plays an important role in performance assessments. A shuffle can involve heavy disk I/O and network transfer, and can consume a significant number of CPU cycles.
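The shuffle pattern described above can be modeled in a few lines: each upstream (map-side) task partitions its output records by key, and each downstream task collects its partition from every upstream task. The Python sketch below simulates that all-to-all exchange in memory with a stable CRC32-based hash partitioner, which is similar in spirit to Spark’s default hash partitioning (Spark itself uses the key’s hashCode). The task layout, data, and function names are hypothetical; on a real cluster, every (map task, reducer) pair counted here corresponds to disk writes on the sending node and network transfer to the receiving node.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Stable hash partitioner (CRC32 of the key's text)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def shuffle(map_outputs, num_reducers):
    """Route every (key, value) pair from every map task to its reducer.

    Returns (reducer_inputs, transfers), where transfers counts how many
    records each (map_task, reducer) pair exchanged.
    """
    reducer_inputs = defaultdict(list)
    transfers = defaultdict(int)
    for map_task, records in enumerate(map_outputs):
        for key, value in records:
            reducer = partition_for(key, num_reducers)
            reducer_inputs[reducer].append((key, value))
            transfers[(map_task, reducer)] += 1
    return dict(reducer_inputs), dict(transfers)


def reduce_sums(reducer_inputs):
    """Sum values per key; each key's records all sit on a single reducer."""
    totals = {}
    for records in reducer_inputs.values():
        for key, value in records:
            totals[key] = totals.get(key, 0) + value
    return totals


if __name__ == "__main__":
    # Two hypothetical map tasks, each holding partial counts for some keys.
    maps = [[("a", 1), ("b", 2), ("c", 3)], [("a", 4), ("c", 5), ("d", 6)]]
    inputs, moved = shuffle(maps, num_reducers=2)
    print(reduce_sums(inputs))
    print(moved)
```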

If your workload is I/O bound or bottlenecked by current data shuffle performance, one recommendation is to benchmark on improved hardware. Overall, C7g offers better EBS and network bandwidth compared to equivalent C6g instance types, which may help you optimize performance. Therefore, in the same benchmark test, we captured the following extra information, which is broken down into per-instance-type network/IO improvements.

Based on the TPC-DS query test result, this graph illustrates the percentage increases of data shuffle operations in four categories: maximum disk read and write, and maximum network received and transmitted. In comparison to c6g instances, the disk read performance improved between 25–45%, whereas the disk write performance increase was 34–47%. On the network throughput comparison, we observed an increase of 21–36%.

Run an Amazon EMR on EKS job with multiple CPU architectures

If you’re evaluating migrating to Graviton instances for Amazon EMR on EKS workloads, we recommend testing the Spark workloads based on your real-world use cases. If you need to run workloads across multiple processor architectures, for example to test performance on Intel and Arm CPUs, follow the walkthrough in this section to get started with some concrete ideas.

Build a single multi-arch Docker image

To build a single multi-arch Docker image (x86_64 and arm64), complete the following steps:

  1. Get the Docker Buildx CLI extension. Docker Buildx is a CLI plugin that extends the Docker command to support the multi-architecture feature. Upgrade to the latest Docker Desktop or manually download the CLI binary. For more details, check out Working with Buildx.
  2. Validate the version after the installation:
    docker buildx version

  3. Create a new builder that gives access to the new multi-architecture features (you only have to perform this task once):
    docker buildx create --name mybuilder --use

  4. Log in to your own Amazon ECR registry:
    AWS_REGION=us-east-1
    ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
    ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
    aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL

  5. Get the EMR Spark base image from AWS. Log in to the source registry first, then pull the image:
    SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $SRC_ECR_URL
    docker pull $SRC_ECR_URL/spark/emr-6.6.0:latest

  6. Build and push a custom Docker image.

In this case, we build a single Spark benchmark utility docker image on top of Amazon EMR 6.6. It supports both Intel and Arm processor architectures:

  • linux/amd64 – x86_64 (also known as AMD64 or Intel 64)
  • linux/arm64 – Arm
docker buildx build \
--platform linux/amd64,linux/arm64 \
-t $ECR_URL/eks-spark-benchmark:emr6.6 \
-f docker/benchmark-util/Dockerfile \
--build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest \
--push .

Submit Amazon EMR on EKS jobs with and without Graviton

For our first example, we submit a benchmark job to the Graviton3 node group that spins up c7g.4xlarge instances.

The following is not a complete script. Check out the full version of the example on GitHub.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name emr66-c7-4xl \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.6.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
    "entryPoint": "local:///usr/lib/spark/examples/jars/eks-spark-benchmark-assembly-1.0.jar",
    "entryPointArguments":[.......],
    "sparkSubmitParameters": "........"}}' \
--configuration-overrides '{
"applicationConfiguration": [{
    "classification": "spark-defaults",
    "properties": {
        "spark.kubernetes.container.image": "'$ECR_URL'/eks-spark-benchmark:emr6.6",
        "spark.kubernetes.node.selector.eks.amazonaws.com/nodegroup": "C7g_4"
}}]}'

In the following example, we run the same job on non-Graviton C5 instances with Intel 64 CPU. The full version of the script is available on GitHub.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name emr66-c5-4xl \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.6.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
    "entryPoint": "local:///usr/lib/spark/examples/jars/eks-spark-benchmark-assembly-1.0.jar",
    "entryPointArguments":[.......],
    "sparkSubmitParameters": "........"}}' \
--configuration-overrides '{
"applicationConfiguration": [{
    "classification": "spark-defaults",
    "properties": {
        "spark.kubernetes.container.image": "'$ECR_URL'/eks-spark-benchmark:emr6.6",
        "spark.kubernetes.node.selector.eks.amazonaws.com/nodegroup": "C5_4"
}}]}'
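Hand-writing nested JSON inside shell quotes is error-prone: a typographic quote or a space after a line-continuation backslash is enough to break the command. One option is to generate the --configuration-overrides document programmatically and pass it to the AWS CLI as a file. The following Python sketch does that for a given node group. The node group labels (C7g_4, C5_4) and the node-selector property come from the examples above; the function name and the image URI (with its placeholder account ID) are hypothetical.

```python
import json


def configuration_overrides(image_uri, node_group):
    """Build the --configuration-overrides document for one EKS node group."""
    return {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.kubernetes.container.image": image_uri,
                    # Schedule driver and executor pods onto one managed node group.
                    "spark.kubernetes.node.selector.eks.amazonaws.com/nodegroup": node_group,
                },
            }
        ]
    }


if __name__ == "__main__":
    # Hypothetical image URI; substitute the value of your own $ECR_URL.
    image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/eks-spark-benchmark:emr6.6"
    for group in ["C7g_4", "C5_4"]:
        print(json.dumps(configuration_overrides(image, group), indent=2))
```

Save one of the printed documents, for example as overrides.json, and pass --configuration-overrides file://overrides.json to aws emr-containers start-job-run instead of the inline JSON.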

Summary

In May 2022, the Graviton3 instance family was made available to Amazon EMR on EKS. After running the performance-optimized EMR Spark runtime on the selected Arm-based Graviton3 instances, we observed up to a 19% performance increase and up to 15% cost savings compared to C6g Graviton2 instances. Because Amazon EMR on EKS offers 100% API compatibility with open-source Apache Spark, you can start evaluating quickly with no application changes.

If you’re wondering how much performance gain you can achieve with your use case, try out the benchmark solution or the EMR on EKS Workshop. You can also contact your AWS Solutions Architects, who can assist you along your innovation journey.


About the author

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

Graviton Fast Start – A New Program to Help Move Your Workloads to AWS Graviton

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/graviton-fast-start-a-new-program-to-help-move-your-workloads-to-aws-graviton/

With the Graviton Challenge last year, we helped customers migrate to Graviton-based EC2 instances and get up to 40 percent price performance benefit in as little as 4 days. Tens of thousands of customers, including 48 of the top 50 Amazon Elastic Compute Cloud (Amazon EC2) customers, use AWS Graviton processors for their workloads. In addition to EC2, many AWS managed services can run their workloads on Graviton. For most customers, adoption is easy, requiring minimal code changes. However, the effort and time required to move workloads to Graviton depends on a few factors including your software development environment and the technology stack on which your application is built.

This year, we want to take it a step further and make it even easier for customers to adopt Graviton not only through EC2, but also through managed services. Today, we are launching AWS Graviton Fast Start, a new program that makes it even easier to move your workloads to AWS Graviton by providing step-by-step directions for EC2 and other managed services that support the Graviton platform:

  • Amazon Elastic Compute Cloud (Amazon EC2) – EC2 provides the most flexible environment for a migration and can support many kinds of workloads, such as web apps, custom databases, or analytics. You have full control over the interpreted or compiled code running in the EC2 instance. You can also use many open-source and commercial software products that support the Arm64 architecture.
  • AWS Lambda – Migrating your serverless functions can be really easy, especially if you use an interpreted runtime such as Node.js or Python. Most of the time, you only have to check the compatibility of your software dependencies. I have shown a few examples in this blog post.
  • AWS Fargate – Fargate works best if your applications are already running in containers or if you are planning to containerize them. By using multi-architecture container images or images that have Arm64 in their image manifest, you get the serverless benefits of Fargate and the price-performance advantages of Graviton.
  • Amazon Aurora – Relational databases are at the core of many applications. If you need a database compatible with PostgreSQL or MySQL, you can use Amazon Aurora to have a highly performant and globally available database powered by Graviton.
  • Amazon Relational Database Service (RDS) – Like Aurora, Amazon RDS engines such as PostgreSQL, MySQL, and MariaDB can provide a fully managed relational database service using Graviton-based instances.
  • Amazon ElastiCache – When your workload requires ultra-low latency and high throughput, you can speed up your applications with ElastiCache and have a fully managed in-memory cache running on Graviton and compatible with Redis or Memcached.
  • Amazon EMR – With Amazon EMR, you can run large-scale distributed data processing jobs, interactive SQL queries, and machine learning applications on Graviton using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Here’s some feedback we got from customers running their workloads on Graviton:

  • Formula 1 racing told us that Graviton2-based C6gn instances provided the best price performance benefits for some of their computational fluid dynamics (CFD) workloads. More recently, they found that Graviton3 C7g instances are 40 percent faster for the same simulations and expect Graviton3-based instances to become the optimal choice to run all of their CFD workloads.
  • Honeycomb has 100 percent of their production workloads running on Graviton using EC2 and Lambda. They have tested the high-throughput telemetry ingestion workload they use for their observability platform against early preview instances of Graviton3 and have seen a 35 percent performance increase for their workload over Graviton2. They were able to run 30 percent fewer instances of C7g than C6g serving the same workload and with 30 percent reduced latency. With these instances in production, they expect over 50 percent price performance improvement over x86 instances.
  • Twitter is working on a multi-year project to leverage Graviton-based EC2 instances to deliver Twitter timelines. As part of their ongoing effort to drive further efficiencies, they tested the new Graviton3-based C7g instances. Across a number of benchmarks representative of their workloads, they found Graviton3-based C7g instances deliver 20-80 percent higher performance compared to Graviton2-based C6g instances, while also reducing tail latencies by as much as 35 percent. They are excited to utilize Graviton3-based instances in the future to realize significant price performance benefits.

With all these options, getting the benefits of running all or part of your workload on AWS Graviton can be easier than you expect. To help you get started, there’s also a free trial on the Graviton-based T4g instances for up to 750 hours per month through December 31st, 2022.

Visit AWS Graviton Fast Start to get step-by-step directions on how to move your workloads to AWS Graviton.

Danilo

AWS Week In Review – July 25, 2022

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-week-in-review-july-25-2022/

A few weeks ago, we hosted the first EMEA AWS Heroes Summit in Milan, Italy. This past week, I had the privilege to join the Americas AWS Heroes Summit in Seattle, Washington, USA. Meeting with our community experts is always inspiring and a great opportunity to learn from each other. During the Summit, AWS Heroes from North America and Latin America shared their thoughts with AWS developer advocates and product teams on topics such as serverless, containers, machine learning, data, and DevTools. You can learn more about the AWS Heroes program here.

Last Week’s Launches
Here are some launches that got my attention during the previous week:

Cloudscape Design System – Cloudscape is an open source design system for creating web applications. It was built for and is used by AWS products and services. We created it in 2016 to improve the user experience across web applications owned by AWS services and also to help teams implement those applications faster. If you’ve ever used the AWS Management Console, you’ve seen Cloudscape in action. If you are building a product that extends the AWS Management Console, designing a user interface for a hybrid cloud management system, or setting up an on-premises solution that uses AWS, have a look at Cloudscape Design System.

AWS re:Post introduces community-generated articles – AWS re:Post gives you access to a vibrant community that helps you become even more successful on AWS. Expert community members can now share technical guidance and knowledge beyond answering questions through the Articles feature. Using this feature, community members can share best practices and troubleshooting processes and address customer needs around AWS technology in greater depth. The Articles feature is unlocked for community members who have achieved Rising Star status on re:Post or subject matter experts who built their reputation in the community based on their contributions and certifications. If you have a Rising Star status on re:Post, start writing articles now! All other members can unlock Rising Star status through community contributions or simply browse available articles today on re:Post.

AWS Lambda announces support for attribute-based access control (ABAC) and new IAM condition key – You can now use attribute-based access control (ABAC) with AWS Lambda to control access to functions within AWS Identity and Access Management (IAM) using tags. ABAC is an authorization strategy that defines access permissions based on attributes. In AWS, these attributes are called tags. With ABAC, you can scale an access control strategy by setting granular permissions with tags without requiring permissions updates for every new user or resource as your organization scales. Read this blog post by Julian Wood and Chris McPeek to learn more.

AWS Lambda also announced support for lambda:SourceFunctionArn, a new IAM condition key that can be used for IAM policy conditions that specify the Amazon Resource Name (ARN) of the function from which a request is made. You can use the Condition element in your IAM policy to compare the lambda:SourceFunctionArn condition key in the request context with values that you specify in your policy. This allows you to implement advanced security controls for the AWS API calls taken by your Lambda function code. For more details, have a look at the Lambda Developer Guide.
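To make these two features more concrete, here is a minimal Python sketch that builds the corresponding IAM policy documents as plain dictionaries. The tag key project, the bucket name, and the function ARN are hypothetical placeholders. As an assumption about typical usage, the first policy would be attached to the users or roles that invoke functions, and the second to a Lambda function's execution role; check the Lambda Developer Guide for the condition keys each action supports before relying on it.

```python
import json


def abac_invoke_policy(tag_key: str) -> dict:
    """ABAC: allow lambda:InvokeFunction only on functions whose resource tag
    <tag_key> equals the caller's own principal tag with the same key."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "InvokeOnlyMatchingTag",
                "Effect": "Allow",
                "Action": "lambda:InvokeFunction",
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        # IAM substitutes the caller's tag value at request time.
                        f"aws:ResourceTag/{tag_key}": f"${{aws:PrincipalTag/{tag_key}}}"
                    }
                },
            }
        ],
    }


def source_function_policy(bucket: str, source_function_arn: str) -> dict:
    """Allow s3:PutObject only when the request comes from the given function."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "PutObjectFromSourceFunctionOnly",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
                "Condition": {
                    "ArnEquals": {"lambda:SourceFunctionArn": source_function_arn}
                },
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(json.dumps(abac_invoke_policy("project"), indent=2))
    print(json.dumps(source_function_policy(
        "example-bucket",
        "arn:aws:lambda:us-east-1:111122223333:function:example-function"), indent=2))
```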

Amazon Fraud Detector launches Account Takeover Insights (ATI) – Amazon Fraud Detector now supports an Account Takeover Insights (ATI) model, a low-latency fraud detection machine learning model specifically designed to detect accounts that have been compromised through stolen credentials, phishing, social engineering, or other forms of account takeover. The ATI model is designed to detect up to four times more ATI fraud than traditional rules-based account takeover solutions while minimizing the level of friction for legitimate users. To learn more, have a look at the Amazon Fraud Detector documentation.

Amazon EMR on EC2 clusters (EMR Clusters) introduces more fine-grained access controls – Previously, all jobs running on an EMR cluster used the IAM role associated with the EMR cluster’s EC2 instances to access resources. This role is called the EMR EC2 instance profile. With the new runtime roles for Amazon EMR Steps, you can now specify a different IAM role for your Apache Spark and Hive jobs, scoping down access at a job level. This simplifies access controls on a single EMR cluster that is shared between multiple tenants, wherein each tenant is isolated using IAM roles. You can now also enforce table and column permissions based on your Amazon EMR runtime role to manage your access to data lakes with AWS Lake Formation. For more details, read the What’s New post.
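As a rough sketch of how a tenant's job could be submitted under its own runtime role, the following Python function builds the keyword arguments for the EMR AddJobFlowSteps API (boto3's add_job_flow_steps). The cluster ID, role ARN, script location, and step name are hypothetical, and the sketch assumes the cluster has already been configured for runtime roles as described in the What's New post and the Amazon EMR documentation.

```python
import json


def build_runtime_role_step(cluster_id: str, runtime_role_arn: str,
                            script_s3_uri: str,
                            step_name: str = "tenant-spark-job") -> dict:
    """Return AddJobFlowSteps arguments that run one Spark step under a
    job-specific runtime role instead of the EC2 instance profile."""
    return {
        "JobFlowId": cluster_id,
        # IAM role the step uses to access resources such as S3 data.
        "ExecutionRoleArn": runtime_role_arn,
        "Steps": [
            {
                "Name": step_name,
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    # command-runner.jar runs spark-submit on the cluster.
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", "--deploy-mode", "cluster", script_s3_uri],
                },
            }
        ],
    }


if __name__ == "__main__":
    request = build_runtime_role_step(
        "j-EXAMPLE12345",
        "arn:aws:iam::111122223333:role/tenant-a-emr-runtime-role",
        "s3://example-bucket/jobs/tenant_a_job.py",
    )
    print(json.dumps(request, indent=2))
    # With boto3 and credentials available, submit it with:
    # boto3.client("emr").add_job_flow_steps(**request)
```

Each tenant would get its own role ARN, so access is scoped per job rather than per cluster.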

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional news and customer stories you may find interesting:

AWS open-source news and updates – My colleague Ricardo Sueiras writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community. Read edition #121 here.

AI Use Case Explorer – If you are interested in AI use cases, have a look at the new AI Use Case Explorer. You can search over 100 use cases and 400 customer success stories by industry, business function, and the business outcome you want to achieve.

Bayer centralizes and standardizes data from the carbon program using AWS – To help Brazilian farmers adopt climate-smart agricultural practices and reduce carbon emissions in their activities, Bayer created the Carbon Program, which aims to build carbon-neutral agriculture practices. Learn how Bayer uses AWS to centralize and standardize the data received from the different partners involved in the project in this Bayer case study.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

AWS re:Inforce 2022 – The event will be held this week in person on July 26 and 27 in Boston, Massachusetts, USA. You can watch the keynote and leadership sessions online for free. AWS On Air will also stream live from re:Inforce.

AWS Global Summits – AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Registrations are open for the following AWS Summits in August:

IMAGINE 2022 – The IMAGINE 2022 conference will take place on August 3 at the Seattle Convention Center, Washington, USA. It’s a no-cost event that brings together education, state, and local leaders to learn about the latest innovations and best practices in the cloud. You can register here.

I’ll be speaking at Data Con LA on August 13–14 in Los Angeles, California, USA. Feel free to say “Hi!” if you’re around. And if you happen to be at Ray Summit on August 23–24 in San Francisco, California, USA, stop by the AWS booth. I’ll be there to discuss all things Ray on AWS.

That’s all for this week. Check back next Monday for another Week in Review!

Antje

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Enable federated governance using Trino and Apache Ranger on Amazon EMR

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/enable-federated-governance-using-trino-and-apache-ranger-on-amazon-emr/

Managing data through a central data platform simplifies staffing and training challenges and reduces costs. However, it can create scaling, ownership, and accountability challenges, because central teams may not understand the specific needs of a data domain, whether it’s because of data types and storage, security, data catalog requirements, or specific technologies needed for data processing. One of the architecture patterns that has emerged recently to tackle this challenge is the data mesh architecture, which gives ownership and autonomy to individual teams who own the data. One of the major components of implementing a data mesh architecture lies in enabling federated governance, which includes centralized authorization and audits.

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

Trino, on the other hand, is a highly parallel and distributed query engine, and provides federated access to data by using connectors to multiple backend systems like Hive, Amazon Redshift, and Amazon OpenSearch Service. Trino acts as a single access point to query all data sources.

By combining Trino query federation features with the authorization and audit capability of Apache Ranger, you can enable federated governance. This allows multiple purpose-built data engines to function as one, with a single centralized place to manage data access controls.

This post shares details on how to architect this solution using the new EMR Ranger Trino plugin on Amazon EMR 6.7.

Solution overview

Trino allows you to query data in different sources, using an extensive set of connectors. This feature enables you to have a single point of entry for all data sources that can be queried through SQL.

The following diagram illustrates the high-level overview of the architecture.

This architecture is based on the following major components:

  • Windows AD, which is responsible for providing the identities of users across the system. It’s mainly composed of a key distribution center (KDC) that provides Kerberos tickets to AD users to interact with the EMR cluster, and a Lightweight Directory Access Protocol (LDAP) server that defines the organization of users in logical structures.
  • An Apache Ranger server, which runs on an Amazon Elastic Compute Cloud (Amazon EC2) instance whose lifecycle is independent of the EMR cluster’s lifecycle. Apache Ranger is composed of a Ranger admin server that stores and retrieves policies in and from a MySQL database running in Amazon Relational Database Service (Amazon RDS), a usersync server that connects to the Windows AD LDAP server to synchronize identities to make them available for policy settings, and an optional Apache Solr server to index and store audits.
  • An Amazon RDS for MySQL database instance used by the Hive metastore to store metadata related to the table schemas, and by the Apache Ranger server to store the access control policies.
  • An EMR cluster with the following configuration updates:
    • Apache Ranger security configuration.
    • A local KDC that establishes a one-way trust with Windows AD in order to have the Kerberized EMR cluster recognize the user identities from the AD.
    • A Hue user interface with LDAP authentication enabled to run SQL queries on the Trino engine.
  • An Amazon CloudWatch log group to store all audit logs for the AWS managed Ranger plugins.
  • (Optional) Trino connectors for other execution engines like Amazon Redshift, Amazon OpenSearch Service, PostgreSQL, and others.

Prerequisites

Before getting started, complete the prerequisites described in the Prerequisites and Setting up your resources sections in Introducing Amazon EMR integration with Apache Ranger.

To set up the new Apache Ranger Trino plugin, the following steps are required:

  1. Delete any existing Presto service definitions in the Apache Ranger admin server:
    # Delete the existing Presto service definition
    curl -f -u <admin user login>:<password for ranger admin user> -X DELETE -k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef/name/presto'

  2. Download and add new Apache Ranger service definitions for Trino in the Apache Ranger admin server:
    wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-trino.json

    curl -u <admin user login>:<password for ranger admin user> -X POST -d @ranger-servicedef-amazon-emr-trino.json \
    -H "Accept: application/json" \
    -H "Content-Type: application/json" \
    -k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef'

  3. Create a new Amazon EMR security configuration for Apache Ranger installation to include Trino policy repository details. For more information, see Create the EMR security configuration.
  4. Optionally, if you want to use the Hue UI to run Trino SQL, add the hue user to the Apache Ranger admin server. Run the following command on the Ranger admin server:
    # Usage: pass the Ranger server host name or IP address as the first argument
    set -x
    ranger_server_fqdn=$1
    RANGER_HTTP_URL=https://$ranger_server_fqdn:6182
    
    cat > hueuser.json << EOF
    { 
      "name": "hue1",
      "firstName": "hue",
      "lastName": "",
      "loginId": "hue1",
      "emailAddress" : null,
      "description" : "hue user",
      "password" : "user1pass",
      "groupIdList": [],
      "groupNameList": [],
      "status": 1,
      "isVisible": 1,
      "userRoleList": [ "ROLE_USER" ],
      "userSource": 0
    }
    EOF
    
    # Add the hue user
    curl -u admin:admin -v -i -s -X POST  -d @hueuser.json -H "Accept: application/json" -H "Content-Type: application/json"  -k $RANGER_HTTP_URL/service/xusers/secure/users

After you add the hue user, it’s used to impersonate SQL calls submitted by AD users.

Warning: Use the impersonation feature carefully to avoid granting high privileges to any or all users.

This post also demonstrates the capability of running queries against external databases, such as Amazon Redshift and PostgreSQL, using Trino connectors, while controlling access at the database, table, row, and column level using Apache Ranger policies. This requires you to set up the database engines you want to connect with. The following example code demonstrates using the Amazon Redshift connector. To set up the connector, create the file redshift.properties under /etc/trino/conf.dist/catalog on all Amazon EMR nodes and restart the Trino server.

  • Create the redshift.properties property file on all the Amazon EMR nodes:
    # Create a new, empty redshift.properties file
    sudo touch /etc/trino/conf.dist/catalog/redshift.properties

  • Update the property file with the Amazon Redshift cluster details:
    connector.name=redshift
    connection-url=jdbc:redshift://XXXXX:5439/dev
    connection-user=XXXX
    connection-password=XXXXX

  • Restart the Trino server:
    # Restart Trino server 
    sudo systemctl stop trino-server.service
    sudo systemctl start trino-server.service

  • In a production environment, you can automate this step by adding the following configuration classification when you create the EMR cluster:
    {
      "Classification": "trino-connector-redshift",
      "Properties": {
        "connector.name": "redshift",
        "connection-url": "jdbc:redshift://XXXXX:5439/dev",
        "connection-user": "XXXX",
        "connection-password": "XXXX"
      }
    }
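If you manage the connector on several nodes or clusters, it can help to keep the settings in one place and render both forms from it. The following minimal Python sketch, using placeholder connection values, produces the contents of redshift.properties and the equivalent trino-connector-redshift classification list that you could pass as cluster configurations. The function names are hypothetical helpers, not part of any AWS tool.

```python
import json


def redshift_catalog(connection_url: str, user: str, password: str) -> dict:
    """Trino Redshift connector settings, keyed exactly as in redshift.properties."""
    return {
        "connector.name": "redshift",
        "connection-url": connection_url,
        "connection-user": user,
        "connection-password": password,
    }


def to_properties(props: dict) -> str:
    """Render key=value lines for /etc/trino/conf.dist/catalog/redshift.properties."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


def to_emr_classification(props: dict) -> list:
    """Render the same settings as an EMR configurations list."""
    return [{"Classification": "trino-connector-redshift", "Properties": props}]


if __name__ == "__main__":
    catalog = redshift_catalog("jdbc:redshift://XXXXX:5439/dev", "XXXX", "XXXX")
    print(to_properties(catalog))
    print(json.dumps(to_emr_classification(catalog), indent=2))
```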

Test your setup

In this section, we go through an example where the data is distributed across Amazon Redshift for dimension tables and Hive for fact tables. We can use Trino to join data between these two engines.

On Amazon Redshift, let’s define a new dimension table called Products and load it with data:

-- Set up the products table in Redshift
create table public.products
(company VARCHAR, link VARCHAR, price FLOAT, product_category VARCHAR,
 release_date VARCHAR, sku VARCHAR);

-- Copy data from Amazon S3
COPY public.products
FROM 's3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/products/'
IAM_ROLE '<XXXXXXXXX>'
FORMAT AS PARQUET;

Then use the Hue UI to create the Hive external table Orders:

CREATE EXTERNAL TABLE IF NOT EXISTS default.orders 
(customer_id STRING, order_date STRING, price DOUBLE, sku STRING)
STORED AS PARQUET
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/orders';

Now let’s use Trino to join both datasets:

-- Join the dimension table in Redshift (products) with the fact table in Hive (orders)
-- to get the sum of sales by product_category and sku
SELECT sum(orders.price) total_sales, products.sku, products.product_category
FROM hive.default.orders join redshift.public.products on orders.sku = products.sku
group by products.sku, products.product_category limit 10

The following screenshot shows our results.

Row filtering and column masking

Apache Ranger supports policies to allow or deny access based on several attributes, including user, group, and predefined roles, as well as dynamic attributes like IP address and time of access. In addition, the model supports authorization based on the classification of resources, such as PII, FINANCE, and SENSITIVE.

Another feature is the ability to allow users to access only a subset of rows in a table, or to restrict users to only masked or redacted values of sensitive data. Examples include restricting users to records of customers located in the same country where the user works, or allowing a user who is a doctor to see only records of patients associated with that doctor.

The following screenshots show how, by using Trino Ranger policies, you can enable row filtering and column masking of data in Amazon Redshift tables.

The example policy masks the firstname column, and applies a filter condition on the city column to restrict users to view rows for a specific city only.

The following screenshot shows our results.
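To illustrate what the example policy does to query results, the following Python sketch emulates it on in-memory rows. It is not the Ranger or Trino implementation, only a model of the effect. The column names firstname and city come from the example policy; the sample rows are hypothetical. The redaction mirrors the style of Hive's mask() function (uppercase to X, lowercase to x, digits to n), as an assumption; the actual output depends on the mask type you select in the policy.

```python
def redact(value: str) -> str:
    """Redact in the style of Hive's mask(): A-Z -> X, a-z -> x, 0-9 -> n."""
    out = []
    for ch in value:
        if "A" <= ch <= "Z":
            out.append("X")
        elif "a" <= ch <= "z":
            out.append("x")
        elif "0" <= ch <= "9":
            out.append("n")
        else:
            out.append(ch)  # punctuation and other characters pass through
    return "".join(out)


def apply_example_policy(rows, allowed_city, masked_column="firstname"):
    """Row filter: keep only rows for allowed_city.
    Column mask: redact masked_column in every row that remains."""
    visible = []
    for row in rows:
        if row.get("city") != allowed_city:
            continue
        masked = dict(row)  # copy so the input rows are untouched
        masked[masked_column] = redact(str(masked[masked_column]))
        visible.append(masked)
    return visible


if __name__ == "__main__":
    sample = [
        {"firstname": "Alice", "city": "Seattle"},
        {"firstname": "Bob", "city": "Boston"},
    ]
    print(apply_example_policy(sample, "Seattle"))
```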

Dynamic row filtering using user session context

The Trino Ranger plugin passes down Trino session data like current_user() that you can use in the policy definition. This can vastly simplify your row filter conditions by removing the need for hardcoded values and using a mapping lookup. For more details on dynamic row filtering, refer to Row-level filtering and column-masking using Apache Ranger policies in Apache Hive.
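As a conceptual sketch of the mapping-lookup approach, the following Python function filters rows by the current session user instead of a hardcoded value. The user-to-city mapping, the user names, and the city column are hypothetical; in a real policy, the lookup would be expressed in the row filter condition and evaluated by Trino, not in application code.

```python
from typing import Dict, Iterable, List, Set


def dynamic_row_filter(rows: Iterable[dict], session_user: str,
                       user_cities: Dict[str, Set[str]]) -> List[dict]:
    """Return only rows whose city is mapped to the current session user.
    Users without a mapping see no rows (a deny-by-default choice)."""
    allowed = user_cities.get(session_user, set())
    return [row for row in rows if row.get("city") in allowed]


if __name__ == "__main__":
    mapping = {"alice": {"Seattle"}, "carol": {"Seattle", "Boston"}}
    data = [{"id": 1, "city": "Seattle"}, {"id": 2, "city": "Boston"}]
    for user in ("alice", "carol", "bob"):
        print(user, dynamic_row_filter(data, user, mapping))
```

Because the filter references the session user, one policy covers every user; changing who sees what means updating the mapping rather than the policy.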

Known issue with Amazon EMR 6.7

Amazon EMR 6.7 has a known issue when enabling a Kerberos one-way trust with Microsoft Windows AD. To work around it, run this bootstrap script as part of the cluster launch, following these instructions.

Limitations

When using this solution, keep in mind the following limitations (further details can be found here):

  • Non-Kerberos clusters are not supported.
  • Audit logs are not visible on the Apache Ranger UI, because these are sent to CloudWatch.
  • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore.
  • The integration between Amazon EMR and Apache Ranger limits the available applications. For a full list, refer to Application support and limitations.

Troubleshooting

If you can’t log in to the EMR cluster’s node as an AD user and you get the error message “Permission denied, please try again”, the SSSD service may have stopped on the node you are trying to access.

To fix this, connect to the node using the configured SSH key pair or Session Manager, and run the following command:

sudo service sssd restart

If you’re unable to download policies from the Ranger admin server and you get the error message “Error getting policies” with HTTP status code 400, either the certificate has expired or the Ranger policy definition is not set up correctly.

To fix this, check the Ranger admin logs. If it shows the following error, it’s likely that the certificates have expired.

VXResponse@347f4bdbstatusCode={1} msgDesc={Unauthorized access - unable to get client certificate} messageList={[VXMessage={org.apache.ranger.view.VXMessage@7ebc298cname={OPER_NOT_ALLOWED_FOR_ENTITY} rbKey={xa.error.oper_not_allowed_for_state} message={Operation not allowed for entity} objectId={null} fieldName={null} }]} }
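Before recreating anything, you can confirm whether a certificate has actually expired. The following minimal Python sketch assumes you have printed the certificate's end date with openssl x509 -enddate -noout -in <certificate file>, which produces a line such as notAfter=Jul 25 12:00:00 2023 GMT, and computes the days remaining with the standard library's ssl.cert_time_to_seconds. The sample date is illustrative only.

```python
import ssl
import time
from typing import Optional


def parse_openssl_enddate(line: str) -> str:
    """Extract the date from a line such as 'notAfter=Jul 25 12:00:00 2023 GMT'."""
    prefix = "notAfter="
    line = line.strip()
    if not line.startswith(prefix):
        raise ValueError(f"expected a line starting with {prefix!r}: {line!r}")
    return line[len(prefix):]


def days_until_expiry(not_after: str, now: Optional[float] = None) -> float:
    """Days left before the certificate's notAfter time (negative if expired)."""
    expires_at = ssl.cert_time_to_seconds(not_after)  # interprets the time as UTC
    current = time.time() if now is None else now
    return (expires_at - current) / 86400


if __name__ == "__main__":
    sample = "notAfter=Jul 25 12:00:00 2023 GMT"  # illustrative value
    remaining = days_until_expiry(parse_openssl_enddate(sample))
    print("expired" if remaining < 0 else f"{remaining:.1f} days remaining")
```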

To address the issue, perform the following steps:

  • Recreate the certs using the create-tls-certs.sh script and upload them to Secrets Manager.
  • Then update the Ranger admin server configuration with new certificate, and restart Ranger admin service.
  • Create a new EMR security configuration using the new certificate, and re-launch EMR cluster using new security configurations.

The issue can also be caused by a misconfigured Ranger policy definition. The Ranger admin service policy definition should trust the self-signed certificate chain. Make sure the following configuration attribute for the service definitions has the correct domain name or pattern to match the domain name used for your EMR cluster nodes.

If the EMR cluster keeps failing with the error message Terminated with errors: An internal error occurred, check the Amazon EMR primary node secret agent logs.

If it shows the following message, the cluster is failing because the specified CloudWatch log group doesn’t exist:

Exception in thread "main" com.amazonaws.services.logs.model.ResourceNotFoundException: The specified log group does not exist. (Service: AWSLogs; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: d9fa2ef1-17cb-4b8f-a07f-8a6aea245173; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
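To avoid this failure, you can make sure the log group referenced in your security configuration exists before launching the cluster. The following Python sketch accepts a boto3 CloudWatch Logs client (or any object with the same describe_log_groups and create_log_group methods) and creates the group only if it's missing. The helper names and the log group name in the usage comment are hypothetical placeholders.

```python
def log_group_exists(logs_client, name: str) -> bool:
    """Page through log groups with the given prefix, looking for an exact match."""
    kwargs = {"logGroupNamePrefix": name}
    while True:
        response = logs_client.describe_log_groups(**kwargs)
        if any(group["logGroupName"] == name for group in response.get("logGroups", [])):
            return True
        token = response.get("nextToken")
        if not token:
            return False
        kwargs["nextToken"] = token


def ensure_log_group(logs_client, name: str) -> bool:
    """Create the log group if it doesn't exist. Returns True if it was created."""
    if log_group_exists(logs_client, name):
        return False
    logs_client.create_log_group(logGroupName=name)
    return True


# Usage, with boto3 installed and credentials configured:
# import boto3
# ensure_log_group(boto3.client("logs"), "<your-ranger-audit-log-group>")
```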

A query run through trino-cli might fail with the error Unable to obtain password from user. For example:

ERROR   remote-task-callback-42 io.trino.execution.StageStateMachine    Stage 20220422_023236_00005_zn4vb.1 failed
com.google.common.util.concurrent.UncheckedExecutionException: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: javax.security.auth.login.LoginException: Unable to obtain password from user

This issue can occur due to incorrect realm names in the /etc/trino/conf.dist/catalog/hive.properties file. Check the domain or realm name and other Kerberos-related configs in that file. Optionally, also check the /etc/trino/conf.dist/trino-env.sh and /etc/trino/conf.dist/config.properties files in case any config changes have been made.
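To spot realm mismatches quickly, the following Python sketch parses a properties file and reports every *.principal value whose realm (the part after @) differs from the one you expect. It's a simplified parser that handles only key=value lines and comments, and the realm and principal values are hypothetical; adjust it to the Kerberos properties actually present in your hive.properties.

```python
from pathlib import Path


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def principals_with_unexpected_realm(props: dict, expected_realm: str) -> dict:
    """Map each *.principal key to its realm when that realm isn't expected_realm."""
    mismatches = {}
    for key, value in props.items():
        if key.endswith(".principal") and "@" in value:
            realm = value.rsplit("@", 1)[1]
            if realm != expected_realm:
                mismatches[key] = realm
    return mismatches


if __name__ == "__main__":
    path = Path("/etc/trino/conf.dist/catalog/hive.properties")
    if path.exists():
        # EXAMPLE.COM is a placeholder; use your cluster's Kerberos realm.
        print(principals_with_unexpected_realm(parse_properties(path.read_text()), "EXAMPLE.COM"))
```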

Clean up

Clean up the resources created either manually or by the AWS CloudFormation template provided in the GitHub repo to avoid unnecessary cost to your AWS account. You can delete the CloudFormation stack by selecting the stack on the AWS CloudFormation console and choosing Delete. This action deletes all the resources it provisioned. If you manually updated a template-provisioned resource, you may encounter some issues during cleanup; you need to clean these up independently.

Conclusion

A data mesh approach encourages the idea of data domains where each domain team owns their data and is responsible for data quality and accuracy. This draws parallels with a microservices architecture. Building federated data governance like we show in this post is at the core of implementing a data mesh architecture. Combining the powerful query federation capabilities of Apache Trino with the centralized authorization and audit capabilities of Apache Ranger provides an end-to-end solution to operate and govern a data mesh platform.

In addition to the Ranger integration capabilities already available for Apache SparkSQL, Amazon S3, and Apache Hive, starting with the 6.7 release, Amazon EMR includes a plugin for Ranger Trino integration. For more information, refer to EMR Trino Ranger plugin.


About the authors

Varun Rao Bhamidimarri is a Sr Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers adopt cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, and meditating, and he recently picked up gardening during the lockdown.

Partha Sarathi Sahoo is an Analytics Specialist TAM at AWS based in Sydney, Australia. He brings 15+ years of technology expertise and helps enterprise customers optimize analytics workloads. In his previous roles, he worked extensively on both on-premises and cloud big data workloads, along with various ETL platforms. He also actively conducts proactive operational reviews of analytics services like Amazon EMR, Redshift, and OpenSearch.

Anis Harfouche is a Data Architect at AWS Professional Services. He helps customers achieve their business outcomes by designing, building, and deploying data solutions based on AWS services.

Disaster recovery considerations with Amazon EMR on Amazon EC2 for Spark workloads

Post Syndicated from Bharat Gamini original https://aws.amazon.com/blogs/big-data/disaster-recovery-considerations-with-amazon-emr-on-amazon-ec2-for-spark-workloads/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR launches all nodes for a given cluster in the same Amazon Elastic Compute Cloud (Amazon EC2) Availability Zone to improve performance. During an Availability Zone failure or due to any unexpected interruption, Amazon EMR may not be accessible, and we need a disaster recovery (DR) strategy to mitigate this problem.

Part of architecting a resilient, highly available Amazon EMR solution is the consideration that failures do occur. These unexpected interruptions can be caused by natural disasters, technical failures, and human interactions resulting in an Availability Zone outage. The EMR cluster could also become unreachable due to failure of critical services running on the EMR master node, network issues, or other issues.

In this post, we show you how to architect your Amazon EMR environment for disaster recovery to maintain business continuity with a minimal Recovery Time Objective (RTO) during an Availability Zone failure or when your EMR cluster is inoperable.

Although various disaster recovery strategies are available in the cloud, we discuss active-active and active-passive DR strategies for Amazon EMR in this post. We focus on a use case for Spark batch workloads where persistent storage is decoupled from Amazon EMR and the EMR cluster is running with a single master node. If the EMR cluster is used for persistent storage, it requires an additional strategy to replicate data from the EMR cluster, which we will cover in subsequent posts.

Prerequisites

To follow along with this post, you should have knowledge of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and an understanding of Network Load Balancers.

Solution overview

The following diagram illustrates the solution architecture.

Customers often use Amazon MWAA to submit Spark jobs to an EMR cluster using an Apache Livy REST interface. We can configure Apache Livy to use a Network Load Balancer hostname instead of an Amazon EMR master hostname, so that we don’t need to update Livy connections from Amazon MWAA whenever a new cluster is created or stopped. You can register Network Load Balancer target groups with multiple EMR cluster master nodes for an active-active setup. In the case of an active-passive setup, we can create a new EMR cluster when a failure is detected and register the new EMR master with the Network Load Balancer target group. The Network Load Balancer automatically performs health checks and distributes requests to healthy targets. With this solution, we can maintain business continuity when an EMR cluster isn’t reachable due to Availability Zone failure or when the cluster is unhealthy due to any other reason.
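For illustration, the following Python sketch builds a Livy batch submission (a POST to the /batches endpoint) addressed to the Network Load Balancer DNS name rather than to a specific master node, which is what lets the cluster behind the load balancer change without updating the caller. The DNS name and S3 path are placeholders, and the sketch assumes Livy is reachable over plain HTTP on port 8998. In Amazon MWAA, the Livy connection and operator would typically build this request for you.

```python
import json
import urllib.request
from typing import Optional, Sequence

LIVY_PORT = 8998  # default Livy port on Amazon EMR


def build_livy_batch_request(nlb_dns_name: str, app_file: str,
                             class_name: Optional[str] = None,
                             args: Optional[Sequence[str]] = None) -> urllib.request.Request:
    """Build (but don't send) a POST /batches request against the load balancer."""
    body = {"file": app_file}
    if class_name:
        body["className"] = class_name  # main class, for JVM applications
    if args:
        body["args"] = list(args)
    return urllib.request.Request(
        url=f"http://{nlb_dns_name}:{LIVY_PORT}/batches",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_livy_batch_request("<your-nlb-dns-name>", "s3://example-bucket/jobs/app.py")
    print(req.get_method(), req.full_url, req.data.decode())
    # To actually submit: urllib.request.urlopen(req)
```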

Active-active DR strategy

An active-active DR setup focuses on running two EMR clusters with identical configuration in two different Availability Zones. To reduce the running costs of two active EMR clusters, we can launch both clusters with minimum capacity and let managed scaling automatically scale each cluster based on the workload. EMR managed scaling only launches instances when there is demand for resources and stops the unneeded instances when the work is finished. With this strategy, we can reduce our recovery time to near zero with optimal cost. This active-active DR strategy is suitable when businesses want near-zero downtime with automatic failover for their analytics workloads.

In the following section, we walk through the steps to implement the solution and provide references to related resources that provide more detailed guidance.

Create EMR clusters

We create two EMR clusters in different Availability Zones within the same Region of your choice. Use the following AWS Command Line Interface (AWS CLI) command and modify or add required configurations as per your needs:

aws emr create-cluster \
  --name "<emr-cluster-az-a>" \
  --release-label emr-6.4.0 \
  --log-uri "s3://<your-log-bucket>" \
  --applications Name=Spark Name=Livy \
  --ec2-attributes "KeyName=<your-key-name>,SubnetId=<private-subnet-in-az-a>" \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
  --use-default-roles

We can create the cluster with EMR managed scaling, which lets you automatically increase or decrease the number of instances or units in your cluster based on workload. Amazon EMR continuously evaluates cluster metrics to make scaling decisions that optimize your clusters for cost and speed.
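The following is a minimal sketch of attaching such a policy. This Python builds the ManagedScalingPolicy structure that the EMR PutManagedScalingPolicy API accepts (put_managed_scaling_policy in boto3). The minimum and maximum capacities and the cluster ID in the usage comment are placeholders, not recommendations. The client is passed in as a parameter, and the sketch assumes you supply boto3.client("emr") in a real run.

```python
from typing import Any, Dict, Optional

VALID_UNIT_TYPES = ("Instances", "InstanceFleetUnits", "VCPU")


def build_managed_scaling_policy(
    min_units: int,
    max_units: int,
    unit_type: str = "Instances",
    max_on_demand_units: Optional[int] = None,
) -> Dict[str, Any]:
    """Return a ManagedScalingPolicy dict in the shape the EMR API expects."""
    if unit_type not in VALID_UNIT_TYPES:
        raise ValueError(f"unsupported unit type: {unit_type}")
    if not 0 < min_units <= max_units:
        raise ValueError("expected 0 < min_units <= max_units")
    limits: Dict[str, Any] = {
        "UnitType": unit_type,
        "MinimumCapacityUnits": min_units,
        "MaximumCapacityUnits": max_units,
    }
    # Optional cap on On-Demand capacity; left out of the request when not set
    if max_on_demand_units is not None:
        limits["MaximumOnDemandCapacityUnits"] = max_on_demand_units
    return {"ComputeLimits": limits}


def apply_policy(emr_client: Any, cluster_id: str, policy: Dict[str, Any]) -> None:
    """Attach the policy; emr_client is assumed to be boto3.client("emr") in a real run."""
    emr_client.put_managed_scaling_policy(ClusterId=cluster_id, ManagedScalingPolicy=policy)


# Usage (placeholders; run once per cluster):
# import boto3
# apply_policy(boto3.client("emr"), "j-XXXXXXXXXXXXX", build_managed_scaling_policy(2, 10))
```

For the active-active setup, you would apply the same policy to both clusters so that each one can scale independently from a small baseline.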

Create and configure a Network Load Balancer

You can create a Network Load Balancer using the AWS CLI (see Create a Network Load Balancer using the AWS CLI) or the AWS Management Console (see Create a Network Load Balancer). For this post, we do so on the console.

  • Create a target group (emr-livy-dr) and register both EMR clusters’ master IP addresses in the target group.

  • Create an internal Network Load Balancer in the same VPC as your EMR clusters, choose two different Availability Zones, and select their private subnets.
    These subnets don't need to be the same subnets that the EMR clusters use, but the clusters must allow traffic from the Network Load Balancer, as discussed in the next steps.

  • Create a TCP listener on port 8998 (the default EMR cluster Livy port) to forward requests to the target group you created.

  • Modify the EMR clusters’ master security groups to allow the Network Load Balancer’s private IP addresses to access port 8998.

You can find the Network Load Balancer’s private IP address by searching the elastic network interfaces for the Network Load Balancer’s name. For access control instructions, refer to How do I attach a security group to my Elastic Load Balancer.

When the targets in the target group pass their health checks, the Network Load Balancer forwards the requests it receives on Livy port 8998 to them.

  • Get the DNS name of the Network Load Balancer.
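If you prefer to script the console steps, the following sketch shows the target group, target registration, and listener calls using the boto3 elbv2 client. It assumes IP-type targets, the emr-livy-dr name from the steps above, and a VPC ID, Network Load Balancer ARN, and master IP addresses that you supply. The security group changes aren't covered.

```python
from typing import Any, List

LIVY_PORT = 8998  # default Livy port on the EMR master node


def wire_livy_nlb(elbv2: Any, vpc_id: str, nlb_arn: str, master_ips: List[str],
                  tg_name: str = "emr-livy-dr") -> str:
    """Create a TCP target group for Livy, register EMR master IPs, and add a TCP listener."""
    response = elbv2.create_target_group(
        Name=tg_name,
        Protocol="TCP",
        Port=LIVY_PORT,
        VpcId=vpc_id,
        TargetType="ip",  # targets are registered by private IP address
        HealthCheckProtocol="TCP",
    )
    tg_arn = response["TargetGroups"][0]["TargetGroupArn"]
    # One entry per EMR master node (two entries for the active-active setup)
    elbv2.register_targets(
        TargetGroupArn=tg_arn,
        Targets=[{"Id": ip, "Port": LIVY_PORT} for ip in master_ips],
    )
    # Forward TCP traffic arriving on the Livy port to the target group
    elbv2.create_listener(
        LoadBalancerArn=nlb_arn,
        Protocol="TCP",
        Port=LIVY_PORT,
        DefaultActions=[{"Type": "forward", "TargetGroupArn": tg_arn}],
    )
    return tg_arn


# Usage (placeholders): wire_livy_nlb(boto3.client("elbv2"), "vpc-...", nlb_arn, ["10.0.1.10", "10.0.2.10"])
```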

We can also use an Amazon Route 53 alias record to use our own domain name to route traffic to the Network Load Balancer DNS name. We use this DNS name in our Amazon MWAA Livy connection.
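As a sketch of the alias record, the following Python builds the change batch for the Route 53 ChangeResourceRecordSets API. The record name and both zone IDs are assumptions you supply. For example, you might use a hypothetical livy.example.internal in a private hosted zone associated with the VPC. You can read the load balancer's DNS name and canonical hosted zone ID from describe_load_balancers.

```python
from typing import Any, Dict


def nlb_alias_change_batch(record_name: str, nlb_dns_name: str, nlb_hosted_zone_id: str) -> Dict[str, Any]:
    """Route 53 ChangeBatch that points record_name at the Network Load Balancer."""
    return {
        "Comment": "Livy endpoint for Amazon MWAA",
        "Changes": [
            {
                "Action": "UPSERT",  # create the record, or update it if it already exists
                "ResourceRecordSet": {
                    "Name": record_name,
                    "Type": "A",
                    "AliasTarget": {
                        # The load balancer's CanonicalHostedZoneId, not your own hosted zone ID
                        "HostedZoneId": nlb_hosted_zone_id,
                        "DNSName": nlb_dns_name,
                        "EvaluateTargetHealth": False,
                    },
                },
            }
        ],
    }


def upsert_alias(route53: Any, hosted_zone_id: str, batch: Dict[str, Any]) -> None:
    """route53 is assumed to be boto3.client("route53")."""
    route53.change_resource_record_sets(HostedZoneId=hosted_zone_id, ChangeBatch=batch)
```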

Create and configure Amazon MWAA

Complete the following steps:

  • Make sure the execution role you’re using with Amazon MWAA has proper access to EMR clusters and other required services.
  • Update the Amazon MWAA Livy connection (livy_default) host with the Network Load Balancer hostname you created.
  • Create a new Livy connection ID if it’s not already available.

  • Use the following sample DAG to submit a sample Spark application using LivyOperator. We assign the livy_default connection to the livy_conn_id in the DAG code.
  • Enable the DAG and verify that the Spark application succeeds on one of the EMR clusters.
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'airflow',
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag_name = "livy_spark_dag"
# Replace with the name of your S3 bucket
# You can copy the sample Spark JAR from the EMR cluster master node
# (/usr/lib/spark/examples/jars/spark-examples.jar) to this bucket
s3_bucket = "artifacts-bucket"
jar_location = "s3://{}/spark-examples.jar".format(s3_bucket)

dag = DAG(
    dag_id = dag_name,
    default_args=default_args,
    schedule_interval='@once',
    start_date = days_ago(1),
    catchup=False,
    tags=['emr', 'spark', 'livy']
)

livy_spark = LivyOperator(
    file=jar_location,
    class_name="org.apache.spark.examples.SparkPi",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    task_id="livy_spark",
    conf={
        "spark.submit.deployMode": "cluster",
        "spark.app.name": dag_name,
    },
    livy_conn_id="livy_default",
    dag=dag,
)

livy_spark
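LivyOperator submits the job through Livy's batch REST API. You may want to check the Network Load Balancer path without Airflow, for example from a host inside the VPC. The following sketch sends an equivalent POST /batches request using only the Python standard library. The body mirrors the DAG's arguments; it isn't guaranteed to match byte for byte what LivyOperator sends. The <nlb-dns-name> placeholder is yours to fill in.

```python
import json
import urllib.request
from typing import Any, Dict


def build_livy_batch(jar: str, app_name: str,
                     class_name: str = "org.apache.spark.examples.SparkPi") -> Dict[str, Any]:
    """Body for Livy's POST /batches, mirroring the LivyOperator arguments in the DAG."""
    return {
        "file": jar,
        "className": class_name,
        "driverMemory": "1g",
        "driverCores": 1,
        "executorMemory": "1g",
        "executorCores": 2,
        "numExecutors": 1,
        "conf": {"spark.submit.deployMode": "cluster", "spark.app.name": app_name},
    }


def submit_batch(livy_url: str, body: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]:
    """POST the batch to Livy and return the parsed JSON response (includes id and state)."""
    request = urllib.request.Request(
        livy_url.rstrip("/") + "/batches",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage from a host that can reach the internal Network Load Balancer:
# batch = submit_batch("http://<nlb-dns-name>:8998",
#                      build_livy_batch("s3://artifacts-bucket/spark-examples.jar", "livy_spark_dag"))
# print(batch["id"], batch["state"])
```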

Test the DR plan

We can test our DR plan by creating scenarios that simulate real disasters. Perform the following steps to validate that our DR strategy works automatically during a disaster:

  1. Run the sample DAG multiple times and verify that the Spark applications are distributed across the registered EMR clusters.
  2. Terminate one of the clusters and verify that jobs are automatically submitted to the other cluster in a different Availability Zone without any issues.

Active-passive DR strategy

Although the active-active DR strategy has the benefit of near-zero recovery time, maintaining two environments is complex, because both require patching and constant monitoring. If the Recovery Time Objective (RTO) and Recovery Point Objective (RPO) aren't critical for your workloads, we can adopt an active-passive strategy, which is more economical and operationally less complex.

In this approach, we use a single EMR cluster as the active cluster. In case of a disaster (an Availability Zone failure or any other reason the EMR cluster becomes unhealthy), we launch a second EMR cluster in a different Availability Zone and redirect all our workloads to the newly launched cluster. End users may notice some delay, because launching a second EMR cluster takes time.

The high-level architecture of the active-passive DR solution is shown in the following diagram.

Complete the following steps to implement this solution:

  • Create an EMR cluster in a single Availability Zone.
  • Create target groups and register the EMR cluster master node IP address. Create one target group each for the ResourceManager (8088), NameNode (9870), and Livy (8998) services. Change the port numbers if the services run on different ports.

  • Create a Network Load Balancer, and add a TCP listener for each port that forwards requests to the respective target group.

  • Create an Amazon MWAA environment with proper access to the EMR cluster in the same Region.
  • Edit the Amazon MWAA Livy connection to use the Network Load Balancer DNS name.
  • Use the updated Livy connection in Amazon MWAA DAGs to submit Spark applications.
  • Validate that you can successfully submit Spark applications via Livy to the EMR cluster.
  • Set up a DAG on Amazon MWAA or a similar scheduling tool that continuously monitors the health of the existing EMR cluster.
  • Monitor key services running on the Amazon EMR master host, such as ResourceManager, NameNode, and Livy, using the REST APIs or commands provided by each service. Add more health checks as required.
  • If the health check process detects a failure of the first EMR cluster, create a new EMR cluster in a different Availability Zone.
  • Automatically register the newly created EMR cluster master IP address to the Network Load Balancer target groups.
  • When the Network Load Balancer health checks succeed for the new EMR cluster master IP, deregister the unhealthy EMR cluster master IP address from the target group and terminate the old EMR cluster.
  • Validate the DR plan.
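The monitoring steps above can be sketched in Python for a task in an Amazon MWAA DAG. This illustration rests on a few assumptions. The endpoints and ports match the bash script later in this section (plain HTTP, default ports). The host is the Network Load Balancer DNS name. Failover is triggered only when several consecutive probes fail, as that script's comments recommend. Creating the replacement cluster and updating target groups is left out.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict


def fetch_json(url: str, timeout: float = 10.0) -> Any:
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def check_services(host: str, fetch: Callable[[str], Any] = fetch_json) -> Dict[str, bool]:
    """Return per-service health for ResourceManager, NameNode, and Livy on `host`."""
    checks = {
        "resourcemanager": (f"http://{host}:8088/ws/v1/cluster",
                            lambda d: d["clusterInfo"]["state"] == "STARTED"),
        "namenode": (f"http://{host}:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus",
                     lambda d: d["beans"][0]["State"] == "active"),
        "livy": (f"http://{host}:8998/sessions", lambda d: True),  # reachability only
    }
    results = {}
    for name, (url, is_ok) in checks.items():
        try:
            results[name] = bool(is_ok(fetch(url)))
        except Exception:
            # Connection errors, timeouts, and unexpected payloads all count as unhealthy
            results[name] = False
    return results


def confirmed_unhealthy(probe: Callable[[], Dict[str, bool]], attempts: int = 3,
                        interval_s: float = 30.0,
                        sleep: Callable[[float], None] = time.sleep) -> bool:
    """True only if every one of `attempts` consecutive probes finds an unhealthy service."""
    for i in range(attempts):
        if all(probe().values()):
            return False  # one fully healthy probe cancels the failover
        if i < attempts - 1:
            sleep(interval_s)
    return True
```

A DAG task could call confirmed_unhealthy(lambda: check_services("<nlb-dns-name>")) and trigger the failover steps only when it returns True.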

Follow the steps mentioned in the active-active DR strategy to create the following resources:

  • Amazon EMR
  • Amazon MWAA
  • Network Load Balancer

The following sample script provides the functionality described in this section. Use it as a reference and modify it to fit your use case.

#!/bin/bash

usage() {
	cat <<EOF
   Usage: ./dr_health_check.sh j-2NPQWXK1U4E6G

   This script takes current EMR cluster id as argument and monitors the cluster health and
   creates new EMR cluster in different AZ if existing cluster is unhealthy/unreachable

EOF
	exit 1
}

[[ $# -lt 1 ]] && {
	echo Specify cluster id as argument to the script
	usage
}

#Set NLB DNS name and region
hostname="emr-ap-ae4ffe5g23fd9245.elb.us-west-2.amazonaws.com"
region="us-west-2"
cluster_id=$1
cluster_status=""

export AWS_DEFAULT_REGION=$region

#Depending on the use case perform below health checks for more than one time in a loop and if cluster state is still unhealthy then only perform remaining steps
#Ports and SSL properties for curl command may differ depending on how services are set up
rm_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8088/ws/v1/cluster | jq -r .clusterInfo.state)
if [[ $? -ne 0 || "$rm_state" != "STARTED" ]]; then
	echo "ResourceManager port not reachable or service not running"
	cluster_status="unhealthy"
fi

nn_state=$(curl -s --connect-timeout 5 --max-time 10 "http://$hostname:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" | jq -r '.beans[0].State')
if [[ $? -ne 0 || "$nn_state" != "active" ]]; then
	echo "NameNode port not reachable or service not running"
	cluster_status="unhealthy"
fi

livy_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8998/sessions)
if [[ $? -ne 0 ]]; then
	echo "Livy port not reachable"
	cluster_status="unhealthy"
fi

cluster_name=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Name")

update_target_groups() {
	new_master_ip=$1
	current_master_ip=$2
	current_az=$3

	nlb_arn=$(aws elbv2 describe-load-balancers --query "LoadBalancers[?DNSName==\`$hostname\`].[LoadBalancerArn]" --output text)
	target_groups=$(aws elbv2 describe-target-groups --load-balancer-arn "$nlb_arn" --query "TargetGroups[*].TargetGroupArn" --output text)
	#Text output separates ARNs with tabs, so rely on default word splitting rather than splitting on spaces only
	for tg in $target_groups; do
		echo "Registering new EMR master IP with target group $tg"
		aws elbv2 register-targets --target-group-arn "$tg" --targets Id="$new_master_ip",AvailabilityZone=all

		#Remove the old master only after the new one passes the NLB health checks
		echo "Waiting for new EMR master IP to become healthy in target group $tg"
		aws elbv2 wait target-in-service --target-group-arn "$tg" --targets Id="$new_master_ip"

		echo "De-registering old/unhealthy EMR master IP from target group $tg"
		aws elbv2 deregister-targets --target-group-arn "$tg" --targets Id="$current_master_ip",AvailabilityZone=all
	done
}

if [[ $cluster_status == "unhealthy" ]]; then
	echo "Cluster status is $cluster_status, creating new EMR cluster"
	current_az=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Ec2InstanceAttributes.Ec2AvailabilityZone")
	new_az=$(aws ec2 describe-availability-zones --filters "Name=region-name,Values=$region" --query "AvailabilityZones[?ZoneName!=\`$current_az\`].ZoneName|[0]" --output text)
	current_master_ip=$(aws emr list-instances --cluster-id $cluster_id --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "Current/unhealthy cluster id $cluster_id, cluster name $cluster_name,AZ $current_az, Master private ip $current_master_ip"

	echo "Creating new EMR cluster in $new_az"
	emr_op=$(aws emr create-cluster \
		--name "$cluster_name-$new_az" \
		--release-label emr-6.4.0 \
		--applications Name=Spark Name=Livy \
		--ec2-attributes "AvailabilityZone=$new_az" \
		--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
		--use-default-roles \
		--region $region)

	new_cluster_id=$(echo "$emr_op" | jq -r ".ClusterId")

	#Wait until the new cluster is RUNNING or WAITING so that the master IP address and services are available
	aws emr wait cluster-running --cluster-id "$new_cluster_id"

	new_master_ip=$(aws emr list-instances --cluster-id "$new_cluster_id" --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "New EMR cluster id $new_cluster_id and Master node IP $new_master_ip"

	echo "Register new EMR master IP address with NLB target groups and de-register unhealthy EMR master"
	update_target_groups "$new_master_ip" "$current_master_ip" "$current_az"

	echo "Terminating unhealthy cluster $cluster_id/$cluster_name in $current_az"
	aws emr modify-cluster-attributes --cluster-id "$cluster_id" --no-termination-protected
	aws emr terminate-clusters --cluster-ids "$cluster_id"
else
	echo "Current cluster $cluster_id/$cluster_name is healthy"
fi
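You might run the failover from Python instead, for example in an Amazon MWAA task. In that case, the step "when the Network Load Balancer health checks are successful with the new EMR cluster master IP" can be expressed as a check over DescribeTargetHealth responses. This minimal sketch assumes you call describe_target_health on the boto3 elbv2 client once per target group (ResourceManager, NameNode, Livy) and pass the responses in.

```python
from typing import Any, Dict, List


def healthy_target_ids(response: Dict[str, Any]) -> List[str]:
    """IDs (IP addresses for IP targets) whose NLB health state is 'healthy'."""
    return [
        desc["Target"]["Id"]
        for desc in response.get("TargetHealthDescriptions", [])
        if desc.get("TargetHealth", {}).get("State") == "healthy"
    ]


def safe_to_deregister_old_master(responses: List[Dict[str, Any]], new_master_ip: str) -> bool:
    """True only when the new master is healthy in every target group checked."""
    return bool(responses) and all(new_master_ip in healthy_target_ids(r) for r in responses)


# Usage with boto3 (not executed here):
# elbv2 = boto3.client("elbv2")
# responses = [elbv2.describe_target_health(TargetGroupArn=arn) for arn in target_group_arns]
# if safe_to_deregister_old_master(responses, new_master_ip):
#     ...deregister the old IP, then terminate the old cluster
```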

Summary

In this post, we shared some solutions and considerations to improve DR implementation using Amazon EMR on Amazon EC2, Network Load Balancer, and Amazon MWAA. Based on your use case, you can determine the type of DR strategy you want to deploy. We have provided the steps required to create the necessary environments and set up a successful DR strategy.

For more details about the systems and processes described in this post, refer to the following:


About the Author

Bharat Gamini is a Data Architect focused on Big Data & Analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust and secure cloud-based analytical solutions on AWS.

Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-high-performance-acid-compliant-evolving-data-lake-using-apache-iceberg-on-amazon-emr/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Apache Iceberg is an open table format for huge analytic datasets. Table formats typically indicate the format and location of individual table files. Iceberg adds functionality on top of that to help manage petabyte-scale datasets as well as newer data lake requirements such as transactions, upsert/merge, time travel, and schema and partition evolution. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table.

Amazon EMR release 6.5.0 and later includes Apache Iceberg, so you can reliably work with huge tables with full support for ACID (atomicity, consistency, isolation, durability) transactions in a highly concurrent and performant manner, without getting locked into a single file format.

In this post, we discuss the modern data lake requirements and the challenges—including support for ACID transactions and concurrent writers, partition and schema evolution—that come with these. We also discuss how Iceberg solves these challenges. Additionally, we provide a step-by-step guide on how to get started with an Iceberg notebook in Amazon EMR Studio. You can access this sample notebook from the GitHub repo. You can also find this notebook in your EMR Studio workspace under Notebook Examples.

Modern data lake challenges

Amazon EMR integrates natively with Amazon Simple Storage Service (Amazon S3) for persistent data storage, and allows you to scale your data in Amazon S3 independently of compute on your EMR cluster. This enables you to bring in data from multiple sources (for example, transactional data from operational databases, social media feeds, and SaaS data sources) using different tools, with each data source using its own transient EMR cluster to perform transformation and ingestion in parallel. You can now keep one central copy of your data, share it with multiple user groups that run analytics, and even make in-place updates on a data lake. We're increasingly seeing the following requirements (and challenges) emerge as mainstream:

  • Consistent reads and writes across multiple concurrent users – There are two primary concerns:
    • Reader-writer isolation – When a job is updating a huge dataset, another job reading the same data can see a partially updated dataset and produce inconsistent results.
    • Concurrent writes on the same dataset – Table formats that rely on coarse-grained locks slow down the system. This limitation is even more pronounced in real-time streaming workloads.
  • Consistent table updates across multiple files or partitions – With Hive tables, writing to multiple partitions at once isn’t an atomic operation. If you’re overwriting a partition, for instance, you might delete several files across partitions without having any guarantees that you will replace them, potentially resulting in data loss. For huge tables, it’s not practical to use global locks and keep the readers and writers waiting. Common workarounds (such as rewriting all the data in all the partitions that need to be changed at the same time and then pointing to the new locations) cause huge data duplication and redundant extract, transform, and load (ETL) jobs.
  • Continuous schema evolution – Simple DDL commands often render the data unusable. For instance, say a data engineer renames a column and writes some data. The consuming analytics tool now can’t read it because the metastore can’t track former names for columns. That rename operation has effectively dropped a column and added a new column. Now there is data written in both schemas. Historically, schema changes required expensive backfills and redundant ETL processes.
  • Different query patterns on the same data – If you change the partitioning to optimize your query after a year, say from daily to hourly, you have to rewrite the table with the new hour column as the partition. In addition, you have to rewrite queries to use the new partition column in your table.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Existing tools that support these features lock you into specific file formats, complicating interoperability across the analytics ecosystem.
  • Support for mixed file formats – With existing solutions, if you rename a column in one file format (say Parquet, ORC, or Avro), you get a different behavior than if you rename a column in a different file format. There is inconsistency in data types supported by different file formats. These limitations necessitate additional ETL steps.

The problem

When multiple users share the same data, varied requirements ensue. The data platform needs to be transactional to handle concurrent upserts and reads.

Table formats such as Hive track a list of partitions inside the table within a data catalog. However, the underlying files still aren't tracked transactionally, because the object storage they live in isn't designed to be transactional. After the specific partitions to be updated or inserted have been identified, we still need to list all the files in those partitions at the leaf level of the partition hierarchy before we can filter out which of those files are relevant. For huge analytic datasets with thousands of files in each partition, listing all those files each time you run a query slows it down considerably. Furthermore, atomic commits, which require thousands of files in the table to go live at exactly the same moment, become impractical.

Apache Iceberg on Amazon EMR

Netflix started developing Iceberg in December 2017 and donated it to the Apache Software Foundation as an incubator project in November 2018. It graduated from the incubator in May 2020.

Iceberg on Amazon EMR comes completely integrated and tested for running in production backed by Enterprise Support. This means you get 24/7 technical support from Amazon EMR experts, tools and technology to automatically manage the health of your environment, and consultative architectural, performance, and troubleshooting guidance on Iceberg issues.

Iceberg has integrations with other AWS services. For example, you can use the AWS Glue Data Catalog as the metastore for Iceberg tables. Iceberg also supports other catalog types such as Hive, Hadoop, Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and other custom implementations. When using AWS Glue as the data catalog, the AWS Glue database serves as your Iceberg namespace. Similarly, the AWS Glue table and AWS Glue TableVersion serve as the Iceberg table and table version, respectively. Your AWS Glue Data Catalog could be in the same or different account or even a different Region, making multi-account, multi-Region pipelines easily deployable. Amazon Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

How Iceberg addresses these challenges

Iceberg tracks individual data files in a table instead of simply maintaining a pointer to high-level table or partition locations. This lets writers create data files in place and add them to the table only through an explicit commit. Every time new data is ingested into the table, a new point-in-time snapshot is created. At query time, there is no need to list a directory to find the files to read, because the snapshot already records them at write time. Because of this design, Iceberg solves the problems listed earlier in the following ways:

  • Consistent reads and writes across multiple concurrent users – Iceberg relies on optimistic concurrency to support concurrent reads and writes from multiple user groups. If two writers try to commit at the same time, only one commit succeeds on the first attempt. Iceberg retries the other commit automatically at the metadata level, transparently to the user, and if it detects that the second update doesn't conflict with the first, it commits it successfully.
  • Consistent table updates across multiple partitions – In Iceberg, the partition of a file isn’t determined by the physical location of the files within directories or prefixes. Instead, Iceberg stores partition information within manifests of the data files. Therefore, updates across multiple partitions entail a simple, atomic metadata change.
  • Continuous schema evolution – Iceberg tracks columns by unique IDs rather than by column name, which enables easy schema evolution. You can safely add, drop, rename, or even reorder columns. You can also update column data types if the update is safe (such as widening from INT to BIGINT or from float to double).
  • Different query patterns on the same data – Iceberg keeps track of the relationship between partitioning values and the column that they came from. Logical data is decoupled from physical layout, which enables easy partition evolution as well. Partition values can be implicitly derived using a transform such as day(timestamp) or hour(timestamp) of an existing column.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Iceberg supports ACID transactions with serializable isolation. Furthermore, Iceberg supports deletes, upserts, change data capture (CDC), time travel (getting the state of the data from a past time regardless of the current state of the data), and compaction (consolidating small files into larger files to reduce metadata overhead and improve query speed). Table changes are atomic, and readers never see partial or uncommitted changes.
  • Support for mixed file formats – Because schema fields are tracked by unique IDs independent of the underlying file format, you can have consistent queries across file formats such as Avro, Parquet, and ORC.
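To make the optimistic concurrency idea concrete, here is a deliberately simplified toy model in Python. It is not Iceberg's implementation or API. The table is reduced to a pointer to an immutable snapshot (the full list of data files), and a commit is an atomic compare-and-swap of that pointer. A writer that loses the race rebuilds its change on top of the new current snapshot and retries. Re-applying an append corresponds to the non-conflicting case described above. Real Iceberg also validates conflicts before retrying.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    files: Tuple[str, ...]  # every data file visible in this snapshot


class ToyTable:
    """Toy table: just a pointer to the current immutable snapshot."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # stands in for the catalog's atomic pointer swap
        self.current = Snapshot(0, ())

    def compare_and_swap(self, expected: Snapshot, new: Snapshot) -> bool:
        with self._lock:
            if self.current is not expected:
                return False  # someone else committed since `expected` was read
            self.current = new
            return True


def commit(table: ToyTable, change: Callable[[Snapshot], Tuple[str, ...]],
           max_retries: int = 3) -> Snapshot:
    """Optimistic commit: build on the current snapshot, swap, and retry if we lost the race."""
    for _ in range(max_retries + 1):
        base = table.current
        candidate = Snapshot(base.snapshot_id + 1, change(base))
        if table.compare_and_swap(base, candidate):
            return candidate
        # Lost the race: loop and re-apply the change on top of the newer snapshot
    raise RuntimeError("commit failed after retries")
```

Readers in this model always see a whole snapshot, either the one before a commit or the one after it, which is the property the ACID bullet above describes.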

Using Apache Iceberg with Amazon EMR

In this post, we demonstrate creating an Amazon EMR cluster that supports Iceberg using the AWS Command Line Interface (AWS CLI). You can also create the cluster from the Amazon EMR console. We use Amazon EMR Studio to run notebook code on our EMR cluster. To set up an EMR Studio, refer to Set up an EMR Studio. First, we note down the subnets that we specified when we created our EMR Studio. Now we launch our EMR cluster using the AWS CLI:

aws emr create-cluster \
--name iceberg-emr-cluster \
--use-default-roles \
--release-label emr-6.6.0 \
--instance-count 1 \
--instance-type r5.4xlarge \
--applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
--ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
--configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'

We choose emr-6.6.0 as the release label. This release comes with Iceberg version 0.13.1 pre-installed. We launch a single-node EMR cluster with the instance type r5.4xlarge and with the following applications installed: Hadoop, Spark, Livy, and Jupyter Enterprise Gateway. Make sure that you replace <EMR-STUDIO-SUBNET> with a subnet ID from the list of EMR Studio subnets you noted earlier. We also need to enable Iceberg on our cluster and use the AWS Glue Data Catalog as its metastore. To do this, we use the following configuration classifications:

[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]
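Passing this JSON inline is error-prone, because a stray space or line break inside a quoted value corrupts the classification names. As an alternative sketch, you can generate the classifications with Python, save the output to a file, and pass the file to the AWS CLI with --configurations file://configurations.json. The file name is only an example.

```python
import json
from typing import Any, Dict, List

GLUE_FACTORY = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"


def iceberg_glue_configurations() -> List[Dict[str, Any]]:
    """EMR configuration classifications that enable Iceberg and use the AWS Glue Data Catalog."""
    return [
        {"Classification": "iceberg-defaults", "Properties": {"iceberg.enabled": "true"}},
        {"Classification": "spark-hive-site",
         "Properties": {"hive.metastore.client.factory.class": GLUE_FACTORY}},
    ]


def configurations_json() -> str:
    """Serialize the classifications for use with --configurations file://..."""
    return json.dumps(iceberg_glue_configurations(), indent=2)


# Usage:
# with open("configurations.json", "w", encoding="utf-8") as f:
#     f.write(configurations_json() + "\n")
```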

Initial setup

Let’s first create an S3 bucket location in the same Region as the EMR cluster to save a sample dataset that we’re going to create and work with. In this post, we use the placeholder bucket name YOUR-BUCKET-NAME. Remember to replace this with a globally unique bucket name when testing this out in your environment. From our EMR Studio workspace, we attach our cluster and use the PySpark kernel.

You can upload the sample notebook from the GitHub repo or use the Iceberg example under Notebook Examples in your own EMR Studio workspace and run the cells following the instructions in the notebook.

Configure a Spark session

In this command, we register a Spark catalog named glue_catalog1 that is backed by the AWS Glue Data Catalog. You can choose a different name. If you do, remember to change it throughout this example, because all of our commands use fully qualified table names that include the catalog name. In the following command, remember to replace YOUR-BUCKET-NAME with your own bucket name:

%%configure -f
{
    "conf": {
        "spark.sql.catalog.glue_catalog1": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog1.warehouse": "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
        "spark.sql.catalog.glue_catalog1.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog1.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog1.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog1.lock.table": "myGlueLockTable",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}

Let’s assume that the name of your catalog is glue_catalog1. The preceding code has the following components:

  • glue_catalog1.warehouse points to the Amazon S3 path where you want to store your data and metadata.
  • To make the catalog an AWS Glue Data Catalog, set glue_catalog1.catalog-impl to org.apache.iceberg.aws.glue.GlueCatalog. This key is required to point to an implementation class for any custom catalog implementation.
  • Use org.apache.iceberg.aws.s3.S3FileIO as the glue_catalog1.io-impl in order to take advantage of Amazon S3 multipart upload for high parallelism.
  • We use an Amazon DynamoDB table for lock implementation. This is optional, and is recommended for high concurrency workloads. To do that, we set lock-impl for our catalog to org.apache.iceberg.aws.glue.DynamoLockManager and we set lock.table to myGlueLockTable as the table name so that for every commit, the Data Catalog first obtains a lock using this table and then tries to safely modify the AWS Glue table. If you choose this option, the table gets created in your own account. Note that you need to have the necessary access permissions to create and use a DynamoDB table. Furthermore, additional DynamoDB charges apply.
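The catalog name appears in every key, so a small helper that generates these settings from the catalog name can help if you choose a name other than glue_catalog1. This is a hypothetical helper for illustration. It only produces the keys and values shown above and renders them as a %%configure -f cell body that you can paste into the notebook. It adds the DynamoDB lock settings only when you pass a lock table name, because the lock manager is optional.

```python
import json
from typing import Dict, Optional


def iceberg_glue_catalog_conf(catalog: str, warehouse: str,
                              lock_table: Optional[str] = None) -> Dict[str, str]:
    """Spark settings for an Iceberg catalog backed by the AWS Glue Data Catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }
    if lock_table:
        # Optional DynamoDB-based lock for high-concurrency commits
        conf[f"{prefix}.lock-impl"] = "org.apache.iceberg.aws.glue.DynamoLockManager"
        conf[f"{prefix}.lock.table"] = lock_table
    return conf


def configure_cell(conf: Dict[str, str]) -> str:
    """Render the text of a %%configure -f notebook cell."""
    return "%%configure -f\n" + json.dumps({"conf": conf}, indent=4)
```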

Now that you’re all set with your EMR cluster for compute, S3 bucket for data, and AWS Glue Data Catalog for metadata, you can start creating a table and running the DML statements.

For the remaining commands, we use the %%sql cell magic to run Spark SQL in our EMR Studio notebook. For brevity, some snippets omit the cell magic. Add %%sql at the top of the cell when you run them in your Studio notebook.

Create an Iceberg table in the AWS Glue Data Catalog

The default catalog is the AwsDataCatalog. Let’s switch to our AWS Glue catalog glue_catalog1, which has support for Iceberg tables. There are no namespaces as yet. A namespace in Iceberg is the same thing as a database in AWS Glue.

%%sql
use glue_catalog1
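The table created next lives in a namespace called salesdb. If that namespace doesn't exist yet in your AWS Glue Data Catalog, you need to create it first; the notebook may already handle this. The following minimal sketch creates it from the PySpark kernel. It assumes that the notebook's SparkSession is available as spark, as it is in the EMR Studio PySpark kernel. It also assumes that a Spark 3 CREATE DATABASE statement against the Iceberg catalog creates the corresponding AWS Glue database. The helper name is hypothetical.

```python
import re
from typing import Any

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def ensure_namespace(spark: Any, catalog: str, namespace: str) -> str:
    """Create catalog.namespace if it doesn't exist; returns the SQL that was run."""
    for part in (catalog, namespace):
        # Reject anything that isn't a plain identifier before interpolating it into SQL
        if not _IDENTIFIER.match(part):
            raise ValueError(f"not a simple identifier: {part!r}")
    statement = f"CREATE DATABASE IF NOT EXISTS {catalog}.{namespace}"
    spark.sql(statement)
    return statement


# In the notebook: ensure_namespace(spark, "glue_catalog1", "salesdb")
```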

Let’s create a table called orders. The DDL syntax looks the same as creating a Hive table, for example, except that we include USING iceberg:

CREATE TABLE glue_catalog1.salesdb.orders
    (
      order_id              int,
      product_name          string,
      product_category      string,
      qty                   int,
      unit_price            decimal(7,2),
      order_datetime        timestamp
    )
USING iceberg
PARTITIONED BY (days(order_datetime))

Note that we’re also partitioning this table by extracting the day out of the order_datetime column. We don’t have to create a separate column for the partition.
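The days(order_datetime) partition value is derived from the column rather than stored in an extra column. The Iceberg table specification defines the day transform as the number of days since 1970-01-01 and the hour transform as the number of hours since 1970-01-01 00:00. The following Python sketch computes those ordinals for UTC timestamps to show why rows from the same day share a partition. It illustrates the idea and isn't Iceberg code.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_partition(ts: datetime) -> int:
    """Days since 1970-01-01 for a timezone-aware timestamp (day transform)."""
    return (ts - EPOCH).days


def hour_partition(ts: datetime) -> int:
    """Hours since 1970-01-01 00:00 for a timezone-aware timestamp (hour transform)."""
    return int((ts - EPOCH).total_seconds() // 3600)
```

Two orders placed at 09:15 and 23:59 on the same UTC day get the same day_partition value but different hour_partition values, which is what the partition evolution example later in the notebook relies on.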

DML statements

We then insert records into our table. Here is an example:

INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        1, 
        'Harry Potter and the Prisoner of Azkaban',
        'Books',
        2,
        7.99,
        current_timestamp()
    )

Each DML statement creates a new snapshot. Query the snapshots metadata table, and note the snapshot_id and the commit timestamp column called committed_at:

SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

We now insert four more records and then query the orders table and confirm that the five records are present:

SELECT * FROM glue_catalog1.salesdb.orders

Querying from Athena

Because Iceberg on Amazon EMR comes pre-integrated with the AWS Glue Data Catalog, we can now query the Iceberg tables from AWS analytics services that support Iceberg. Let’s query the salesdb/orders table from Athena as shown in the following screenshot.

Upserts

The notebook then gives examples for updates and deletes, and even upserts. We use the MERGE INTO statement for upserts, which uses the source table orders_update with new and updated records:

MERGE INTO glue_catalog1.salesdb.orders target
USING glue_catalog1.salesdb.orders_update source
ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET
        order_id = source.order_id,
        product_name = source.product_name,
        product_category = source.product_category,
        qty = source.qty,
        unit_price = source.unit_price,
        order_datetime = source.order_datetime
WHEN NOT MATCHED THEN
    INSERT *

We then query the table to confirm the result of the upsert:

SELECT * FROM glue_catalog1.salesdb.orders
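The row-level outcome of this MERGE statement can be modeled in a few lines of Python. The model only illustrates the semantics of the statement above: every column of a matched order is replaced by the source values, and unmatched source orders are inserted. It says nothing about how Iceberg rewrites data files to apply the change. Rejecting duplicate source keys is a choice of this sketch.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def merge_upsert(target: Iterable[Row], source: Iterable[Row], key: str = "order_id") -> List[Row]:
    """Conceptual MERGE: matched keys take the source row, unmatched source rows are inserted."""
    result = {row[key]: dict(row) for row in target}
    seen = set()
    for row in source:
        k = row[key]
        # This sketch rejects duplicate source keys rather than guessing which row should win
        if k in seen:
            raise ValueError(f"duplicate source key: {k!r}")
        seen.add(k)
        result[k] = dict(row)  # WHEN MATCHED -> UPDATE all columns; WHEN NOT MATCHED -> INSERT *
    return [result[k] for k in sorted(result)]
```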

Schema evolution

We then walk through schema evolution using simple ALTER TABLE commands to add, rename, and drop columns. The following example shows how simple it is to rename a column:

ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity

We then describe the table to confirm the new column name:

DESC TABLE glue_catalog1.salesdb.orders
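A rename doesn't orphan existing data because Iceberg matches data files to columns by field ID, not by name. The toy model below uses made-up field IDs; Iceberg assigns its own when the table is created. It shows rows written before the rename being read under the new name, and a column added later reading as null for old files. It illustrates the idea only.

```python
from typing import Any, Dict, List

# Hypothetical field IDs for illustration
SCHEMA_V1 = {1: "order_id", 4: "qty"}
SCHEMA_V2 = {1: "order_id", 4: "quantity"}  # after RENAME COLUMN qty TO quantity


def read_with_schema(stored_rows: List[Dict[int, Any]], schema: Dict[int, str]) -> List[Dict[str, Any]]:
    """Project rows stored by field ID onto the column names of `schema`."""
    return [{name: row.get(field_id) for field_id, name in schema.items()} for row in stored_rows]
```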

Time travel

Iceberg also allows us to travel backward or forward in time because it stores point-in-time snapshots. We can travel by using the timestamp at which a snapshot was created, or directly by using its snapshot_id. The following is an example of a CALL statement that uses rollback_to_snapshot. Replace the snapshot IDs in these examples with values from your own snapshots table:

%%sql
CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', 8008410363488501197)

We then travel forward in time by calling set_current_snapshot:

%%sql
CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', 8392090950225782953)
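To pick a snapshot by time rather than by ID, the rule is "the latest snapshot committed at or before the chosen timestamp". Iceberg's Spark reader can apply this rule for you through its as-of-timestamp read option. The sketch below only spells out the rule in Python over (snapshot_id, committed_at) pairs, such as those returned by the snapshots metadata table. The commented notebook lines show one possible way to use the result, and the timestamp is a placeholder.

```python
from datetime import datetime
from typing import Iterable, Optional, Tuple


def snapshot_as_of(snapshots: Iterable[Tuple[int, datetime]], as_of: datetime) -> Optional[int]:
    """ID of the most recent snapshot committed at or before `as_of`, or None."""
    eligible = [(committed_at, snapshot_id) for snapshot_id, committed_at in snapshots
                if committed_at <= as_of]
    return max(eligible)[1] if eligible else None


# In the notebook (not executed here):
# rows = spark.sql("SELECT snapshot_id, committed_at FROM glue_catalog1.salesdb.orders.snapshots").collect()
# sid = snapshot_as_of([(r.snapshot_id, r.committed_at) for r in rows], datetime(2022, 3, 14, 11, 30))
# df = spark.read.option("snapshot-id", sid).format("iceberg").load("glue_catalog1.salesdb.orders")
```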

Partition evolution

The notebook ends with an example that shows how partition evolution works in Iceberg. Iceberg stores partition information as part of the metadata. Because there is no separate partition column in the data itself, changing the partitioning scheme to hourly partitions for example is just a matter of calling a different partition transform hours(…) on an existing column order_datetime as shown in the following example:

%%sql
ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)

You can continue to use the old partition on the old data. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately.

The notebook shows how you can query the table using the new hourly partition:

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=1

You can continue to query your old data using the day() transform. The table still contains only the original order_datetime column.

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=14

You don’t have to store additional columns to accommodate multiple partitioning schemes. The partition definitions are in the metadata, providing the flexibility to evolve and change the partition definitions in the future.

Conclusion

In this post, we introduced Apache Iceberg and explained how Iceberg solves some challenges in modern data lakes. We then walked you through how to run Iceberg on Amazon EMR using the AWS Glue Data Catalog as the metastore, and query the data using Athena. You can also run upserts on this data from Athena. There is no additional cost to using Iceberg with Amazon EMR.

For more information about Iceberg, refer to How Iceberg works. Iceberg on Amazon EMR, with its integration with AWS Analytics services, can simplify the way you process, upsert, and delete data, with full support for ACID transactions in Amazon S3. You can also implement schema evolution, partition evolution, time travel, and compaction of data.


About the Author

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions modernizing their architecture and generating insights from their data. In his spare time he likes to work on non-profit projects, especially those focused on underprivileged children’s education.

Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as Amazon Athena for interactive queries, Amazon EMR for Apache Spark applications, Amazon SageMaker for machine learning, and Amazon QuickSight for data visualization.

Apache Iceberg is an open-source table format for data stored in data lakes. It is optimized for data access patterns in Amazon Simple Storage Service (Amazon S3) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to process safe table schema updates as the table data evolves
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volumes change without relying on physical directories
  • Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)
  • Provide versioned tables and support time travel queries to query historical data and verify changes between updates
  • Roll back tables to prior versions to return tables to a known good state in case of any issues

In 2021, AWS teams contributed the Apache Iceberg integration with the AWS Glue Data Catalog to open source, which enables you to use open-source compute engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support for Iceberg, and Amazon EMR added support for Iceberg starting with release 6.5.0.

In this post, we show you how to use Amazon EMR Spark to create an Iceberg table and load sample book review data, and then use Athena to query the data, perform schema evolution, run row-level updates and deletes, and time travel, all coordinated through the AWS Glue Data Catalog.

Solution overview

We use the Amazon Customer Reviews public dataset as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.

Architecture that shows the flow from Amazon EMR loading data into Amazon S3, and queried by Amazon Athena through AWS Glue Data Catalog.

To set up and test this solution, we complete the following high-level steps:

  1. Create an S3 bucket.
  2. Create an EMR cluster.
  3. Create an EMR notebook.
  4. Configure a Spark session.
  5. Load data into the Iceberg table.
  6. Query the data in Athena.
  7. Perform a row-level update in Athena.
  8. Perform a schema evolution in Athena.
  9. Perform time travel in Athena.
  10. Consume Iceberg data across Amazon EMR and Athena.

Prerequisites

To follow along with this walkthrough, you must have the following:

  • An AWS account with a role that has sufficient access to provision the required resources.

Create an S3 bucket

To create an S3 bucket that holds your Iceberg data, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name (for this post, we enter aws-lake-house-iceberg-blog-demo).

Because S3 bucket names are globally unique, choose a different name when you create your bucket.

  4. For AWS Region, choose your preferred Region (for this post, we use us-east-1).
  5. Complete the remaining steps to create your bucket.
  6. If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.

Create an EMR cluster

Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose your Amazon EMR release version.

Iceberg requires release 6.5.0 or later.

  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.

For this post, we choose Amazon EMR release 6.6.0 with JupyterEnterpriseGateway and Spark.

  7. You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.
  8. Choose Next.
  9. For Cluster name, enter Iceberg Spark Cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. You can configure security settings, such as adding an EC2 key pair so that you can connect to your EMR cluster over SSH. In this demo, we use the default setting.
  12. Choose Create cluster.
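If you create the cluster programmatically instead of through the console, the same software setting is passed as a list of configuration objects. The following Python sketch only builds and prints that JSON; the capitalized Classification and Properties keys follow the EMR API's Configuration shape (as used, for example, by the Configurations parameter of boto3's run_job_flow), and the helper name is hypothetical. Property values in EMR configurations are strings, so the flag is written as "true".

```python
import json


def iceberg_configurations(enabled=True):
    """Build the EMR configuration list that turns Iceberg on or off."""
    return [
        {
            "Classification": "iceberg-defaults",
            # EMR configuration properties are string-to-string maps
            "Properties": {"iceberg.enabled": "true" if enabled else "false"},
        }
    ]


print(json.dumps(iceberg_configurations()))
```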

You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from Starting to Waiting.

Create an EMR notebook

When the cluster is active and in the Waiting state, we’re ready to run Spark programs in the cluster. For this demo, we use an EMR notebook to run Spark commands.

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. For Notebook name, enter a name (for this post, we enter iceberg-spark-notebook).
  4. For Cluster, select Choose an existing cluster and choose Iceberg Spark Cluster.
  5. For AWS service role, choose Create a new role to create EMR_Notebooks_DefaultRole, or choose a different role to access resources in the notebook.
  6. Choose Create notebook.

You’re redirected to the notebook detail page.

  7. Choose Open in JupyterLab next to your notebook.
  8. Choose to create a new notebook.
  9. Under Notebook, choose Spark.

Configure a Spark session

In your notebook, run the following code:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://<your-iceberg-blog-demo-bucket>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path s3://<your-iceberg-blog-demo-bucket>
  • spark.sql.extensions – Adds support for Iceberg Spark SQL extensions, which allow you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)
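All three catalog properties hang off the catalog name, which also becomes the first part of every table identifier (as in demo.reviews.book_reviews). A small Python helper like the hypothetical one below can generate the %%configure payload for any catalog name and warehouse path, which helps avoid typos when you register more than one catalog. The bucket path in the usage line is a placeholder.

```python
import json


def iceberg_glue_catalog_conf(catalog, warehouse):
    """Return a %%configure payload registering an Iceberg catalog backed by AWS Glue."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            prefix: "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            f"{prefix}.warehouse": warehouse,
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        }
    }


# Placeholder bucket; the printed JSON goes after "%%configure -f" in the notebook
print(json.dumps(iceberg_glue_catalog_conf("demo", "s3://my-iceberg-bucket"), indent=2))
```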

Load data into the Iceberg table

In our Spark session, run the following commands to load data:

// create a database in AWS Glue named reviews if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews related to books
val book_reviews_location = "s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet"
val book_reviews = spark.read.parquet(book_reviews_location)

// write book reviews data to an Iceberg v2 table
book_reviews.writeTo("demo.reviews.book_reviews").tableProperty("format-version", "2").createOrReplace()

Iceberg format v2 is needed to support row-level updates and deletes. See Format Versioning for more details.

It may take up to 15 minutes for the commands to complete. When they’re complete, you should be able to see the table on the AWS Glue console, under the reviews database, with the table_type property shown as ICEBERG.

Shows the table properties for book_reviews table

The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using Spark SQL, Athena SQL, or Iceberg Java and Python SDKs.

Query in Athena

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure it to use the S3 bucket you created earlier to store query results.

The table book_reviews is available for querying. Run the following query:

SELECT * FROM reviews.book_reviews LIMIT 5;

The following screenshot shows the first five records from the table being displayed.

Amazon Athena query the first 5 rows and show the results

Perform a row-level update in Athena

In the next few steps, let’s focus on a record in the table with review ID RZDVOUQG1GBG7. The following query shows that it currently has no total votes:

SELECT total_votes FROM reviews.book_reviews 
WHERE review_id = 'RZDVOUQG1GBG7'

Query total_votes for a particular review which shows a value of 0

Let’s update the total_votes value to 2 using the following query:

UPDATE reviews.book_reviews
SET total_votes = 2
WHERE review_id = 'RZDVOUQG1GBG7'

Update query to set the total_votes for the previous review_id to 2

After your update command runs successfully, run the following query and note the updated result showing a total of two votes:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'
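The Athena documentation describes Iceberg UPDATE as merge-on-read: instead of rewriting the existing data file, it writes a position delete file that marks the old row as deleted plus a new data file containing the updated row, and readers combine the two at query time. Delete files are a format v2 feature, which is why the table was created with format-version 2. The Python class below is a hypothetical in-memory illustration of that idea, not Athena's or Iceberg's implementation.

```python
class MergeOnReadTable:
    """Hypothetical model of row-level updates using position deletes."""

    def __init__(self, rows):
        self.files = {"data-0": list(rows)}  # file name -> rows (position = index)
        self.position_deletes = set()       # (file name, position) pairs

    def live_rows(self):
        # A reader skips every row that a position delete points at
        for name, rows in self.files.items():
            for pos, row in enumerate(rows):
                if (name, pos) not in self.position_deletes:
                    yield name, pos, row

    def update(self, key_column, key, column, value):
        matches = [(n, p, r) for n, p, r in self.live_rows() if r[key_column] == key]
        if not matches:
            return
        # Write updated copies to a new data file, then delete the old positions
        new_file = f"data-{len(self.files)}"
        self.files[new_file] = [{**r, column: value} for _, _, r in matches]
        self.position_deletes.update((n, p) for n, p, _ in matches)

    def scan(self):
        return [row for _, _, row in self.live_rows()]


table = MergeOnReadTable([
    {"review_id": "R1", "total_votes": 3},
    {"review_id": "RZDVOUQG1GBG7", "total_votes": 0},
])
table.update("review_id", "RZDVOUQG1GBG7", "total_votes", 2)
print(table.scan())
```

The original data file still holds the old value; only the combination of data and delete files determines what a query sees.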

Athena enforces ACID transaction guarantees for all write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.

Concurrent updates cause a failure, shown here as a TRANSACTION_CONFLICT error.
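Optimistic concurrency means each writer prepares its changes without holding a lock and then tries to swap the table's metadata pointer from the version it read to a new one; if another writer committed first, the swap fails. The Python sketch below models that compare-and-swap with a hypothetical TablePointer class, using a threading.Lock only to stand in for the catalog's atomic update. Iceberg clients can also retry by re-applying their changes on top of the newer version; this sketch simply reports the conflict.

```python
import threading


class CommitConflict(Exception):
    pass


class TablePointer:
    """Hypothetical catalog entry pointing at the table's current metadata version."""

    def __init__(self):
        self.version = 0
        self._lock = threading.Lock()  # stands in for the catalog's atomic swap

    def commit(self, based_on_version):
        """Advance the version, but only if nobody committed since it was read."""
        with self._lock:
            if self.version != based_on_version:
                raise CommitConflict(
                    f"based on version {based_on_version}, but table is at {self.version}"
                )
            self.version += 1
            return self.version


table = TablePointer()
writer_a = table.version  # both writers read version 0 ...
writer_b = table.version
table.commit(writer_a)    # ... writer A commits first, moving the table to version 1
try:
    table.commit(writer_b)
except CommitConflict as err:
    print("transaction conflict:", err)
```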

Delete queries work in a similar way; see DELETE for more details.

Perform a schema evolution in Athena

Suppose the review suddenly goes viral and gets 10 billion votes:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Based on the AWS Glue table information, total_votes is an integer column. If you try to set it to 10 billion, which is greater than the maximum integer value (2,147,483,647), you get an error reporting a type mismatch.

Updating to a very large value greater than maximum allowed integer value results in an error

Iceberg supports most schema evolution features as metadata-only operations, which don’t require a table rewrite. These include adding, dropping, renaming, and reordering columns, and promoting column types. To solve this issue, you can change the integer column total_votes to the BIGINT type by running the following DDL:

ALTER TABLE reviews.book_reviews
CHANGE COLUMN total_votes total_votes BIGINT;

You can now update the value successfully:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Querying the record now gives us the expected result in BIGINT:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'
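The arithmetic behind the earlier failure is simple: Iceberg's int type is a 32-bit signed integer with a maximum of 2,147,483,647, while long (BIGINT in Athena) is 64-bit. The Python sketch below checks those ranges and encodes the widening promotions listed in the Iceberg table spec for format versions 1 and 2 (int to long, float to double, and a wider decimal precision at the same scale). The function names are hypothetical; in practice Iceberg validates type changes like these when the schema is updated.

```python
INT_RANGE = (-(2**31), 2**31 - 1)   # Iceberg int: 32-bit signed
LONG_RANGE = (-(2**63), 2**63 - 1)  # Iceberg long (Athena BIGINT): 64-bit signed


def fits(value, type_name):
    low, high = {"int": INT_RANGE, "long": LONG_RANGE}[type_name]
    return low <= value <= high


def is_allowed_promotion(old, new):
    """Type widenings allowed by the Iceberg spec (format v1/v2).

    Primitive types are strings; decimals are ("decimal", precision, scale) tuples.
    """
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    if isinstance(old, tuple) and isinstance(new, tuple):
        (kind_old, p_old, s_old), (kind_new, p_new, s_new) = old, new
        return kind_old == kind_new == "decimal" and s_old == s_new and p_new > p_old
    return False


votes = 10_000_000_000
print(fits(votes, "int"), fits(votes, "long"), is_allowed_promotion("int", "long"))
```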

Perform time travel in Athena

In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of the table. In Athena, you can use the following syntax to query the table as it was at a point in time after the first version was committed (here, 20 minutes ago):

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Query an earlier snapshot using time travel feature
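Conceptually, FOR SYSTEM_TIME AS OF picks the most recent snapshot committed at or before the given time. The Python function below is a hypothetical sketch of that lookup over (committed_at, snapshot_id) pairs like those in the table's history; it ignores rollbacks and other branches in the snapshot lineage, and the timestamps and IDs in the example are made up.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def snapshot_as_of(history, as_of):
    """Return the ID of the snapshot that was current at `as_of`.

    `history` holds (committed_at, snapshot_id) pairs sorted oldest first.
    """
    commit_times = [committed_at for committed_at, _ in history]
    idx = bisect_right(commit_times, as_of)  # count of snapshots committed at or before as_of
    if idx == 0:
        raise LookupError(f"no snapshot committed at or before {as_of}")
    return history[idx - 1][1]


t0 = datetime(2022, 6, 1, 12, 0)
history = [(t0, 101), (t0 + timedelta(minutes=30), 102), (t0 + timedelta(minutes=40), 103)]
print(snapshot_as_of(history, t0 + timedelta(minutes=35)))  # between the two updates
```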

Consume Iceberg data across Amazon EMR and Athena

One of the most important capabilities of a data lake is letting different systems work together seamlessly, in this case through the open-source Iceberg table format. After performing all the operations in Athena, let’s go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.

First, run the same query in Spark SQL and check whether you get the same result for the review used in the example:

val select_votes = """SELECT total_votes FROM demo.reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'"""

spark.sql(select_votes).show()

Spark shows 10 billion total votes for the review.

Shows the latest value of total_votes when querying using the Amazon EMR notebook

Check the transaction history of the operations you ran in Athena by querying the Iceberg history metadata table from Spark:

val select_history = "SELECT * FROM demo.reviews.book_reviews.history"

spark.sql(select_history).show()

This shows three transactions: the initial table creation from Amazon EMR and the two updates you ran in Athena.

Shows snapshots corresponding to the two updates you ran in Athena

Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run the expire_snapshots procedure to remove old snapshots and free up storage space in Amazon S3:

import java.text.SimpleDateFormat
import java.util.Calendar

// Format the current time as a timestamp literal for the procedure call
val now = Calendar.getInstance().getTime()
val form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val now_formatted = form.format(now)

// Expire snapshots older than now, but always keep the most recent one
val procedure = s"""CALL demo.system.expire_snapshots(
  table => 'reviews.book_reviews',
  older_than => TIMESTAMP '$now_formatted',
  retain_last => 1)"""

spark.sql(procedure).show()

Note that, after running this procedure, time travel can no longer be performed against expired snapshots.
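The snapshot selection performed by expire_snapshots can be sketched as follows: snapshots older than older_than become candidates, but the newest retain_last snapshots are always kept. This Python version is a hypothetical simplification that assumes a linear history; the real procedure also deletes the data and metadata files that no remaining snapshot references, which is what frees space in Amazon S3.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, older_than, retain_last=1):
    """Return IDs that an expire_snapshots-style cleanup would remove.

    `snapshots` holds (committed_at, snapshot_id) pairs sorted oldest first.
    """
    if retain_last < 1:
        raise ValueError("retain_last must be at least 1")
    kept = {sid for _, sid in snapshots[-retain_last:]}  # newest N always survive
    return [sid for committed_at, sid in snapshots
            if committed_at < older_than and sid not in kept]


t0 = datetime(2022, 6, 1, 12, 0)
snapshots = [(t0, 101), (t0 + timedelta(minutes=30), 102), (t0 + timedelta(minutes=40), 103)]
now = t0 + timedelta(hours=1)
print(snapshots_to_expire(snapshots, older_than=now))                 # keeps only 103
print(snapshots_to_expire(snapshots, older_than=now, retain_last=2))  # keeps 102 and 103
```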

Examine the history system table again and notice that it shows you only the most recent snapshot.

Running the following query in Athena now results in the error “No table snapshot found before timestamp…”, because the older snapshots were expired and you can no longer time travel to them:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following code in your notebook to drop the AWS Glue table and database:
// DROP the table
spark.sql("DROP TABLE demo.reviews.book_reviews")
// DROP the database
spark.sql("DROP DATABASE demo.reviews")
  2. On the Amazon EMR console, choose Notebooks in the navigation pane.
  3. Select the notebook iceberg-spark-notebook and choose Delete.
  4. Choose Clusters in the navigation pane.
  5. Select the cluster Iceberg Spark Cluster and choose Terminate.
  6. Delete the S3 buckets and any other resources that you created as part of this post.

Conclusion

In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.

With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as SageMaker Athena integration for machine learning, or QuickSight Athena integration for dashboard and reporting. AWS Glue also offers the Iceberg connector, which you can use to author and run Iceberg data pipelines.

In addition, Iceberg supports a variety of other open-source compute engines that you can choose from. For example, you can use Apache Flink on Amazon EMR for streaming and change data capture (CDC) use cases. The strong transaction guarantees and efficient row-level updates, deletes, time travel, and schema evolution offered by Iceberg provide a sound foundation for unlocking the power of big data.


About the Authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer on the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Mohit Mehta is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing on the big data analytics space since 2013.

Jared Keating is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.

AWS Week In Review – June 6, 2022

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-week-in-review-june-6-2022/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

I’ve just come back from a long (extended) holiday weekend here in the US and I’m still catching up on all the AWS launches that happened this past week. I’m particularly excited about some of the data, machine learning, and quantum computing news. Let’s have a look!

Last Week’s Launches
The launches that caught my attention last week are the following:

Amazon EMR Serverless is now generally available – Amazon EMR Serverless allows you to run big data applications using open-source frameworks such as Apache Spark and Apache Hive without configuring, managing, and scaling clusters. The new serverless deployment option for Amazon EMR automatically scales resources up and down to provide just the right amount of capacity for your application, and you only pay for what you use. To learn more, check out Channy’s blog post and listen to The Official AWS Podcast episode on EMR Serverless.

AWS PrivateLink is now supported by additional AWS services – AWS PrivateLink provides private connectivity between your virtual private cloud (VPC), AWS services, and your on-premises networks without exposing your traffic to the public internet. The following AWS services just added support for PrivateLink:

  • Amazon S3 on Outposts has added support for PrivateLink to perform management operations on your S3 storage by using private IP addresses in your VPC. This eliminates the need to use public IPs or proxy servers. Read the June 1 What’s New post for more information.
  • AWS Panorama now supports PrivateLink, allowing you to access AWS Panorama from your VPC without using public endpoints. AWS Panorama is a machine learning appliance and software development kit (SDK) that allows you to add computer vision (CV) to your on-premises cameras. Read the June 2 What’s New post for more information.
  • AWS Backup has added PrivateLink support for VMware workloads, providing direct access to AWS Backup from your VMware environment via a private endpoint within your VPC. Read the June 3 What’s New post for more information.

Amazon SageMaker JumpStart now supports incremental model training and automatic tuning – Besides ready-to-deploy solution templates for common machine learning (ML) use cases, SageMaker JumpStart also provides access to more than 300 pre-trained, open-source ML models. You can now incrementally train all the JumpStart models with new data without training from scratch. Through this fine-tuning process, you can shorten the training time to reach a better model. SageMaker JumpStart now also supports model tuning with SageMaker Automatic Model Tuning from its pre-trained model, solution templates, and example notebooks. Automatic tuning allows you to automatically search for the best hyperparameter configuration for your model.

Amazon Transcribe now supports automatic language identification for multi-lingual audio – Amazon Transcribe converts audio input into text using automatic speech recognition (ASR) technology. If your audio recording contains more than one language, you can now enable multi-language identification, which identifies all languages spoken in the audio file and creates a transcript using each identified language. Automatic language identification for multilingual audio is supported for all 37 languages that are currently supported for batch transcriptions. Read the What’s New post from Amazon Transcribe to learn more.

Amazon Braket adds support for Borealis, the first publicly accessible quantum computer that is claimed to offer quantum advantage – If you are interested in quantum computing, you’ve likely heard the term “quantum advantage.” It refers to the technical milestone when a quantum computer outperforms the world’s fastest supercomputers on a well-defined task. Until now, none of the devices claimed to demonstrate quantum advantage have been accessible to the public. The Borealis device, a new photonic quantum processing unit (QPU) from Xanadu, is the first publicly available quantum computer that is claimed to have achieved quantum advantage. Amazon Braket, the quantum computing service from AWS, has just added support for Borealis. To learn more about how you can test a quantum advantage claim for yourself now on Amazon Braket, check out the What’s New post covering the addition of Borealis support.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Some other updates and news that you may have missed:

New AWS Heroes – A warm welcome to our newest AWS Heroes! The AWS Heroes program is a worldwide initiative that acknowledges individuals who have truly gone above and beyond to share knowledge in technical communities. Get to know them in the June 2022 introduction blog post!

AWS open-source news and updates – My colleague Ricardo Sueiras writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community. Read edition #115 here.

Upcoming AWS Events
Join me in Las Vegas for Amazon re:MARS 2022. The conference takes place June 21–24 and is all about the latest innovations in machine learning, automation, robotics, and space. I will deliver a talk on how machine learning can help to improve disaster response. Say “Hi!” if you happen to be around and see me.

We also have more AWS Summits coming up over the next couple of months, both in-person and virtual.


Find an AWS Summit near you, and get notified when registration opens in your area.

Imagine Conference 2022 – You can now register for IMAGINE 2022 (August 3, Seattle). The IMAGINE 2022 conference is a no-cost event that brings together education, state, and local leaders to learn about the latest innovations and best practices in the cloud.

Sign up for the SQL Server Database Modernization webinar on June 21 to learn how to modernize and cost-optimize Microsoft SQL Server on AWS.

That’s all for this week. Check back next Monday for another Week in Review!

— Antje

Amazon EMR Serverless Now Generally Available – Run Big Data Applications without Managing Servers

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/amazon-emr-serverless-now-generally-available-run-big-data-applications-without-managing-servers/

At AWS re:Invent 2021, we introduced three new serverless options for our data analytics services – Amazon EMR Serverless, Amazon Redshift Serverless, and Amazon MSK Serverless – that make it easier to analyze data at any scale without having to configure, scale, or manage the underlying infrastructure.

Today we announce the general availability of Amazon EMR Serverless, a serverless deployment option for customers to run big data analytics applications using open-source frameworks like Apache Spark and Hive without configuring, managing, and scaling clusters or servers.

With EMR Serverless, you can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide just the right amount of capacity for your application, and you only pay for what you use.

During the preview, we heard from customers that EMR Serverless is cost-effective because they do not incur cost from having to overprovision resources to deal with demand spikes. They do not have to worry about right-sizing instances or applying OS updates, and can focus on getting products to market faster.

Amazon EMR provides various deployment options to run applications to fit varied needs such as EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Kubernetes Service (Amazon EKS) clusters, AWS Outposts, or EMR Serverless.

  • EMR on Amazon EC2 clusters is suitable for customers that need maximum control and flexibility over how to run their application. With EMR clusters, customers can choose the EC2 instance type to enhance the performance of certain applications, customize the Amazon Machine Image (AMI), choose the EC2 instance configuration, customize and extend open-source frameworks, and install additional custom software on cluster instances.
  • EMR on Amazon EKS is suitable for customers that want to standardize on EKS to manage clusters across applications or use different versions of an open-source framework on the same cluster.
  • EMR on AWS Outposts is for customers who want to run EMR closer to their data center within an Outpost.
  • EMR Serverless is suitable for customers that want to avoid managing and operating clusters, and simply want to run applications using open-source frameworks.

Also, when you build an application using an EMR release (for example, a Spark job using EMR release 6.4), you can choose to run it on an EMR cluster, EMR on EKS, or EMR Serverless without having to rewrite the application. This allows you to build applications for a given framework version and retain the flexibility to change the deployment model based on future operational needs.

Getting Started with Amazon EMR Serverless
To get started with EMR Serverless, you can use Amazon EMR Studio, a free EMR feature that provides an end-to-end development and debugging experience. With EMR Studio, you can create EMR Serverless applications (Spark or Hive), choose the version of open-source software for your application, submit jobs, check the status of running jobs, and invoke Spark UI or Tez UI for job diagnostics.

When you select the Get started button in the EMR Serverless Console, you can create and set up EMR Studio with preconfigured EMR Serverless applications.

In EMR Studio, when you choose Applications in the Serverless menu, you can create one or more EMR Serverless applications and choose the open source framework and version for your use case. If you want separate logical environments for test and production or for different line-of-business use cases, you can create separate applications for each logical environment.

An EMR Serverless application is a combination of (a) the EMR release version for the open-source framework version you want to use and (b) the specific runtime that you want your application to use, such as Apache Spark or Apache Hive.

When you choose Create application, you can set your application Name, a Type of either Spark or Hive, and a supported Release version. You can also choose default or custom settings for pre-initialized capacity, application limits, and Amazon Virtual Private Cloud (Amazon VPC) connectivity options. Each EMR Serverless application is isolated from other applications and runs within a secure VPC.

Use the default option if you want jobs to start immediately, but note that charges apply for each pre-initialized worker as soon as the application is started. To learn more about pre-initialized capacity, see Configuring and managing pre-initialized capacity.

When you select Start application, your application is set up to start with pre-initialized capacity of 1 Spark driver and 1 Spark executor. By default, your application is configured to start when jobs are submitted and to stop when it has been idle for more than 15 minutes.

You can customize these settings and set up different application limits by selecting Choose custom settings.

In the Job runs menu, you can see a list of job runs for your application.

Choose Submit job and set up job details such as the name, AWS Identity and Access Management (IAM) role used by the job, script location, and arguments of the JAR or Python script in the Amazon Simple Storage Service (Amazon S3) bucket that you want to run.

If you want logs for your Spark or Hive jobs to be written to your S3 bucket, you need to set up the S3 bucket in the same Region where you run EMR Serverless jobs.

Optionally, you can set additional configuration properties that you can specify for each job, such as Spark properties, job configurations to override the default configurations for applications (such as using the AWS Glue Data Catalog as its metastore), storing logs to Amazon S3, and retaining logs for 30 days.

The following is an example of running a Python script with the AWS CLI start-job-run command, which calls the StartJobRun API.

$ aws emr-serverless start-job-run \
    --application-id <application_id> \
    --execution-role-arn <iam_role_arn> \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://spark-scripts/scripts/spark-etl.py",
            "entryPointArguments": ["s3://spark-scripts/output"],
            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
           "s3MonitoringConfiguration": {
             "logUri": "s3://spark-scripts/logs/"
           }
        }
    }'
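You can also assemble the same StartJobRun request programmatically, for example from an orchestration script. The following Python sketch builds it as a dictionary and renders a Spark configuration map into the `--conf key=value` string that `sparkSubmitParameters` expects. The application ID and role ARN are placeholders. Handing the dict to boto3 is shown only as a comment.

```python
"""Sketch: the StartJobRun request from the CLI example as a Python dict.
The application ID, role ARN, and S3 paths are placeholders."""
import json


def spark_submit_parameters(conf):
    # {"spark.executor.cores": "1"} -> "--conf spark.executor.cores=1"
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def build_start_job_run_request(application_id, role_arn, entry_point,
                                arguments, conf, log_uri):
    return {
        "applicationId": application_id,
        "executionRoleArn": role_arn,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": entry_point,
                # The API expects a list of strings here, not a single string.
                "entryPointArguments": list(arguments),
                "sparkSubmitParameters": spark_submit_parameters(conf),
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": log_uri}
            }
        },
    }


if __name__ == "__main__":
    request = build_start_job_run_request(
        "<application_id>", "<iam_role_arn>",
        "s3://spark-scripts/scripts/spark-etl.py",
        ["s3://spark-scripts/output"],
        {"spark.executor.cores": "1", "spark.executor.memory": "4g",
         "spark.driver.cores": "1", "spark.driver.memory": "4g",
         "spark.executor.instances": "1"},
        "s3://spark-scripts/logs/",
    )
    print(json.dumps(request, indent=2))
    # With boto3: boto3.client("emr-serverless").start_job_run(**request)
```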

You can check the job results in your S3 bucket. To see how a job ran, or to debug a failed job, open the Spark UI (for Spark applications) or the Hive/Tez UI (for Hive applications) from the Job runs menu.

For more debugging, EMR Serverless will push event logs to the sparklogs folder in your S3 log destination for Spark applications. In the case of Hive applications, EMR Serverless will continuously upload the Hive driver and Tez tasks logs to the HIVE_DRIVER or TEZ_TASK folders of your S3 log destination. To learn more, see Logging in the AWS documentation.

Things to Know
With EMR Serverless, you get all the benefits of running Amazon EMR. Here are some things to know about EMR Serverless, quoted from the AWS Big Data Blog post announcing the preview:

  • Automatic and fine-grained scaling – EMR Serverless automatically scales up workers at each stage of processing your job and scales them down when they’re not required. You’re charged for aggregate vCPU, memory, and storage resources used from the time a worker starts running until it stops, rounded up to the nearest second with a 1-minute minimum. For example, your job may require 10 workers for the first 10 minutes of processing the job and 50 workers for the next 5 minutes. With fine-grained automatic scaling, you only incur cost for 10 workers for 10 minutes and 50 workers for 5 minutes. As a result, you don’t have to pay for underutilized resources.
  • Resilience to Availability Zone failures – EMR Serverless is a Regional service. When you submit jobs to an EMR Serverless application, it can run in any Availability Zone in the Region. In case an Availability Zone is impaired, a job submitted to your EMR Serverless application is automatically run in a different (healthy) Availability Zone. When using resources in a private VPC, EMR Serverless recommends that you specify the private VPC configuration for multiple Availability Zones so that EMR Serverless can automatically select a healthy Availability Zone.
  • Enable shared applications – When you submit jobs to an EMR Serverless application, you can specify the IAM role that must be used by the job to access AWS resources such as S3 objects. As a result, different IAM principals can run jobs on a single EMR Serverless application, and each job can only access the AWS resources that the IAM principal is allowed to access. This enables you to set up scenarios where a single application with a pre-initialized pool of workers is made available to multiple tenants wherein each tenant can submit jobs using a different IAM role but use the common pool of pre-initialized workers to immediately process requests.
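A little arithmetic confirms the billing rule in the first bullet. This Python sketch counts billable worker-minutes only. It assumes each worker is billed independently and that the 50 workers are distinct workers that each run for 5 minutes. To turn worker-minutes into cost, you would multiply by the per-vCPU, memory, and storage rates on the pricing page, which are not modeled here.

```python
import math


def billed_seconds(runtime_seconds):
    """Billable seconds for one worker: rounded up to the next whole second,
    with a 60-second minimum."""
    return max(60, math.ceil(runtime_seconds))


def worker_minutes(phases):
    """phases: (worker_count, minutes_each_worker_ran) pairs."""
    total = sum(count * billed_seconds(minutes * 60) for count, minutes in phases)
    return total / 60


# The example from the text: 10 workers for 10 minutes, then 50 workers for 5 minutes.
print(worker_minutes([(10, 10), (50, 5)]))  # 350.0
# A worker that runs for only 10 seconds is still billed for 1 minute.
print(billed_seconds(10))  # 60
```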

Now Available
Amazon EMR Serverless is available in US East (N. Virginia), US West (Oregon), Europe (Ireland), and Asia Pacific (Tokyo) Regions. With EMR Serverless, there are no upfront costs, and you pay only for the resources you use. You pay for the amount of vCPU, memory, and storage resources consumed by your applications. For pricing details, see the EMR Serverless pricing page.

To learn more, visit the Amazon EMR Serverless User Guide. Please send feedback to AWS re:Post for Amazon EMR Serverless or through your usual AWS support contacts.

Learn all the details about Amazon EMR Serverless and get started today.

Channy

Deep dive into Amazon EMR Kerberos authentication integrated with Microsoft Active Directory

Post Syndicated from Anandkumar Kaliaperumal original https://aws.amazon.com/blogs/big-data/deep-dive-into-amazon-emr-kerberos-authentication-integrated-with-microsoft-active-directory/

Many of our customers that use Amazon EMR as their big data platform need to integrate with their existing Microsoft Active Directory (AD) for user authentication. This integration requires the Kerberos daemon of Amazon EMR to establish a trusted connection with an AD domain, which involves a lot of moving pieces and can be difficult to get right.

This post describes what a one-way trust with Active Directory means and how the commands used to set it up work. We describe how DNS names, Kerberos realms, and AD domains differ, and what that means for the Amazon EMR security configuration and the cluster's one-way trust settings. We also explain that you can't use AWS Managed Microsoft AD for the Amazon EMR key distribution center (KDC) trust, and must instead use an existing or new AD server.

AWS has already put out documentation and blog posts that cover some of this area, and this post is meant to complement them rather than replace them. As such, we recommend reading the following before continuing:

Is this the right architecture for you?

There are several options for authenticating Amazon EMR with Kerberos, and choosing the right approach will depend on your use case. For more information, refer to Kerberos architecture options. In this post, we cover the case where your users are already in your AD domain, and you want to use those identities to authorize actions in Amazon EMR. If you don’t already have an AD, or you want to extend an existing non-AD Kerberos realm, then one of the other options will be better suited. Don’t make more work for yourself than you have to!

Connecting to Active Directory

There are two goals when connecting Amazon EMR to AD:

  • Establish a one-way trust from Kerberos to AD so that users working in Amazon EMR can use their AD credentials to access services. This requires ports 88 and 464 (for Kerberos) and 389 (for LDAP) to be accessible from the EMR cluster.
  • Connect the EMR cluster with AD so that the servers making up the EMR cluster can be registered in the AD domain. This requires a user configured in the AD domain with sufficient privileges to make those registrations—typically called a bind user.
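Before you configure the trust, you can confirm from an EMR node that the AD server's ports are open. The following Python sketch attempts a TCP connection to each port. The AD host name is taken from the command line, so pass your own domain controller's name (this post's example domain is `corp.mycompany.com`). Port 389 is the standard LDAP port. The sketch checks TCP only, so it doesn't cover UDP-based Kerberos traffic.

```python
import socket
import sys

# Kerberos (88, 464) and LDAP (389) ports; only TCP is checked here,
# although Kerberos can also use UDP.
AD_PORTS = {88: "Kerberos", 464: "Kerberos password change", 389: "LDAP"}


def tcp_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or the name didn't resolve
        return False


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: python3 check_ad_ports.py <ad-server-host>")
    else:
        ad_host = sys.argv[1]
        for port, service in AD_PORTS.items():
            state = "reachable" if tcp_port_open(ad_host, port) else "NOT reachable"
            print(f"{ad_host}:{port} ({service}) {state}")
```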

Types of Active Directory

The combination of the preceding requirements means that only an AD server can meet both goals. You can't use AWS Managed Microsoft AD or Microsoft Azure AD. This leaves two recommended options.

Firstly, you can connect Amazon EMR directly to an existing AD server (whether on premises or in the cloud), as shown in the following diagram.

Alternatively, you can build a new AD server on Amazon Elastic Compute Cloud (Amazon EC2), add it to the existing corporate AD forest, and have Amazon EMR connect to this new cloud-based AD server (as shown in the following diagram).

Domains vs. realms

It is critical to understand which terms apply to which technology to avoid confusion.

Active Directory manages domains, which contain many registered entities like users and computers. They’re typically written in lowercase (for example, corp.mycompany.com) and look very similar to internet domains. They don’t necessarily have to resolve to an IP address, but they typically do.

Kerberos uses realms for a similar concept, and its registered entities are referred to as principals. Realms are typically written in uppercase, and often use a naming style that looks like an internet domain except for the case (for example, EMR.MYCOMPANY.COM). They don’t need to resolve to an IP address and typically do not.

When configuring the realm for an EMR cluster’s Kerberos daemon, the name is completely arbitrary. It serves as a namespace for the principals defined within it, but it doesn’t have to match the domain names of the instances in your EMR cluster. For example, if the private DNS names of your EC2 machines use the default ec2.internal domain, your EMR realm name doesn’t have to be ec2.internal; it could be mykerberosrealm or anything you like.

Bind user

A bind user has to be created in Active Directory with the permission to register (“bind”) EMR computers into the AD domain. Amazon EMR registers these computers under CN=Computers.

The Kerberos principals that Amazon EMR creates for use with the components of Hadoop are strictly local to the Amazon EMR Kerberos installation—they’re not registered in the AD domain.

DNS

The EMR Kerberos daemon has to be able to resolve the DNS name of your AD server in order to establish the trust between them. If those DNS domains are managed in your corporate DNS servers, you need Amazon Route 53 Resolver forwarding rules that send queries for those domains to your corporate DNS servers so that Amazon EMR can resolve them. Because the trust is only one-way, the AD server doesn't need to be able to resolve the internal DNS names of the EMR cluster nodes.
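A quick way to confirm that DNS forwarding works is to resolve the AD server's name from an EMR node. This Python sketch uses only the system resolver. The host name is passed on the command line, for example this post's example domain `corp.mycompany.com`, which stands in for your AD server's DNS name.

```python
import socket
import sys


def resolve_host(hostname):
    """Return the sorted, distinct IP addresses the local resolver returns."""
    infos = socket.getaddrinfo(hostname, None)
    return sorted({info[4][0] for info in infos})


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: python3 resolve_ad.py <ad-server-host>")
    else:
        name = sys.argv[1]
        try:
            print(name, "->", ", ".join(resolve_host(name)))
        except socket.gaierror as err:
            print(f"{name} does not resolve from this node: {err}")
```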

Establish trust

The trust that you need to establish from Amazon EMR to AD only needs to be one-way (Amazon EMR trusts AD), not two-way (they trust each other). To establish the one-way trust, use the ksetup and netdom utilities on the command line of the AD machine. The recommended encryption type attribute (SetEncTypeAttr) for the domain is an AES-256 cipher. The following code uses the example of the EMR Kerberos realm EMR.MYCOMPANY.COM and the AD domain corp.mycompany.com:

C:\Users\Administrator> ksetup /addkdc EMR.MYCOMPANY.COM
C:\Users\Administrator> netdom trust EMR.MYCOMPANY.COM /Domain:corp.mycompany.com /add /realm /passwordt:MyVeryStrongPassword
C:\Users\Administrator> ksetup /SetEncTypeAttr EMR.MYCOMPANY.COM AES256-CTS-HMAC-SHA1-96

In the first ksetup command, you don’t need to provide the fully qualified domain name of the cluster KDC as a final argument. That is only needed for a two-way trust, and is optional in a one-way trust.

Amazon EMR security configuration

Before you start the EMR cluster, you must create a security configuration that contains the details of the AD server and domain to which you’re connecting. The following screenshot shows an example of that configuration.

These fields are case-sensitive, so make sure that you enter everything correctly. Note that the domain and realm in this configuration both refer to the AD server, not to the EMR cluster. Because AD acts as a Kerberos KDC as well as a directory service, both fields are configured here; in both cases, they refer to the AD server and not to the EMR cluster's Kerberos realm.
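If you prefer to create the security configuration from the command line instead of the console, you can express the same settings as JSON. The following Python sketch generates that JSON. The structure reflects our reading of the Amazon EMR security configuration schema for a cluster-dedicated KDC with a cross-realm trust, so verify the field names against the current documentation. The example values follow this post's `corp.mycompany.com` domain, with the realm written as the domain in uppercase. Using the domain name as the admin and KDC server is an assumption; substitute your domain controller's host name.

```python
import json


def cross_realm_security_configuration(ad_realm, ad_domain, ad_server,
                                       ticket_lifetime_hours=24):
    """Security configuration for a cluster-dedicated KDC that trusts AD.
    Every CrossRealmTrustConfiguration field describes the AD side."""
    config = {
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ClusterDedicatedKdc",
                "ClusterDedicatedKdcConfiguration": {
                    "TicketLifetimeInHours": ticket_lifetime_hours,
                    "CrossRealmTrustConfiguration": {
                        "Realm": ad_realm,    # AD realm, e.g. CORP.MYCOMPANY.COM
                        "Domain": ad_domain,  # AD domain, e.g. corp.mycompany.com
                        "AdminServer": ad_server,
                        "KdcServer": ad_server,
                    },
                },
            }
        }
    }
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    print(cross_realm_security_configuration(
        "CORP.MYCOMPANY.COM", "corp.mycompany.com", "corp.mycompany.com"))
    # Save the output to a file, then:
    #   aws emr create-security-configuration --name <name> \
    #       --security-configuration file://<file>.json
```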

Amazon EMR security options when starting a cluster

When you start an EMR cluster, you configure the security options as shown in the following screenshot.

Here you use the security configuration you just created, and specify the details of the EMR Kerberos realm as well as the parameters needed to establish the trust with the AD domain in the security configuration. You provide information for the following fields:

  • Realm – The Kerberos realm you specify is entirely up to you, but must be the same as the one you used when establishing the trust on the AD machine.
  • KDC admin password – This isn’t used anywhere else in the cluster configuration, so you can set it to something unique and secure specifically for this cluster. It’s only needed for any future management of the cluster-dedicated KDC.
  • Cross-realm trust principal password – This is the password you set with the netdom command.
  • Active Directory domain join user – This is the bind user your AD admin created.
  • Active Directory domain join password – This is the password for the bind user from AD.

Clean up

When you’re done testing this solution, remember to clean up the resources. If you used the CloudFormation templates to create the resources, then use the AWS CloudFormation console to delete the stack. Alternatively, you can use the AWS Command Line Interface (AWS CLI) or SDKs. For instructions, refer to Deleting a stack. Deleting a stack also deletes the resources created by that stack.

If one of your stacks doesn’t delete, make sure that there are no dependencies on the resources created by that stack. For example, if you deployed an Amazon VPC using AWS CloudFormation and then deployed a domain controller into that VPC using a different CloudFormation stack, you must first delete the domain controller stack before you can delete the VPC stack.
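The ordering rule generalizes: delete a stack only after every stack that depends on it is gone. The following Python sketch computes such an order from a hand-written dependency map. It needs Python 3.9 or later for `graphlib`. The stack names are hypothetical.

```python
from graphlib import TopologicalSorter


def stack_deletion_order(depends_on):
    """depends_on maps each stack to the stacks whose resources it uses.
    static_order() lists dependencies before dependents (a valid creation
    order); reversing it deletes dependents first."""
    return list(TopologicalSorter(depends_on).static_order())[::-1]


# Hypothetical stack names for the example in the text.
stacks = {"domain-controller": {"vpc"}, "emr-cluster": {"vpc"}}
print(stack_deletion_order(stacks))  # "vpc" is always last
# Each stack could then be removed in turn with boto3's CloudFormation client:
#   cfn.delete_stack(StackName=name)
#   cfn.get_waiter("stack_delete_complete").wait(StackName=name)
```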

Conclusion

The steps in this post walked you through creating the trust between Amazon EMR’s Kerberos daemon and an Active Directory domain. We hope that this has demystified the process and makes it easy for you to create secure, AD-integrated EMR clusters in the future.


About the Authors

Anandkumar Kaliaperumal is a Senior Data Architect with the Professional Services SDT, where he focuses on helping customers with their Hadoop and data lake migrations. He lives with his growing family in Dallas.

Bharath Kumar Boggarapu is a Data Architect at AWS Professional Services with expertise in big data technologies. He is passionate about helping customers build performant and robust data-driven solutions and realize their data and analytics potential. His areas of interests are open-source frameworks, automation, and data architecting. In his free time, he loves to spend time with family, play tennis, and travel.

Oliver Meyn was a Senior Data Architect in the Canadian Professional Services Shared Delivery Team, where he helped customers with migrating their data and workflows to AWS. He lives in Toronto with his family and far too many bicycles.

How Paytm modernized their data pipeline using Amazon EMR

Post Syndicated from Rajat Bhardwaj original https://aws.amazon.com/blogs/big-data/how-paytm-modernized-their-data-pipeline-using-amazon-emr/

This post was co-written by Rajat Bhardwaj, Senior Technical Account Manager at AWS and Kunal Upadhyay, General Manager at Paytm.

Paytm is India’s leading payment platform, pioneering the digital payment era in India with 130 million active users. Paytm operates multiple lines of business, including banking, digital payments, bill recharges, e-wallet, stocks, insurance, lending and mobile gaming. At Paytm, the Central Data Platform team is responsible for turning disparate data from multiple business units into insights and actions for their executive management and merchants, who are small, medium or large business entities accepting payments from the Paytm platforms.

The Data Platform team modernized their legacy data pipeline with AWS services. The data pipeline collects data from different sources and runs analytical jobs, generating approximately 250K reports per day, which are consumed by Paytm executives and merchants. The legacy data pipeline was set up on premises using a proprietary solution and didn't use open-source Hadoop stack components such as Spark or Hive. This legacy setup was resource-intensive, with high CPU and I/O requirements. Analytical jobs took approximately 8–10 hours to complete, which often led to service-level agreement (SLA) breaches. The legacy solution was also prone to outages due to higher than expected hardware resource consumption, and its hardware and software limitations restricted the system's ability to scale during peak load. Data models in the legacy setup reprocessed the entire dataset on every run, which increased processing time.

In this post, we demonstrate how the Paytm Central Data Platform team migrated their data pipeline to AWS and modernized it using Amazon EMR, Amazon Simple Storage Service (Amazon S3), and underlying AWS Cloud infrastructure along with Apache Spark. We optimized hardware usage and reduced analytical processing time, resulting in a shorter turnaround time for generating insightful reports, all while maintaining operational stability and scaling regardless of the size of daily ingested data.

Overview of solution

The key to modernizing a data pipeline is to adopt an optimal incremental approach, which helps reduce the end-to-end cycle to analyze the data and get meaningful insights from it. To achieve this state, it's vital to ingest incremental data into the pipeline, process only delta records, and reduce the analytical processing time. We configured the data sources to inherit the unchanged records and tuned the Spark jobs to only analyze the newly inserted or updated records. We used temporal data columns to store the incremental datasets until they're processed. Data-intensive Spark jobs are configured in an incremental, on-demand deduplicating mode to process the data. This helps eliminate redundant data tuples from the data lake and reduces the total data volume, which saves compute and storage capacity. We also optimized the scanning of raw tables to restrict the scans to only the changed record set, which reduced scanning time by approximately 90%. Incremental data processing also helps reduce the total processing time.
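The post doesn't include Paytm's job code. Still, the core of incremental processing is a high-water mark: remember the latest change timestamp you processed, and read only rows newer than it. This plain-Python sketch illustrates the idea with a hypothetical `updated_at` column. In a Spark job, the same filter would be applied to a DataFrame, and the watermark would be persisted between runs.

```python
from datetime import datetime


def select_incremental(records, last_watermark):
    """Return (changed_records, new_watermark). A record is a dict with a
    hypothetical 'updated_at' timestamp column."""
    changed = [r for r in records if r["updated_at"] > last_watermark]
    new_watermark = max((r["updated_at"] for r in changed), default=last_watermark)
    return changed, new_watermark


rows = [
    {"id": 1, "updated_at": datetime(2022, 5, 1, 9, 0)},
    {"id": 2, "updated_at": datetime(2022, 5, 2, 9, 0)},
    {"id": 3, "updated_at": datetime(2022, 5, 3, 9, 0)},
]
changed, watermark = select_incremental(rows, datetime(2022, 5, 1, 12, 0))
print([r["id"] for r in changed], watermark)  # [2, 3] 2022-05-03 09:00:00
```

If nothing changed, the watermark stays where it was, so the next run rescans from the same point.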

At the time of this writing, the existing data pipeline has been operationally stable for 2 years. Although this modernization was vital, implementing the changes carried a risk of an operational outage. Data skew had to be handled in the new system with an appropriate scaling strategy. Stakeholders expected zero downtime, because the reports generated from this system are vital to Paytm's CXOs, executive management, and merchants every day.

The following diagram illustrates the data pipeline architecture.

Benefits of the solution

The Paytm Central Data Office team, consisting of 10 engineers, worked with the AWS team to modernize the data pipeline. The team spent approximately 4 months completing this modernization and migration project.

Modernizing the data pipeline with Amazon EMR 6.3 helped efficiently scale the system at a lower cost. Amazon EMR managed scaling helped reduce the scale-in and scale-out time and increase the usage of Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances for running the Spark jobs. Paytm is now able to utilize a Spot to On-Demand ratio of 80:20, resulting in higher cost savings. Amazon EMR managed scaling also helped automatically scale the EMR cluster based on YARN memory usage with the desired type of EC2 instances. This approach eliminates the need to configure multiple Amazon EMR scaling policies tied to specific types of EC2 instances as per the compute requirements for running the Spark jobs.
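The post doesn't show how the 80:20 Spot to On-Demand ratio was enforced. One possible approach, sketched in Python below under that assumption, is to cap On-Demand capacity in the compute limits of the managed scaling policy. The capacity numbers are hypothetical. The cap limits On-Demand capacity to 20% of the maximum rather than guaranteeing an exact ratio at every cluster size.

```python
import json


def managed_scaling_policy(min_units, max_units, on_demand_fraction=0.2):
    """ComputeLimits that cap On-Demand capacity at a fraction of the maximum,
    so the remaining capacity can come from Spot Instances."""
    return {
        "ComputeLimits": {
            "UnitType": "Instances",
            "MinimumCapacityUnits": min_units,
            "MaximumCapacityUnits": max_units,
            "MaximumOnDemandCapacityUnits": round(max_units * on_demand_fraction),
        }
    }


print(json.dumps(managed_scaling_policy(2, 100), indent=2))
# Apply with:
#   aws emr put-managed-scaling-policy --cluster-id <cluster-id> \
#       --managed-scaling-policy file://policy.json
```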

In the following sections, we walk through the key tasks to modernize the data pipeline.

Migrate over 400 TB of data from the legacy storage to Amazon S3

The Paytm team built a proprietary data migration application in Scala using the open-source AWS SDK for Java for Amazon S3. This application can connect to multiple cloud providers and on-premises data centers and migrate the data to a central data lake built on Amazon S3.

Modernize the transformation jobs for over 40 data flows

Data flows are defined in the system for ingesting raw data, preprocessing it, and aggregating the data that the analytical jobs use for report generation. Data flows are developed in Scala on Apache Spark. We use the Azkaban batch workflow scheduler to order and track the Spark job runs. Workflows are created on Amazon EMR to schedule these Spark jobs multiple times a day. We also implemented Spark optimizations to improve the operational efficiency of these jobs. We use Spark broadcast joins to handle data skew, which can otherwise cause data to spill to disk, resulting in extra storage needs. We also tuned the Spark jobs to avoid producing a large number of small files, a known problem with Spark if not handled effectively. This happens because Spark is a parallel processing system: data is loaded by multiple tasks, and each task can write into multiple partitions. Data-intensive jobs are run using Spark stages.
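One common way to avoid many small files is to size the number of output partitions from the data volume before writing, for example with `DataFrame.coalesce(n)` or `repartition(n)` in Spark. This technique is an assumption, not something taken from the post. The arithmetic is simple enough to sketch in Python. The 128 MiB target file size is a hypothetical choice.

```python
import math

# Hypothetical target; 128 MiB is a common choice, not a value from the post.
TARGET_FILE_BYTES = 128 * 1024 * 1024


def output_partitions(total_bytes, target_file_bytes=TARGET_FILE_BYTES):
    """Number of partitions to coalesce or repartition to before writing,
    so that each output file is roughly target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


print(output_partitions(10 * 1024**3))  # 80 partitions for 10 GiB
```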

The following is an example Azkaban flow definition (Flow 2.0 YAML) that declares the dependencies between jobs:

nodes:
  - name: jobC
    type: noop
    # jobC depends on jobA and jobB
    dependsOn:
      - jobA
      - jobB

  - name: jobA
    type: command
    config:
      command: echo "This is an echoed text."

  - name: jobB
    type: command
    config:
      command: pwd
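The `dependsOn` entries define a dependency graph: `jobC` is a no-op node that runs only after `jobA` and `jobB` finish. The following Python sketch shows the resulting order. It needs Python 3.9 or later for `graphlib`. It only illustrates the semantics, because Azkaban itself may run `jobA` and `jobB` in parallel.

```python
from graphlib import TopologicalSorter

# Dependencies declared with dependsOn in the flow above.
flow = {"jobA": set(), "jobB": set(), "jobC": {"jobA", "jobB"}}


def run_order(dependencies):
    """A sequential order in which every job runs after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


print(run_order(flow))  # jobA and jobB (in some order), then jobC
```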

Validate the data

Accuracy of the data reports is vital for the modern data pipeline. The modernized pipeline has additional data reconciliation steps to improve the correctness of data across the platform. This is achieved by having greater programmatic control over the processed data. In the legacy pipeline, we could only reconcile data after all data processing was complete. The modern data pipeline, however, lets every transaction be reconciled at each processing step, which gives granular control over data validation and helps isolate the cause of any data processing errors. Before go-live, automated tests compared the data records generated by the legacy and modern systems to ensure data sanity. Deduplication is run frequently via on-demand queries to eliminate redundant data, thereby reducing processing time. For example, transactions that end clients have already consumed but that are still part of the dataset can be eliminated by deduplication, so that only newer transactions are processed for end-client consumption.
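The post doesn't describe how the automated comparison tests were written. As an illustration only, a reconciliation check can compare the two systems' outputs by a business key and report rows that are missing, unexpected, or different. The `txn_id` and `amount` columns are hypothetical. At Paytm's data volumes, this logic would run as a Spark join rather than in memory.

```python
def reconcile(legacy_rows, modern_rows, key):
    """Compare two result sets keyed by `key`; each row is a dict."""
    legacy = {row[key]: row for row in legacy_rows}
    modern = {row[key]: row for row in modern_rows}
    return {
        "missing": sorted(legacy.keys() - modern.keys()),     # only in legacy output
        "unexpected": sorted(modern.keys() - legacy.keys()),  # only in modern output
        "mismatched": sorted(k for k in legacy.keys() & modern.keys()
                             if legacy[k] != modern[k]),
    }


legacy = [{"txn_id": 1, "amount": 100}, {"txn_id": 2, "amount": 50}]
modern = [{"txn_id": 1, "amount": 100}, {"txn_id": 2, "amount": 55},
          {"txn_id": 3, "amount": 10}]
print(reconcile(legacy, modern, "txn_id"))
# {'missing': [], 'unexpected': [3], 'mismatched': [2]}
```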

The following sample query uses Spark SQL for on-demand deduplication of raw data at the reporting layer:

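insert overwrite table <<table>>
select col1, col2, col3 -- ...coln
from (select t.*
            -- partition by the columns that identify a duplicate; without it, only one row in the whole table gets rn = 1
            ,row_number() over (partition by <<key columns>> order by col) as rn
      from <<table>> t
     ) t
where rn = 1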
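The following Python sketch applies a deduplication rule to in-memory rows to make the window-function logic concrete. It partitions by the key columns, orders the rows within each partition, and keeps the first row. The key and ordering columns (`txn_id`, `ingested_at`) are hypothetical, because the post doesn't say which columns identify a duplicate. For ties, the SQL may keep either row, while this sketch keeps the one that appears first in the input.

```python
def deduplicate(rows, key_columns, order_column):
    """Keep one row per key: the first in order_column order, like
    row_number() over (partition by key order by order_column) = 1."""
    kept = {}
    for row in sorted(rows, key=lambda r: r[order_column]):  # stable sort
        key = tuple(row[c] for c in key_columns)
        kept.setdefault(key, row)  # the first row seen for a key has rn = 1
    return list(kept.values())


rows = [
    {"txn_id": 7, "ingested_at": 2, "amount": 100},
    {"txn_id": 7, "ingested_at": 1, "amount": 100},
    {"txn_id": 8, "ingested_at": 3, "amount": 40},
]
print(deduplicate(rows, ["txn_id"], "ingested_at"))
# one row for txn_id 7 (ingested_at 1) and one for txn_id 8
```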

What we achieved as part of the modernization

With the new data pipeline, we reduced the compute infrastructure fourfold (a 75% reduction), which helps save compute cost. The legacy stack was running on over 6,000 virtual cores; optimization techniques let the same system run at an improved scale on approximately 1,500 virtual cores. After migrating to Amazon EMR, we also reduced the compute and storage capacity needed for 400 TB of data and 40 data flows. Our Spark optimizations reduced job runtime by 95% (from 8–10 hours to 20–30 minutes), CPU consumption by 95%, I/O by 98%, and overall computation time by 80%. The incremental data processing approach helped the system scale despite data skew, which wasn't the case with the legacy solution.
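You can check the relative figures in this section against the absolute numbers. A drop from 6,000 to 1,500 virtual cores is fourfold, or 75%. The runtime range of 8–10 hours down to 20–30 minutes is consistent with the quoted 95%.

```python
def percent_reduction(before, after):
    """Relative reduction from `before` to `after`, in percent."""
    return (before - after) * 100 / before


print(percent_reduction(6000, 1500))   # 75.0 (virtual cores)
print(6000 / 1500)                     # 4.0, a fourfold reduction
print(percent_reduction(10 * 60, 30))  # 95.0: 10 hours down to 30 minutes
print(percent_reduction(8 * 60, 20))   # about 95.8: 8 hours down to 20 minutes
```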

Conclusion

In this post, we showed how Paytm modernized their data lake and data pipeline using Amazon EMR, Amazon S3, underlying AWS Cloud infrastructure, and Apache Spark. These cloud and big data technologies helped address the challenges of operating a big data pipeline, where the type and volume of data from disparate sources add complexity to the analytical processing.

By partnering with AWS, the Paytm Central Data Platform team created a modern data pipeline in a short amount of time. It delivers shorter analysis times and scales efficiently, generating high-quality reports for executive management and merchants every day.

As a next step, consider separating the data collection and data processing stages of your own data pipeline. Each stage should be designed and scaled appropriately to reduce processing time while maintaining the integrity of the reports it produces.

If you have feedback about this post, submit comments in the Comments section below.


About the Authors

Rajat Bhardwaj is a Senior Technical Manager with Amazon Web Services based in India, with 23 years of work experience in multiple roles in software development, telecom, and cloud technologies. He works with AWS Enterprise customers, providing advocacy and strategic technical guidance to help plan and build solutions using AWS services and best practices. Rajat is an avid runner and has competed in several half and full marathons in recent years.

Kunal Upadhyay is a General Manager with Paytm Central Data Platform team based out of Bengaluru, India. Kunal has 16 years of experience in big data, distributed computing, and data intelligence. When not building software, Kunal enjoys travel and exploring the world, spending time with friends and family.

Access Apache Livy using a Network Load Balancer on a Kerberos-enabled Amazon EMR cluster

Post Syndicated from Bharat Gamini original https://aws.amazon.com/blogs/big-data/access-apache-livy-using-a-network-load-balancer-on-a-kerberos-enabled-amazon-emr-cluster/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR supports Kerberos for authentication; you can enable Kerberos on Amazon EMR and put the cluster in a private subnet to maximize security.

To access the cluster, the best practice is to use a Network Load Balancer (NLB) to expose only specific ports, which are access-controlled via security groups. By default, however, Kerberos ticket authentication to Amazon EMR services fails through the NLB, because the services' Kerberos principals are tied to the cluster host names rather than to the NLB host name.

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. It supports submitting Spark jobs or snippets of Spark code, retrieving results synchronously or asynchronously, and managing SparkContexts, all through a simple REST interface or an RPC client library.
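The following Python sketch builds three common Livy REST calls: listing sessions, creating an interactive session, and submitting a code snippet to it. The endpoint URL and session ID 0 are placeholders. The requests are only constructed, not sent. On a Kerberos-enabled cluster, each request also needs SPNEGO authentication, which the Python standard library doesn't provide, whereas `curl --negotiate` and Kerberos-aware HTTP libraries do.

```python
import json
import urllib.request


def livy_request(base_url, path, payload=None):
    """Build, but don't send, a Livy REST request: GET without a payload,
    POST with a JSON body otherwise."""
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(base_url.rstrip("/") + path, data=data,
                                     method="GET" if payload is None else "POST")
    request.add_header("Content-Type", "application/json")
    return request


BASE = "https://livy.example.com:8998"  # hypothetical endpoint
list_sessions = livy_request(BASE, "/sessions")
create_session = livy_request(BASE, "/sessions", {"kind": "pyspark"})
run_code = livy_request(BASE, "/sessions/0/statements",
                        {"code": "spark.range(100).count()"})
for req in (list_sessions, create_session, run_code):
    print(req.get_method(), req.full_url)
```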

In this post, we discuss how to provide Kerberos ticket access to Livy for external systems like Airflow and Notebooks using an NLB. You can apply this process to other Amazon EMR services beyond Livy, such as Trino and Hive.

Solution overview

The following are the high-level steps required:

  1. Create an EMR cluster with Kerberos security configuration.
  2. Create an NLB with required listeners and target groups.
  3. Update the Kerberos Key Distribution Center (KDC) with a new service principal and the corresponding keytabs.
  4. Update the Livy configuration file.
  5. Verify Livy is accessible via the NLB.
  6. Run the Python Livy test case.

Prerequisites

The advanced configuration presented in this post assumes familiarity with Amazon EMR, Kerberos, Livy, Python and bash.

Create an EMR cluster

Create the Kerberos security configuration using the AWS Command Line Interface (AWS CLI) as follows (this creates the KDC on the EMR primary node):

aws emr create-security-configuration --name kdc-security-config --security-configuration '{
   "EncryptionConfiguration":{
      "InTransitEncryptionConfiguration":{
         "TLSCertificateConfiguration":{
            "CertificateProviderType":"PEM",
            "S3Object":"s3://<your-conf-bucket>/<your-certs>.zip"
         }
      },
      "AtRestEncryptionConfiguration":{
         "S3EncryptionConfiguration":{
            "EncryptionMode":"SSE-S3"
         }
      },
      "EnableInTransitEncryption":true,
      "EnableAtRestEncryption":true
   },
   "AuthenticationConfiguration":{
      "KerberosConfiguration":{
         "Provider":"ClusterDedicatedKdc",
         "ClusterDedicatedKdcConfiguration":{
            "TicketLifetimeInHours":24
         }
      }
   }
}'

It’s a security best practice to keep passwords in AWS Secrets Manager. You can use a bash function like the following as the argument to the --kerberos-attributes option so no passwords are stored in the launch script or command line. The function outputs the required JSON for the --kerberos-attributes option after retrieving the password from Secrets Manager.

krbattrs() { # Pull the KDC password from Secrets Manager without saving to disk or var
cat << EOF
  {
    "Realm": "EC2.INTERNAL",
    "KdcAdminPassword": "$(aws secretsmanager get-secret-value \
        --secret-id KDCpasswd  |jq -r .SecretString)"
  }
EOF
}

Create the cluster using the AWS CLI as follows:

aws emr create-cluster \
  --name "<your-cluster-name>" \
  --release-label emr-6.4.0 \
  --log-uri "s3://<your-log-bucket>" \
  --applications Name=Hive Name=Spark \
  --ec2-attributes "KeyName=<your-key-name>,SubnetId=<your-private-subnet>" \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=1,InstanceType=m3.xlarge \
  --security-configuration <kdc-security-config> \
  --kerberos-attributes "$(krbattrs)" \
  --use-default-roles

Create an NLB

Create an internet-facing NLB with TCP listeners in your VPC and subnet. An internet-facing load balancer routes requests from clients to targets over the internet; conversely, an internal NLB routes requests to targets using private IP addresses. For instructions, refer to Create a Network Load Balancer.

The following screenshot shows the listener details.

Create target groups and register the EMR primary instance (Livy3) and KDC instance (KDC3). For this post, these instances are the same; use the respective instances if KDC is running on a different instance.

The KDC and EMR security groups must allow the NLB’s private IP address to access ports 88 and 8998, respectively. You can find the NLB’s private IP address by searching the elastic network interfaces for the NLB’s name. For access control instructions, refer to this article on the knowledge center. Now that the security groups allow access, the NLB health check should pass, but Livy isn’t usable via the NLB until you make further changes (detailed in the following sections). The NLB is actually being used as a proxy to access Livy rather than doing any load balancing.

Update the Kerberos KDC

The KDC used by the Livy service must contain a new HTTP Service Principal Name (SPN) using the public NLB host name.

  • You can create the new principal from the EMR primary host using the full NLB public host name:
sudo kadmin.local addprinc HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL

Replace the fully qualified domain name (FQDN) and Kerberos realm as needed. Ensure the NLB hostname is all lowercase.
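The host part of the principal must be lowercase, so it can help to generate the SPN rather than type it. The following minimal Python helper does that. It assumes the realm `EC2.INTERNAL` and the NLB host name used elsewhere in this post.

```python
def http_spn(hostname, realm):
    """HTTP service principal for a host name: the host part is lowercased and
    a trailing dot removed; the realm is kept as given (realms are case-sensitive)."""
    return f"HTTP/{hostname.strip().rstrip('.').lower()}@{realm}"


print(http_spn("MyNLB-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com", "EC2.INTERNAL"))
# HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL
```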

After the new SPN exists, you create two keytabs containing that SPN. The first keytab is for the Livy service. The second keytab is for the Livy client, and it must contain the same key version number (KVNO) for the SPN as the first keytab.

  • Create Livy service keytab as follows:
sudo kadmin.local ktadd -norandkey -k /etc/livy2.keytab livy/<emr-primary-node-fqdn>@EC2.INTERNAL
sudo kadmin.local ktadd -norandkey -k /etc/livy2.keytab HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL
sudo chown livy:livy /etc/livy2.keytab
sudo chmod 600 /etc/livy2.keytab
sudo -u livy klist -e -kt /etc/livy2.keytab

Note the key version number (KVNO) for the HTTP principal in the output of the preceding klist command; it must match the KVNO for the same principal in the user keytab. Copy the livy2.keytab file to the EMR cluster Livy host if it's not already there.

  • Create a user or client keytab as follows:
sudo kadmin.local ktadd -norandkey -k /var/tmp/user1.keytab user1@EC2.INTERNAL
sudo kadmin.local ktadd -norandkey -k /var/tmp/user1.keytab HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL

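Note the -norandkey option used when adding the SPN. Without it, ktadd generates new random keys and increments the KVNO, which would invalidate the keys already exported to livy2.keytab; with it, the existing keys and the KVNO from the preceding livy2.keytab are preserved.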

  • Copy the user1.keytab to the client machine running the Python test case as user1.

Replace the FQDN, realm, and keytab path as needed.
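Comparing KVNOs by eye is error-prone when a keytab holds several entries. The following Python sketch parses `klist -kt` output for both keytabs and checks that the HTTP principal has the same KVNOs in each. It assumes MIT Kerberos `klist` output, where each entry line starts with the KVNO and contains the principal. The script name `check_kvno.py` is hypothetical. Reading `/etc/livy2.keytab` requires running as root or as the `livy` user.

```python
import subprocess
import sys


def kvnos_by_principal(klist_output):
    """Map each principal to the set of KVNOs in `klist -kt` (or `klist -e -kt`) output."""
    result = {}
    for line in klist_output.splitlines():
        tokens = line.split()
        # Entry lines start with the KVNO; headers and the dashed separator don't.
        if not tokens or not tokens[0].isdigit():
            continue
        principal = next((t for t in tokens[1:] if "@" in t), None)
        if principal:
            result.setdefault(principal, set()).add(int(tokens[0]))
    return result


def kvnos_match(service_listing, client_listing, principal):
    service = kvnos_by_principal(service_listing).get(principal)
    client = kvnos_by_principal(client_listing).get(principal)
    return service is not None and service == client


def klist(path):
    return subprocess.run(["klist", "-kt", path], capture_output=True,
                          text=True, check=True).stdout


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("usage: sudo python3 check_kvno.py SERVICE_KEYTAB CLIENT_KEYTAB PRINCIPAL")
    else:
        service_keytab, client_keytab, spn = sys.argv[1:4]
        ok = kvnos_match(klist(service_keytab), klist(client_keytab), spn)
        print("KVNOs match" if ok else "KVNO mismatch or principal missing")
        sys.exit(0 if ok else 1)
```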

Update the Livy configuration file

The primary change on the EMR cluster primary node running the Livy service is to the /etc/livy/conf/livy.conf file. You change the authentication principal that Livy uses, as well as the associated Kerberos keytab created earlier.

  • Make the following changes to the livy.conf file with sudo:
livy.server.auth.kerberos.principal = HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL
livy.server.auth.kerberos.keytab = /etc/livy2.keytab

Don’t change the livy.server.launch.kerberos.* values.

  • Restart and verify the Livy service:
sudo systemctl restart livy-server
sudo systemctl status livy-server
  • Verify the Livy port is listening:
sudo lsof -Pi :8998

COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 30106 livy 196u IPv6 224853844 0t0 TCP *:8998 (LISTEN)

You can automate these steps (modifying the KDC and Livy config file) by adding a step to the EMR cluster. For more information, refer to Tutorial: Configure a cluster-dedicated KDC.
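If you automate these changes, for example in a cluster step, the livy.conf edit should be idempotent so that rerunning it is harmless. The following Python sketch assumes livy.conf uses simple `key = value` lines. The script name is hypothetical, it must run as root, and the principal value reuses this post's example NLB host name and realm.

```python
import sys


def set_properties(conf_text, updates):
    """Return conf_text with each `key = value` in updates set. Active lines for
    a key are rewritten in place, commented-out lines are left alone, and keys
    not present are appended. Other keys (for example
    livy.server.launch.kerberos.*) are untouched."""
    remaining = dict(updates)
    lines = []
    for line in conf_text.splitlines():
        stripped = line.strip()
        key = stripped.split("=", 1)[0].strip()
        if stripped and not stripped.startswith("#") and "=" in stripped and key in remaining:
            lines.append(f"{key} = {remaining.pop(key)}")
        else:
            lines.append(line)
    lines.extend(f"{key} = {value}" for key, value in remaining.items())
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: sudo python3 update_livy_conf.py /etc/livy/conf/livy.conf")
    else:
        path = sys.argv[1]
        updates = {
            "livy.server.auth.kerberos.principal":
                "HTTP/mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com@EC2.INTERNAL",
            "livy.server.auth.kerberos.keytab": "/etc/livy2.keytab",
        }
        with open(path) as f:
            text = f.read()
        with open(path, "w") as f:
            f.write(set_properties(text, updates))
        # Then restart Livy: sudo systemctl restart livy-server
```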

Verify Livy is accessible via the NLB

You can now use user1.keytab to authenticate against the Livy REST endpoint. Copy the user1.keytab you created earlier to the host and user account that run the Livy test case. The host running the test case must be configured to connect to the modified KDC.

  • Create a Linux user (user1) on the client host and on the EMR cluster.

If the client host has a terminal available that the user can run commands in, you can use the following commands to verify network connectivity to Livy before running the actual Livy Python test case.

  • Verify the NLB host and port are reachable (no data will be returned by the nc command):
$ nc -vz mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com 8998
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 44.242.1.1:8998.
Ncat: 0 bytes sent, 0 bytes received in 0.02 seconds.
  • Create a TLS connection, which returns the server’s TLS certificate and session details:
openssl s_client -connect mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com:8998

If the openssl command doesn’t return a TLS server certificate, the rest of the verification won’t succeed. You may have a proxy or firewall interfering with the connection. Investigate your network environment, resolve the issue, and repeat the openssl command to ensure connectivity.
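If nc or openssl isn’t installed on the client, you can approximate the two checks above with the Python standard library. In this minimal sketch, `tcp_reachable` mirrors `nc -vz`. `fetch_server_cert` performs a TLS handshake without verifying the certificate, which matches the testing-only `-k` and `verify=False` shortcuts used in this post, and returns the server certificate in PEM form. The function names are our own.

```python
import socket
import ssl


def tcp_reachable(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds (like `nc -vz`)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def fetch_server_cert(host, port, timeout=5.0):
    """Complete a TLS handshake and return the server certificate as PEM.

    Certificate and hostname verification are disabled on purpose, for
    testing against self-signed certificates only.
    """
    context = ssl.create_default_context()
    context.check_hostname = False  # must be disabled before CERT_NONE
    context.verify_mode = ssl.CERT_NONE
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            der = tls.getpeercert(binary_form=True)
    return ssl.DER_cert_to_PEM_cert(der)
```

For example, `tcp_reachable("mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com", 8998)` should return True when the NLB listener is reachable. `fetch_server_cert` with the same arguments should print a PEM block, just as openssl s_client does.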

  • Verify the Livy REST endpoint using curl. This verifies Livy REST but not Spark.
kinit -kt user1.keytab user1@<REALM>
curl -k -u : --negotiate https://mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com:8998/sessions
{"from":0,"total":0,"sessions":[]}

curl arguments:
  "-k" - Skip TLS certificate verification (for self-signed test certificates)
  "-u :" - Supply an empty user name and password so that credentials come from the Kerberos ticket cache
  "--negotiate" - Enable Negotiate (SPNEGO) authentication

Run the Python Livy test case

The Livy test case is a simple Python3 script named livy-verify.py. You can run this script from a client machine to run Spark commands via Livy using the NLB. The script is as follows:

#!/usr/bin/env python3
# pylint: disable=invalid-name,no-member

"""
Verify Livy (REST) service using pylivy module, install req modules or use PEX:
  sudo yum install python3-devel
  sudo python3 -m pip install livy==0.8.0 requests==2.27.1 requests_gssapi==1.2.3
  https://pypi.org/project/requests-gssapi/

Kerberos authN implicitly uses TGT from kinit in users login env
"""

import shutil
import requests
from requests_gssapi import HTTPSPNEGOAuth
from livy import LivySession

# Disable ssl-warnings for self-signed certs when testing
requests.packages.urllib3.disable_warnings()

# Set the base URI(use FQDN and TLS) to target a Livy service
# Redefine remote_host to specify the remote Livy hostname to connect to
remote_host="mynlb-a0ac4b07f9f7f0a1.elb.us-west-2.amazonaws.com"
livy_uri = "https://" + remote_host + ":8998"

def livy_session():
    ''' Connect to Livy (REST) and run trivial pyspark command '''
    sconf = {"spark.master": "yarn", "spark.submit.deployMode": "cluster"}

    if shutil.which('kinit'):
        kauth = HTTPSPNEGOAuth()
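        # Override with an explicit user principal (requires "import gssapi"):
        #creds = gssapi.Credentials(name=gssapi.Name("user1@<REALM>", gssapi.NameType.kerberos_principal), usage="initiate")
        #kauth = HTTPSPNEGOAuth(creds=creds)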
        print("kinit found, kauth using Krb cache")
    else:
        kauth = None
        print("kinit NOT found, kauth set to None")

    with LivySession.create(livy_uri, verify=False, auth=kauth, spark_conf=sconf) as ls:
        ls.run("rdd = sc.parallelize(range(100), 2)")
        ls.run("rdd.take(3)")

    return 'LivySession complete'

def main():
    """ Starting point """
    livy_session()

if __name__ == '__main__':
    main()

The test case requires the new SPN to be in the user’s Kerberos ticket cache. To get the service principal into the Kerberos cache, use the kinit command with the -S option:

kinit -kt user1.keytab -S HTTP/<FQDN>@<REALM> user1@<REALM>

Note the SPN and the User Principal Name (UPN) are both used in the kinit command.

The Kerberos cache should look like the following output of the klist command:

klist
Ticket cache: FILE:/tmp/krb5cc_1001
Default principal: user1@<REALM>

Valid starting Expires Service principal
01/20/2022 01:38:06 01/20/2022 11:38:06 HTTP/<FQDN>@<REALM>
renew until 01/21/2022 01:38:06

Note the HTTP service principal in the klist ticket cache output.
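To make this check scriptable before you run the test case, the following Python sketch runs `klist` and looks for a service ticket whose principal starts with `HTTP/`. It assumes the MIT klist layout shown above, where each ticket line contains a start date and time, an expiry date and time, and then the principal. The function names are hypothetical.

```python
import re
import subprocess

# Ticket line: "<start date> <start time> <expiry date> <expiry time> <principal>"
TICKET = re.compile(r"^\s*\S+\s+\S+\s+\S+\s+\S+\s+(\S+@\S+)\s*$")


def service_principals(klist_output):
    """Return the service principals listed in `klist` ticket-cache output."""
    return [m.group(1) for line in klist_output.splitlines()
            if (m := TICKET.match(line))]


def has_http_ticket(klist_output):
    """True if the cache holds a ticket for an HTTP/ service principal."""
    return any(p.startswith("HTTP/") for p in service_principals(klist_output))


def current_cache():
    """Return the output of `klist` for the current user's ticket cache."""
    return subprocess.run(["klist"], capture_output=True, text=True,
                          check=True).stdout
```

Calling `has_http_ticket(current_cache())` after the `kinit -S` step should return True. A False result indicates that the SPN is missing from the cache.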

After the SPN is in the cache as verified by klist, you can run the following command to verify that Livy accepts the Kerberos ticket and runs the simple PySpark script. It generates a simple array, [0,1,2], as the output. The preceding Python script has been copied to the /var/tmp/user1/ folder in this example.

/var/tmp/user1/livy-verify.py
kinit found, kauth using Krb cache
[0, 1, 2]

It can take a minute or so to generate the result. Any authentication errors will happen in seconds. If the test in the new environment generates the preceding array, the Livy Kerberos configuration has been verified.

Any other client program that needs to have Livy access must be a Kerberos client of the KDC that generated the keytabs. It must also have a client keytab (such as user1.keytab or equivalent) and the service principal key in its Kerberos ticket cache.

In some environments, a simple kinit as follows may be sufficient:

kdestroy
kinit -kt user1.keytab user1@<REALM>

Summary

If you have an existing EMR cluster running Livy and using Kerberos (even in a private subnet), you can add an NLB to connect to the Livy service and still authenticate with Kerberos. For simplicity, we used a cluster-dedicated KDC in this post, but you can use any KDC architecture option supported by Amazon EMR. This post documented all the KDC and Livy changes to make it work; the script and procedure have been run successfully in multiple environments. You can modify the Python script as needed and try running the verification script in your environment.

About the Authors

John Benninghoff is an AWS Professional Services Sr. Data Architect, focused on Data Lake architecture and implementation.

Bharat Gamini is a Data Architect focused on Big Data & Analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust and secure cloud-based analytical solutions on AWS. Besides family time, he likes watching movies and sports.

Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/amazon-emr-on-amazon-eks-provides-up-to-61-lower-costs-and-up-to-68-performance-improvement-for-spark-workloads/

Amazon EMR on Amazon EKS is a deployment option offered by Amazon EMR that enables you to run Apache Spark applications on Amazon Elastic Kubernetes Service (Amazon EKS) in a cost-effective manner. It uses the EMR runtime for Apache Spark to increase performance so that your jobs run faster and cost less.

In our benchmark tests using TPC-DS datasets at 3 TB scale, we observed that Amazon EMR on EKS provides up to 61% lower costs and up to 68% improved performance compared to running open-source Apache Spark on Amazon EKS via equivalent configurations. In this post, we walk through the performance test process, share the results, and discuss how to reproduce the benchmark. We also share a few techniques to optimize job performance that could lead to further cost-optimization for your Spark workloads.

How does Amazon EMR on EKS reduce cost and improve performance?

The EMR runtime for Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark. It’s enabled by default with Amazon EMR on EKS. It helps run Spark workloads faster, leading to lower running costs. It includes multiple performance optimization features, such as Adaptive Query Execution (AQE), dynamic partition pruning, flattening scalar subqueries, bloom filter join, and more.

In addition to the cost benefit brought by the EMR runtime for Spark, Amazon EMR on EKS can take advantage of other AWS features to further optimize cost. For example, you can run Amazon EMR on EKS jobs on Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances, providing up to 90% cost savings when compared to On-Demand Instances. Also, Amazon EMR on EKS supports Arm-based Graviton EC2 instances, which deliver a 15% performance improvement and up to 30% cost savings when comparing a Graviton2-based M6g instance to an M5 instance.

The recent graceful executor decommissioning feature makes Amazon EMR on EKS workloads more robust by enabling Spark to anticipate Spot Instance interruptions. Because impacted Spark jobs don’t need to be recomputed or rerun, this stability and performance improvement can further reduce job costs.

Additionally, through container technology, Amazon EMR on EKS offers more options to debug and monitor Spark jobs. For example, you can choose Spark History Server, Amazon CloudWatch, or Amazon Managed Service for Prometheus and Amazon Managed Grafana (for more details, refer to the Monitoring and Logging workshop). Optionally, you can use familiar command line tools such as kubectl to interact with a job processing environment and observe Spark jobs in real time, which provides a fail-fast and productive development experience.

Amazon EMR on EKS supports multi-tenant needs and offers application-level security control via a job execution role. It enables seamless integrations with other AWS native services without setting up key pairs in Amazon EKS. The simplified security design can reduce your engineering overhead and lower the risk of data breach. Furthermore, Amazon EMR on EKS handles security and performance patches so you can focus on building your applications.

Benchmarking

This post provides an end-to-end Spark benchmark solution so you can get hands-on with the performance test process. The solution uses unmodified TPC-DS data schema and table relationships, but derives queries from TPC-DS to support the Spark SQL test case. It is not comparable to other published TPC-DS benchmark results.

Key concepts

Transaction Processing Performance Council-Decision Support (TPC-DS) is a decision support benchmark that is used to evaluate the analytical performance of big data technologies. Our test data is a TPC-DS compliant dataset based on the TPC-DS Standard Specification, Revision 2.4 document, which outlines the business model and data schema, relationship, and more. As the whitepaper illustrates, the test data contains 7 fact tables and 17 dimension tables, with an average of 18 columns. The schema consists of essential retailer business information, such as customer, order, and item data for the classic sales channels: store, catalog, and internet. This source data is designed to represent real-world business scenarios with common data skews, such as seasonal sales and frequent names. Additionally, the TPC-DS benchmark offers a set of discrete scaling points (scale factors) based on the approximate size of the raw data. In our test, we chose the 3 TB scale factor, which produces 17.7 billion records, approximately 924 GB compressed data in Parquet file format.

Test approach

A single test session consists of 104 Spark SQL queries run sequentially. For a fair comparison, the session for each deployment type, such as Amazon EMR on EKS, was run three times. We analyze the average runtime per query across these three iterations, and from those averages we derive two summary metrics to represent Spark performance:

  • Total execution time – The sum of the per-query average runtimes
  • Geomean – The geometric mean of the per-query average runtimes
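The following Python sketch makes the two metrics precise. It assumes that per-query runtimes in seconds are available for each of the three iterations. It averages each query's runtimes, then reports the sum of those averages (total execution time) and their geometric mean. The input layout and the function name are illustrative, and the benchmark application’s own reporting code may differ.

```python
from statistics import fmean, geometric_mean


def summarize(runs):
    """runs maps query name -> list of runtimes (seconds), one per iteration.

    Returns (per-query averages, total execution time, geometric mean).
    """
    averages = {query: fmean(times) for query, times in runs.items()}
    total = sum(averages.values())
    geomean = geometric_mean(averages.values())
    return averages, total, geomean
```

A single long query such as q67 can dominate the total execution time. The geometric mean, by contrast, weights every query's relative runtime equally, which is why the two metrics show different speedups.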

Test results

In the test result summary (see the following figure), we discovered that the Amazon EMR-optimized Spark runtime used by Amazon EMR on EKS is approximately 2.1 times better than the open-source Spark on Amazon EKS in geometric mean and 3.5 times faster by the total runtime.

The following figure breaks down the performance summary by queries. We observed that the EMR runtime for Spark was faster in every query compared to open-source Spark. Query q67 was the longest query in the performance test. Its average runtime with open-source Spark was 1019.09 seconds, but it took 150.02 seconds with Amazon EMR on EKS, which is 6.8 times faster. The highest performance gain among these long-running queries was q72: 319.70 seconds (open-source Spark) vs. 26.86 seconds (Amazon EMR on EKS), an 11.9 times improvement.

Test cost

Amazon EMR pricing on Amazon EKS is calculated based on the vCPU and memory resources used from the time you start to download your EMR application Docker image until the Amazon EKS pod terminates. As a result, you don’t pay any Amazon EMR charges until your application starts to run, and you only pay for the vCPU and memory used during a job. You don’t pay for the full amount of compute resources in an EC2 instance.

Overall, the estimated benchmark cost in the US East (N. Virginia) Region is $22.37 per run for open-source Spark on Amazon EKS and $8.70 per run for Amazon EMR on EKS – that’s 61% cheaper due to the 68% quicker job runtime. The following table provides more details.

| Benchmark | Job Runtime (Hours) | Estimated Cost | Total EC2 Instances | Total vCPU | Total Memory (GiB) | Root Device (EBS) |
|---|---|---|---|---|---|---|
| Amazon EMR on EKS | 0.68 | $8.70 | 6 | 216 | 432 | 20 GiB gp2 |
| Open-Source Spark on Amazon EKS | 2.13 | $22.37 | 6 | 216 | 432 | 20 GiB gp2 |
| Amazon EMR on Amazon EC2 (1 primary and 5 core nodes) | 0.80 | $8.80 | 6 | 196 | 424 | 20 GiB gp2 |

The cost estimate doesn’t account for Amazon Simple Storage Service (Amazon S3) storage, or PUT and GET requests. The Amazon EMR on EKS uplift calculation is based on the hourly billing information provided by AWS Cost Explorer.

Cost breakdown

The following is the cost breakdown for the Amazon EMR on EKS job ($8.70): 

  • Total uplift on vCPU – (126.93 * $0.01012) = (total vCPU-hours used * per vCPU-hour rate) = $1.28
  • Total uplift on memory – (258.7 * $0.00111125) = (total GB-hours of memory used * per GB-hour rate) = $0.29
  • Total Amazon EMR uplift cost – $1.57
  • Total Amazon EC2 cost – (6 * $1.728 * 0.68) = (number of instances * c5d.9xlarge hourly rate * job runtime in hour) = $7.05
  • Other costs – ($0.1 * 0.68) + ($0.1/730 * 20 * 6 * 0.68) = (shared Amazon EKS cluster charge per hour * job runtime in hour) + (EBS per GB-hourly rate * root EBS size * number of instances * job runtime in hour) = $0.08 

The following is the cost breakdown for the open-source Spark on Amazon EKS job ($22.37): 

  • Total Amazon EC2 cost – (6 * $1.728 * 2.13) = (number of instances * c5d.9xlarge hourly rate * job runtime in hour) = $22.12
  • Other costs – ($0.1 * 2.13) + ($0.1/730 * 20 * 6 * 2.13) = (shared EKS cluster charge per hour * job runtime in hour) + (EBS per GB-hourly rate * root EBS size * number of instances * job runtime in hour) = $0.25

The following is the cost breakdown for the Amazon EMR on Amazon EC2 ($8.80):

  • Total Amazon EMR cost – (5 * $0.27 * 0.80) + (1 * $0.192 * 0.80) = (number of core nodes * c5d.9xlarge Amazon EMR price * job runtime in hour) + (number of primary nodes * m5.4xlarge Amazon EMR price * job runtime in hour) = $1.23
  • Total Amazon EC2 cost – (5 * $1.728 * 0.80) + (1 * $0.768 * 0.80) = (number of core nodes * c5d.9xlarge instance price * job runtime in hour) + (number of primary nodes * m5.4xlarge instance price * job runtime in hour) = $7.53
  • Other Cost – ($0.1/730 * 20 GiB * 6 * 0.80) + ($0.1/730 * 256 GiB * 1 * 0.80) = (EBS per GB-hourly rate * root EBS size * number of instances * job runtime in hour) + (EBS per GB-hourly rate * default EBS size for m5.4xlarge * number of instances * job runtime in hour) = $0.041
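The three breakdowns use the same arithmetic. The following Python sketch re-derives them from the rates quoted above. These are c5d.9xlarge at $1.728 per hour, m5.4xlarge at $0.768 per hour, the Amazon EMR prices, the Amazon EMR on EKS vCPU and memory rates, a $0.10 per hour Amazon EKS cluster charge, and gp2 at $0.10 per GB-month spread over 730 hours. The rates are taken from this post for US East (N. Virginia) at the time of writing and may have changed. The helper names are our own.

```python
HOURS_PER_MONTH = 730
GP2_PER_GB_MONTH = 0.10
EKS_CLUSTER_PER_HOUR = 0.10
C5D_9XLARGE_EC2 = 1.728
M5_4XLARGE_EC2 = 0.768
C5D_9XLARGE_EMR = 0.27
M5_4XLARGE_EMR = 0.192
EMR_EKS_VCPU_HOUR = 0.01012
EMR_EKS_GB_HOUR = 0.00111125


def ebs(size_gib, instances, hours):
    """EBS charge: per GB-hour rate * volume size * instance count * runtime."""
    return GP2_PER_GB_MONTH / HOURS_PER_MONTH * size_gib * instances * hours


def emr_on_eks(hours, vcpu_hours, gb_hours, instances=6):
    uplift = vcpu_hours * EMR_EKS_VCPU_HOUR + gb_hours * EMR_EKS_GB_HOUR
    ec2 = instances * C5D_9XLARGE_EC2 * hours
    other = EKS_CLUSTER_PER_HOUR * hours + ebs(20, instances, hours)
    return uplift + ec2 + other


def oss_on_eks(hours, instances=6):
    ec2 = instances * C5D_9XLARGE_EC2 * hours
    other = EKS_CLUSTER_PER_HOUR * hours + ebs(20, instances, hours)
    return ec2 + other


def emr_on_ec2(hours, core_nodes=5):
    emr = core_nodes * C5D_9XLARGE_EMR * hours + M5_4XLARGE_EMR * hours
    ec2 = core_nodes * C5D_9XLARGE_EC2 * hours + M5_4XLARGE_EC2 * hours
    # 20 GiB root volume on every node plus the 256 GiB default EBS on the primary.
    other = ebs(20, core_nodes + 1, hours) + ebs(256, 1, hours)
    return emr + ec2 + other


eks = emr_on_eks(0.68, vcpu_hours=126.93, gb_hours=258.7)
oss = oss_on_eks(2.13)
ec2 = emr_on_ec2(0.80)
print(f"EMR on EKS ${eks:.2f}, OSS Spark on EKS ${oss:.2f}, EMR on EC2 ${ec2:.2f}")
print(f"Cost reduction vs OSS: {1 - eks / oss:.0%}; runtime reduction: {1 - 0.68 / 2.13:.0%}")
```

This prints $8.70, $22.33, and $8.80, with a 61% cost reduction and a 68% runtime reduction. The open-source figure comes out slightly below the published $22.37 because the $22.12 EC2 line implies a runtime a little above the rounded 2.13 hours. The gap is therefore most likely rounding.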

Benchmarking considerations

In this section, we share some techniques and considerations for the benchmarking.

Set up an Amazon EKS cluster with Availability Zone awareness

Our Amazon EKS cluster configuration looks as follows:

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: $EKSCLUSTER_NAME
  region: us-east-1
availabilityZones: ["us-east-1a", "us-east-1b"]
managedNodeGroups: 
  - name: mn-od
    availabilityZones: ["us-east-1b"] 

In the cluster configuration, the mn-od managed node group is assigned to the single Availability Zone us-east-1b, which we run the test against.

Availability Zones are physically separated by a meaningful distance from other Availability Zones in the same AWS Region, which adds round-trip latency between compute instances located in different Availability Zones. Spark implements distributed computing, so exchanging data between compute nodes is inevitable when performing data joins, windowing, and aggregations across multiple executors. Shuffling data between multiple Availability Zones adds extra latency to the network I/O, which directly impacts Spark performance. Additionally, when data is transferred between two Availability Zones, data transfer charges apply in both directions.

For this benchmark, which is a time-sensitive workload, we recommend running in a single Availability Zone and using On-Demand instances (not Spot) to have a dedicated compute resource. In an existing Amazon EKS cluster, you may have multiple instance types and a Multi-AZ setup. You can use the following Spark configuration to achieve the same goal:

--conf spark.kubernetes.node.selector.eks.amazonaws.com/capacityType=ON_DEMAND
--conf spark.kubernetes.node.selector.topology.kubernetes.io/zone=us-east-1b
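Spark on Kubernetes exposes node selectors through properties of the form `spark.kubernetes.node.selector.[labelKey]`, so you can pin pods to any node label this way. The following Python sketch turns a label map into the corresponding `--conf` arguments. The two labels are the ones used above, and the helper function is hypothetical.

```python
import shlex


def node_selector_confs(labels):
    """Build spark-submit --conf arguments that pin pods to nodes with `labels`."""
    args = []
    for key, value in labels.items():
        args += ["--conf", f"spark.kubernetes.node.selector.{key}={value}"]
    return args


labels = {
    "eks.amazonaws.com/capacityType": "ON_DEMAND",
    "topology.kubernetes.io/zone": "us-east-1b",
}
print(shlex.join(node_selector_confs(labels)))
```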

Use instance store volume to increase disk I/O

Spark data shuffle, the process of reading and writing intermediate data to disk, is a costly operation. Besides network I/O speed, Spark demands high-performance disks to support a large amount of data redistribution activity. I/O operations per second (IOPS) is an equally important measure to baseline Spark performance. For instance, the SQL queries 23a, 23b, 50, and 93 are shuffle-intensive Spark workloads in TPC-DS, so choosing an optimal storage strategy can significantly shorten their runtime. Generally speaking, the recommended options are either attaching multiple EBS volumes per node in Amazon EKS or using a d series EC2 instance type, which offers high disk I/O performance within a compute family (for example, c5d.9xlarge is the d series in the c5 compute optimized family).

The following table summarizes the hardware specification we used:

| Instance | On-Demand Hourly Price | vCPU | Memory (GiB) | Instance Store | Networking Performance (Gbps) | 100% Random Read IOPS | Write IOPS |
|---|---|---|---|---|---|---|---|
| c5d.9xlarge | $1.73 | 36 | 72 | 1 x 900 GB NVMe SSD | 10 | 350,000 | 170,000 |

To simplify our hardware configuration, we chose the AWS Nitro System EC2 instance type c5d.9xlarge, which comes with an NVMe-based SSD instance store volume. As of this writing, the built-in NVMe SSD disk requires less effort to set up and provides the optimal disk performance we need. In the following code, the one-off preBootstrapCommands mount an instance store to a node in Amazon EKS:

managedNodeGroups: 
  - name: mn-od
    preBootstrapCommands:
      - "sleep 5; sudo mkfs.xfs /dev/nvme1n1; sudo mkdir -p /local1; echo '/dev/nvme1n1 /local1 xfs defaults,noatime 1 2' | sudo tee -a /etc/fstab"
      - "sudo mount -a"
      - "sudo chown ec2-user:ec2-user /local1"

Run as a predefined job user, not a root user

For security, it’s not recommended to run Spark jobs as a root user. But how can you access the NVMe SSD volume mounted to the Amazon EKS cluster as a non-root Spark user?

An init container is created for each Spark driver and executor pod to set the volume permissions and control data access. Let’s check the Spark driver pod with the kubectl exec command, which lets us open an interactive session in the running container. We can observe the following:

  • The init container is called volume-permission.
  • The SSD disk is called /ossdata1. The Spark driver has stored some data to the disk.
  • The non-root Spark job user is called hadoop.

This configuration is provided as a pod template file for Amazon EMR on EKS, so you can dynamically tailor job pods when Spark configuration doesn’t support your needs. Be aware that the predefined user’s UID in the EMR runtime for Spark is 999, but it’s set to 1000 in open-source Spark. The following is a sample Amazon EMR on EKS driver pod template:

apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    app: sparktest
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: /local1  
  initContainers:  
  - name: volume-permission
    image: public.ecr.aws/y4g4v0z7/busybox
    # grant volume access to "hadoop" user with uid 999
    command: ['sh', '-c', 'mkdir /data1; chown -R 999:1000 /data1'] 
    volumeMounts:
      - name: spark-local-dir-1
        mountPath: /data1
  containers:
  - name: spark-kubernetes-driver
    volumeMounts:
      - name: spark-local-dir-1
        mountPath: /data1
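The job user's UID differs between the two runtimes (999 for the EMR runtime for Spark and 1000 for open-source Spark, as noted above). For that reason, it can help to generate the template instead of editing it by hand. The following Python sketch builds the same driver pod template as a dictionary and serializes it as JSON, which YAML parsers generally accept. The function name and its parameters are hypothetical. The image, paths, labels, and the group ID 1000 are copied from the template above.

```python
import json


def driver_pod_template(uid, gid=1000, host_path="/local1", mount_path="/data1"):
    """Driver pod template that mounts the NVMe host path and chowns it to `uid`."""
    volume = "spark-local-dir-1"
    mount = [{"name": volume, "mountPath": mount_path}]
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "spec": {
            "nodeSelector": {"app": "sparktest"},
            "volumes": [{"name": volume, "hostPath": {"path": host_path}}],
            "initContainers": [{
                "name": "volume-permission",
                "image": "public.ecr.aws/y4g4v0z7/busybox",
                # Grant the job user (uid) access to the mounted local disk.
                "command": ["sh", "-c",
                            f"mkdir {mount_path}; chown -R {uid}:{gid} {mount_path}"],
                "volumeMounts": mount,
            }],
            "containers": [{"name": "spark-kubernetes-driver", "volumeMounts": mount}],
        },
    }


print(json.dumps(driver_pod_template(uid=999), indent=2))
```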

In the job submission, we map the pod templates via the Spark configuration:

"spark.kubernetes.driver.podTemplateFile": "s3://'$S3BUCKET'/pod-template/driver-pod-template.yaml",
"spark.kubernetes.executor.podTemplateFile": "s3://'$S3BUCKET'/pod-template/executor-pod-template.yaml",

The Spark on k8s operator is a popular tool for deploying Spark on Kubernetes. Our open-source Spark benchmark uses it to submit the job to Amazon EKS. However, the Spark operator currently doesn’t support file-based pod template customization, so we embed the disk permission setup into the job definition, as in the example on GitHub.

Disable dynamic resource allocation and enable Adaptive Query Execution in your application

Spark provides a mechanism to dynamically adjust compute resources based on workload. This feature is called dynamic resource allocation. It provides flexibility and efficiency to manage compute resources. For example, your application may give resources back to the cluster if they’re no longer used, and may request them again later when there is demand. It’s quite useful when your data traffic is unpredictable and an elastic compute strategy is needed at your application level. In this benchmark, our source data volume (3 TB) is fixed and the jobs were run on a fixed-size Spark cluster that consists of six EC2 instances. Because dynamic allocation doesn’t suit our purpose and might add latency to the test result, you can turn it off in Amazon EMR on EC2 as shown in the following code. The other Spark deployment options, such as Amazon EMR on EKS, have dynamic allocation off by default, so we can ignore these settings there.

--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false

Dynamic resource allocation is a different concept from automatic scaling in Amazon EKS, such as the Cluster Autoscaler. Disabling the dynamic allocation feature only fixes our 6-node Spark cluster size per job, but doesn’t stop the Amazon EKS cluster from expanding or shrinking automatically. It means our Amazon EKS cluster is still able to scale between 1 and 30 EC2 instances, as configured in the following code:

managedNodeGroups: 
  - name: mn-od
    availabilityZones: ["us-east-1b"] 
    instanceType: c5d.9xlarge
    minSize: 1
    desiredCapacity: 1
    maxSize: 30

Spark Adaptive Query Execution (AQE) is an optimization technique available in Spark SQL since Spark 3.0. It dynamically re-optimizes the query execution plan at runtime and supports a variety of optimizations, such as the following:

  • Dynamically switch join strategies
  • Dynamically coalesce shuffle partitions
  • Dynamically handle skew joins

The feature is enabled by default in the EMR runtime for Spark, but disabled by default in open-source Apache Spark 3.1.2. To provide a fair comparison, make sure it’s set in the open-source Spark benchmark job declaration:

  sparkConf:
    # Enable AQE
    "spark.sql.adaptive.enabled": "true"
    "spark.sql.adaptive.localShuffleReader.enabled": "true"
    "spark.sql.adaptive.coalescePartitions.enabled": "true"
    "spark.sql.adaptive.skewJoin.enabled": "true"

Walkthrough overview

With these considerations in mind, we run three Spark jobs in Amazon EKS. This helps us compare Spark 3.1.2 performance in various deployment scenarios. For more details, check out the GitHub repository.

In this walkthrough, we show you how to do the following:

  • Produce a 3 TB TPC-DS compliant dataset
  • Run a benchmark job with the open-source Spark operator on Amazon EKS
  • Run the same benchmark application with Amazon EMR on EKS

We also provide information on how to benchmark with Amazon EMR on Amazon EC2.

Prerequisites

Install the following tools for the benchmark test:

Provision resources

The provision script creates the following resources:

  • A new Amazon EKS cluster
  • Amazon EMR on EKS enabler
  • The required AWS Identity and Access Management (IAM) roles
  • The S3 bucket emr-on-eks-nvme-${ACCOUNTID}-${AWS_REGION}, referred to as <S3BUCKET> in the following steps

The provisioning process takes approximately 30 minutes.

  1. Download the project with the following command:
    git clone https://github.com/aws-samples/emr-on-eks-bencharmk.git
    cd emr-on-eks-bencharmk

  2. Create a test environment (change the Region if necessary):
    export EKSCLUSTER_NAME=eks-nvme
    export AWS_REGION=us-east-1
    
    ./provision.sh

Modify the script if needed for testing against an existing Amazon EKS cluster. Make sure the existing cluster has the Cluster Autoscaler and Spark Operator installed. Examples are provided by the script.

  3. Validate the setup:
    # should return results
    kubectl get pod -n oss | grep spark-operator
    kubectl get pod -n kube-system | grep nodescaler

Generate TPC-DS test data (optional)

In this optional task, you generate TPC-DS test data in s3://<S3BUCKET>/BLOG_TPCDS-TEST-3T-partitioned. The process takes approximately 80 minutes.

The job generates TPC-DS compliant datasets with your preferred scale. In this case, it creates 3 TB of source data (approximately 924 GB compressed) in Parquet format. We have pre-populated the source dataset in the S3 bucket blogpost-sparkoneks-us-east-1 in Region us-east-1. You can skip the data generation job if you want to have a quick start.

Be aware that cross-Region data transfer latency will impact your benchmark result. We recommend generating the source data in your own S3 bucket if your test Region is different from us-east-1.

  1. Start the job:
    kubectl apply -f examples/tpcds-data-generation.yaml

  2. Monitor the job progress:
    kubectl get pod -n oss
    kubectl logs tpcds-data-generation-3t-driver -n oss

  3. Cancel the job if needed:
    kubectl delete -f examples/tpcds-data-generation.yaml

The job runs in the namespace oss with a service account called oss in Amazon EKS, which grants minimal permissions to access the S3 bucket via an IAM role. Update the example .yaml file if you have a different setup in Amazon EKS.

Benchmark for open-source Spark on Amazon EKS

Wait until the data generation job is complete, then update the default input location parameter (s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned) to your S3 bucket in the tpcds-benchmark.yaml file. Other parameters in the application can also be adjusted. Check out the comments in the yaml file for details. This process takes approximately 130 minutes.

If the data generation job is skipped, run the following steps without waiting.

  1. Start the job:
    kubectl apply -f examples/tpcds-benchmark.yaml

  2. Monitor the job progress:
    kubectl get pod -n oss
    kubectl logs tpcds-benchmark-oss-driver -n oss

  3. Cancel the job if needed:
    kubectl delete -f examples/tpcds-benchmark.yaml

The benchmark application outputs a CSV file capturing runtime per Spark SQL query and a JSON file with query execution plan details. You can use the collected metrics and execution plans to compare and analyze performance between different Spark runtimes (open-source Apache Spark vs. EMR runtime for Spark).
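As a starting point for that comparison, the following Python sketch joins two per-query result files and ranks queries by speedup. This post doesn't describe the exact layout of the benchmark's CSV output, so the column names `query` and `runtime_seconds` are assumptions. Adjust them to match the files your run produces. The sample values are the q67 and q72 averages reported in the test results.

```python
import csv
import io


def load_runtimes(csv_text, query_col="query", time_col="runtime_seconds"):
    """Map query name -> runtime in seconds from a CSV with a header row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[query_col]: float(row[time_col]) for row in reader}


def speedups(baseline, candidate):
    """Per-query speedup (baseline / candidate) for queries present in both,
    sorted from largest to smallest."""
    common = baseline.keys() & candidate.keys()
    ratios = {q: baseline[q] / candidate[q] for q in common}
    return sorted(ratios.items(), key=lambda item: item[1], reverse=True)


oss = load_runtimes("query,runtime_seconds\nq67,1019.09\nq72,319.70\n")
emr = load_runtimes("query,runtime_seconds\nq67,150.02\nq72,26.86\n")
for query, ratio in speedups(oss, emr):
    print(f"{query}: {ratio:.1f}x")
```

With these two queries, the sketch prints q72 at 11.9x followed by q67 at 6.8x, matching the figures in the test results.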

Benchmark with Amazon EMR on EKS

Wait for the data generation job to finish before starting the benchmark for Amazon EMR on EKS. Don’t forget to change the input location (s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned) to your S3 bucket. The output location is s3://<S3BUCKET>/EMRONEKS_TPCDS-TEST-3T-RESULT. If you use the pre-populated TPC-DS dataset, start the Amazon EMR on EKS benchmark without waiting. This process takes approximately 40 minutes.

  1. Start the job (change the Region if necessary):
    export EMRCLUSTER_NAME=emr-on-eks-nvme
    export AWS_REGION=us-east-1
    
    ./examples/emr6.5-benchmark.sh

Amazon EKS offers multi-tenant isolation and optimized resource allocation features, so it’s safe to run two benchmark tests in parallel on a single Amazon EKS cluster.

  2. Monitor the job progress in real time:
    kubectl get pod -n emr
    #run the command then search "execution time" in the log to analyze individual query performance
    kubectl logs YOUR_DRIVER_POD_NAME -n emr -c spark-kubernetes-driver

  3. Cancel the job (get the IDs from the cluster list on the Amazon EMR console):
    aws emr-containers cancel-job-run --virtual-cluster-id <YOUR_VIRTUAL_CLUSTER_ID> --id <YOUR_JOB_ID>

The following are additional useful commands:

#Check volume status
kubectl exec -it YOUR_DRIVER_POD_NAME -c spark-kubernetes-driver -n emr -- df -h

#Login to a running driver pod
kubectl exec -it YOUR_DRIVER_POD_NAME -c spark-kubernetes-driver -n emr -- bash

#Monitor compute resource usage
watch "kubectl top node"

Benchmark for Amazon EMR on Amazon EC2

Optionally, you can use the same benchmark solution to test Amazon EMR on Amazon EC2. Download the benchmark utility application JAR file from a running Kubernetes container, then submit a job via the Amazon EMR console. More details are available in the GitHub repository.

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script (change the Region if necessary):

cd emr-on-eks-bencharmk

export EKSCLUSTER_NAME=eks-nvme
export AWS_REGION=us-east-1

./deprovision.sh

Conclusion

Without making any application changes, we can run Apache Spark workloads faster and cheaper with Amazon EMR on EKS when compared to Apache Spark on Amazon EKS. We used a benchmark solution running on a 6-node c5d.9xlarge Amazon EKS cluster and queried a TPC-DS dataset at 3 TB scale. The performance test result shows that Amazon EMR on EKS provides up to 61% lower costs and up to 68% performance improvement over open-source Spark 3.1.2 on Amazon EKS.

If you’re wondering how much performance gain you can achieve with your use case, try out the benchmark solution or the EMR on EKS Workshop. You can also contact your AWS Solutions Architects, who can assist you along your innovation journey.


About the Authors

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering and DataOps.

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.

How SailPoint solved scaling issues by migrating legacy big data applications to Amazon EMR on Amazon EKS

Post Syndicated from Richard Li original https://aws.amazon.com/blogs/big-data/how-sailpoint-solved-scaling-issues-by-migrating-legacy-big-data-applications-to-amazon-emr-on-amazon-eks/

This post is co-written with Richard Li from SailPoint.

SailPoint Technologies is an identity security company based in Austin, TX. Its software as a service (SaaS) solutions support identity governance operations in regulated industries such as healthcare, government, and higher education. SailPoint distinguishes multiple aspects of identity as individual identity security services, including cloud governance, SaaS management, access risk governance, file access management, password management, provisioning, recommendations, and separation of duties, as well as access certification, access insights, access modeling, and access requests.

In this post, we share how SailPoint updated its platform for big data operations, and solved scaling issues by migrating legacy big data applications to Amazon EMR on Amazon EKS.

The challenge with the legacy data environment

SailPoint acquired a SaaS software platform that processes and analyzes identity, resource, and usage data from multiple cloud providers, and provides access insights, usage analysis, and access risk analysis. The platform’s original design criteria were focused on serving small to medium-sized companies. To produce these analytics insights quickly, much of the processing was done inside many microservices through streaming connections.

After the acquisition, we set a goal to expand the platform’s capability to handle customers with large cloud footprints across multiple cloud providers, sometimes spanning hundreds or even thousands of accounts that produce large amounts of cloud event data.

The legacy architecture had a simplistic approach to data processing, as shown in the following diagram. We processed the vast majority of event data in-service and ingested it directly into Amazon Relational Database Service (Amazon RDS), which we then merged with a graph database to form the final view.

We needed to convert this into a scalable process that could handle customers of any size. To address this challenge, we had to quickly introduce a big data processing engine in the platform.

How migrating to Amazon EMR on EKS helped solve this challenge

When evaluating the platform for our big data operations, several factors made Amazon EMR on EKS a top choice.

The amount of event data we receive at any given time is generally unpredictable. To stay cost-effective and efficient, we need a platform that is capable of scaling up automatically when the workload increases to reduce wait time, and can scale down when the capacity is no longer needed to save cost. Because our existing application workloads are already running on an Amazon Elastic Kubernetes Service (Amazon EKS) cluster with the cluster autoscaler enabled, running Amazon EMR on EKS on top of our existing EKS cluster fits this need.

Amazon EMR on EKS can safely coexist on an EKS cluster that is already hosting other workloads, be contained within a specified namespace, and have controlled access through Kubernetes role-based access control and AWS Identity and Access Management (IAM) roles for service accounts. Therefore, we didn’t have to build new infrastructure just for Amazon EMR. We simply linked up Amazon EMR on EKS with our existing EKS cluster running our application workloads. This reduced the amount of DevOps support needed, and significantly sped up our implementation and deployment timeline.

Unlike an Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) cluster, our EKS cluster spans multiple Availability Zones, so we can control Spark pod placement using Kubernetes pod scheduling and placement strategies to achieve higher fault tolerance.

With the ability to create and use custom images in Amazon EMR on EKS, we could also utilize our existing container-based application build and deployment pipeline for our Amazon EMR on EKS workload without any modifications. This also gave us the additional benefit of reduced job startup time, because we package all job scripts and dependencies into the image instead of fetching them at runtime.

We also utilize AWS Step Functions as our core workflow engine. The native integration of Amazon EMR on EKS with Step Functions is another bonus: we didn’t have to build custom code for job dispatch. Instead, we could utilize the Step Functions native integration to seamlessly integrate Amazon EMR jobs with our existing workflow, with very little effort.

In merely 5 months, we were able to go from design, to proof of concept, to rolling out phase 1 of the event analytics processing. This vastly improved our event analytics processing capability by extending horizontal scalability, which gave us the ability to take on customers with significantly larger cloud footprints than the legacy platform was designed for.

During the development and rollout of the platform, we also found that the Spark History Server provided by Amazon EMR on EKS was very useful in terms of helping us identify performance issues and tune the performance of our jobs.

As of this writing, the phase 1 rollout, which includes the event processing component of the core analytics processing, is complete. We’re now expanding the platform to migrate additional components onto Amazon EMR on EKS. The following diagram depicts our future architecture with Amazon EMR on EKS when all phases are complete.

In addition, to improve performance and reduce costs, we’re currently testing the Spark dynamic resource allocation support of Amazon EMR on EKS. This automatically scales job executors up and down based on load, boosting performance when needed and reducing cost when the workload is low. Furthermore, we’re investigating whether we can reduce overall cost and increase performance by using the pod template feature, which would allow us to seamlessly transition our Amazon EMR job workload to AWS Graviton-based instances.

Conclusion

With Amazon EMR on EKS, we can now onboard new customers and process vast amounts of data in a cost-effective manner, which we couldn’t do with our legacy environment. We plan to expand our Amazon EMR on EKS footprint to handle all our transform and load data analytics processes.


About the Authors

Richard Li is a senior staff software engineer on the SailPoint Technologies Cloud Access Management team.

Janak Agarwal is a product manager for Amazon EMR on Amazon EKS at AWS.

Kiran Guduguntla is a WW Go-to-Market Specialist for Amazon EMR at AWS. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data analytics solutions.

Best practices to optimize data access performance from Amazon EMR and AWS Glue to Amazon S3

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-data-access-performance-from-amazon-emr-and-aws-glue-to-amazon-s3/

Customers are increasingly building data lakes to store data at massive scale in the cloud. It’s common to use distributed computing engines, cloud-native databases, and data warehouses when you want to process and analyze your data in data lakes. Amazon EMR and AWS Glue are two key services you can use for such use cases. Amazon EMR is a managed big data framework that supports several different applications, including Apache Spark, Apache Hive, Presto, Trino, and Apache HBase. AWS Glue Spark jobs run on top of Apache Spark, and distribute data processing workloads in parallel to perform extract, transform, and load (ETL) jobs to enrich, denormalize, mask, and tokenize data on a massive scale.

For data lake storage, customers typically use Amazon Simple Storage Service (Amazon S3) because it’s secure, scalable, durable, and highly available. Amazon S3 is designed for 11 9’s of durability and stores over 200 trillion objects for millions of applications around the world, making it the ideal storage destination for your data lake. Amazon S3 averages over 100 million operations per second, so your applications can easily achieve high request rates when using Amazon S3 as your data lake.

This post describes best practices to achieve the performance scaling you need when analyzing data in Amazon S3 using Amazon EMR and AWS Glue. We specifically focus on optimizing for Apache Spark on Amazon EMR and AWS Glue Spark jobs.

Optimizing Amazon S3 performance for large Amazon EMR and AWS Glue jobs

Amazon S3 is a very large distributed system, and you can scale to thousands of transactions per second in request performance when your applications read and write data to Amazon S3. Amazon S3 performance isn’t defined per bucket, but per prefix in a bucket. Your applications can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. Additionally, there are no limits to the number of prefixes in a bucket, so you can horizontally scale your read or write performance using parallelization. For example, if you create 10 prefixes in an S3 bucket to parallelize reads, you could scale your read performance to 55,000 read requests per second. You can similarly scale writes by writing data across multiple prefixes.
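The per-prefix arithmetic in the previous paragraph can be written out as a small calculator. This is a minimal sketch that assumes requests are spread evenly across prefixes and treats the published per-prefix rates as baselines ("at least"), not hard ceilings; the function names are illustrative.

```python
# Published per-prefix baseline request rates for Amazon S3.
WRITE_RPS_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE requests per second
READ_RPS_PER_PREFIX = 5_500   # GET/HEAD requests per second


def aggregate_request_rate(num_prefixes: int, per_prefix_rps: int) -> int:
    """Baseline aggregate rate when load is spread evenly across num_prefixes prefixes."""
    if num_prefixes < 1:
        raise ValueError("num_prefixes must be at least 1")
    return num_prefixes * per_prefix_rps


def prefixes_needed(target_rps: int, per_prefix_rps: int) -> int:
    """Smallest number of evenly loaded prefixes whose combined baseline covers target_rps."""
    return -(-target_rps // per_prefix_rps)  # ceiling division on integers


if __name__ == "__main__":
    print(aggregate_request_rate(10, READ_RPS_PER_PREFIX))   # 55000
    print(aggregate_request_rate(10, WRITE_RPS_PER_PREFIX))  # 35000
    print(prefixes_needed(20_000, READ_RPS_PER_PREFIX))      # 4
```

In practice the spread is rarely perfectly even, so a hot prefix can be throttled well before the aggregate figure is reached.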

You can scale performance by utilizing automatic scaling in Amazon S3 and scan millions of objects for queries run over petabytes of data. Amazon S3 automatically scales in response to sustained new request rates, dynamically optimizing performance. While Amazon S3 is internally optimizing for a new request rate, you may temporarily receive HTTP 503 responses until the optimization completes:

AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown)

Such situations require the application to retry momentarily, but after Amazon S3 internally optimizes performance for the new request rate, all requests are generally served without retries. One such situation is when multiple workers in distributed compute engines such as Amazon EMR and AWS Glue momentarily generate a high number of requests to access data under the same prefix.
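To make "retry momentarily" concrete, here is a generic client-side sketch of capped exponential backoff with full jitter. It is not EMRFS's implementation (EMRFS performs its own retries, as described in the following sections). `SlowDownError` is a hypothetical stand-in for a 503 SlowDown response, and the delay parameters are assumptions. The default of 15 retries simply mirrors the EMRFS default mentioned later in this post.

```python
import random
import time


class SlowDownError(Exception):
    """Hypothetical stand-in for an HTTP 503 SlowDown response from Amazon S3."""


def call_with_backoff(request, max_retries=15, base_delay=0.1, max_delay=10.0,
                      sleep=time.sleep, rng=random.random):
    """Call request(), retrying on SlowDownError with capped exponential backoff and full jitter.

    Makes at most max_retries + 1 attempts and re-raises the last SlowDownError.
    """
    for attempt in range(max_retries + 1):
        try:
            return request()
        except SlowDownError:
            if attempt == max_retries:
                raise
            # The cap doubles with each attempt (up to max_delay); full jitter then waits
            # a random fraction of the cap so that many workers don't retry in lockstep.
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(cap * rng())
```

The `sleep` and `rng` parameters are injectable so the retry schedule can be tested without real waiting; jitter matters here because many Spark tasks hitting the same prefix would otherwise retry at the same moments.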

When using Amazon EMR and AWS Glue to process data in Amazon S3, you can employ certain best practices to manage request traffic and avoid HTTP Slow Down errors. Let’s look at some of these strategies.

Best practices to manage HTTP Slow Down responses

You can use the following approaches to take advantage of the horizontal scaling capability in Amazon S3 and improve the success rate of your requests when accessing Amazon S3 data using Amazon EMR and AWS Glue:

  • Modify the retry strategy for Amazon S3 requests
  • Adjust the number of Amazon S3 objects processed
  • Adjust the number of concurrent Amazon S3 requests

We recommend choosing and applying the options that best fit your use case to optimize data processing on Amazon S3. In the following sections, we describe best practices for each approach.

Modify the retry strategy for Amazon S3 requests

This is the easiest way to avoid HTTP 503 Slow Down responses and improve the success rate of your requests. To access Amazon S3 data, both Amazon EMR and AWS Glue use the EMR File System (EMRFS), which retries Amazon S3 requests with jitter when it receives 503 Slow Down responses. To improve the success rate of your Amazon S3 requests, you can adjust your retry strategy by configuring certain properties. In Amazon EMR, you can configure parameters in your emrfs-site configuration. In AWS Glue, you can configure the parameters in job parameters. You can adjust your retry strategy in the following ways:

  • Increase the EMRFS default retry limit – By default, EMRFS uses an exponential backoff strategy to retry requests to Amazon S3. The default EMRFS retry limit is 15. However, you can increase this limit when you create a new cluster, on a running cluster, or at application runtime. To increase the retry limit, change the value of the fs.s3.maxRetries parameter. Note that you may experience longer job durations if you set a higher value for this parameter. We recommend experimenting with different values (for example, starting with 20), confirming the duration overhead of your jobs for each value, and then adjusting this parameter based on your requirements.
  • For Amazon EMR, use the AIMD retry strategy – With Amazon EMR versions 6.4.0 and later, EMRFS supports an alternative retry strategy based on an additive-increase/multiplicative-decrease (AIMD) model. This strategy can be useful in shaping the request rate from large clusters. Instead of treating each request in isolation, this mode keeps track of the rate of recent successful and throttled requests. Requests are limited to a rate determined from the rate of recent successful requests. This decreases the number of throttled requests, and therefore the number of attempts needed per request. To enable the AIMD retry strategy, you can set the fs.s3.aimd.enabled property to true. You can further refine the AIMD retry strategy using the advanced AIMD retry settings.
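The AIMD idea can be illustrated with a tiny rate controller. This is a simplified model of additive-increase/multiplicative-decrease, not EMRFS's implementation. The class name, step size, decrease factor, and bounds are all hypothetical. On Amazon EMR you would enable the real behavior with fs.s3.aimd.enabled rather than writing anything like this.

```python
class AimdRateLimiter:
    """Illustrative additive-increase/multiplicative-decrease (AIMD) request-rate controller.

    The allowed rate grows by a fixed step after each successful request and is
    multiplied by a factor below 1 after each throttled (503 SlowDown) request.
    """

    def __init__(self, initial_rate=100.0, increase_step=5.0, decrease_factor=0.5,
                 min_rate=1.0, max_rate=5_500.0):
        # max_rate is an illustrative cap equal to the per-prefix GET baseline cited earlier.
        self.rate = initial_rate
        self.increase_step = increase_step
        self.decrease_factor = decrease_factor
        self.min_rate = min_rate
        self.max_rate = max_rate

    def on_success(self):
        # Additive increase: probe for more throughput slowly.
        self.rate = min(self.max_rate, self.rate + self.increase_step)

    def on_throttle(self):
        # Multiplicative decrease: back off sharply once throttling is observed.
        self.rate = max(self.min_rate, self.rate * self.decrease_factor)

    def delay_between_requests(self):
        """Seconds to wait between requests to stay at the current allowed rate."""
        return 1.0 / self.rate


if __name__ == "__main__":
    limiter = AimdRateLimiter()
    for _ in range(10):
        limiter.on_success()  # 100 + 10 * 5 = 150 requests/second
    limiter.on_throttle()     # 150 * 0.5 = 75 requests/second
    print(limiter.rate)       # 75.0
```

Because the rate is shared state rather than per-request state, one throttle slows every subsequent request. This is why AIMD tends to reduce the total number of throttled attempts on large clusters.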

Adjust the number of Amazon S3 objects processed

Another approach is to adjust the number of Amazon S3 objects processed so you have fewer requests made concurrently. When you lower the number of objects to be processed in a job, you use fewer Amazon S3 requests, thereby lowering the request rate or transactions per second (TPS) required for each job. Note the following considerations:

  • Preprocess the data by aggregating multiple smaller files into fewer, larger chunks – For example, use s3-dist-cp or an AWS Glue compaction blueprint to merge a large number of small files (generally less than 64 MB) into a smaller number of optimally sized files (such as 128–512 MB). This approach reduces the number of requests required, while simultaneously improving the aggregate throughput to read and process data in Amazon S3. You may need to experiment to arrive at the optimal size for your workload, because creating extremely large files can reduce the parallelism of the job.
  • Use partition pruning to scan data under specific partitions – In Apache Hive and Hive Metastore-compatible applications such as Apache Spark or Presto, one table can have multiple partition folders. Partition pruning is a technique to scan only the required data in a specific partition folder of a table. It’s useful when you want to read a specific portion from the entire table. To take advantage of predicate pushdown, you can use partition columns in the WHERE clause in Spark SQL or the filter expression in a DataFrame. In AWS Glue, you can also use a partition pushdown predicate when creating DynamicFrames.
  • For AWS Glue, enable job bookmarks – You can use AWS Glue job bookmarks to process continuously ingested data repeatedly. Each job run picks up only the data that previous runs haven’t processed, thereby reducing the number of objects read or retrieved from Amazon S3.
  • For AWS Glue, enable bounded executions – AWS Glue bounded execution is a technique to pick up only unprocessed data, with an upper bound on the dataset size or the number of files to be processed. This is another way to reduce the number of requests made to Amazon S3.
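To see why compaction reduces request counts, the following sketch plans how small files could be grouped into roughly 128 MB outputs. It only computes a grouping. The actual merge would be done with a tool such as s3-dist-cp or an AWS Glue compaction blueprint, as mentioned above. The greedy in-order grouping and the function name are assumptions for illustration.

```python
from typing import List

MB = 1024 * 1024


def plan_compaction(file_sizes: List[int], target_bytes: int = 128 * MB) -> List[List[int]]:
    """Greedily group file sizes (bytes, in listing order) into batches of about target_bytes.

    Each batch would be rewritten as one output object, so reading the data later takes
    roughly len(batches) GET requests instead of len(file_sizes).
    A single file larger than target_bytes ends up alone in its own batch.
    """
    batches: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in file_sizes:
        # Close the current batch if adding this file would exceed the target size.
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    small_files = [4 * MB] * 1000          # 1,000 hypothetical 4 MB objects
    batches = plan_compaction(small_files)
    print(len(batches))                    # 32 output objects instead of 1,000
```

Raising `target_bytes` further reduces the object count, but, as noted above, very large files can reduce the job's read parallelism.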

Adjust the number of concurrent Amazon S3 requests

To have fewer concurrent reads per prefix, you can configure Spark parameters. By default, Spark can launch up to 10,000 tasks to list prefixes when creating Spark DataFrames. You may experience Slow Down responses, especially when you read from a table with a highly nested prefix structure. In this case, it’s a good idea to limit the maximum listing parallelism by decreasing spark.sql.sources.parallelPartitionDiscovery.parallelism (the default is 10000).

To have fewer concurrent write requests per prefix, you can use the following techniques:

  • Reduce the number of Spark RDD partitions before writes – You can do this by using df.repartition(n) or df.coalesce(n) in DataFrames. For Spark SQL, you can also use query hints like REPARTITION or COALESCE. You can see the number of tasks (=RDD partitions) on the Spark UI.
  • For AWS Glue, group the input data – If the datasets are made up of small files, we recommend grouping the input data because it reduces the number of RDD partitions, and reduces the number of Amazon S3 requests to write the files.
  • Use the EMRFS S3-optimized committer – The EMRFS S3-optimized committer is available in Amazon EMR 5.19.0 and later, and is used by default in Amazon EMR 5.20.0 and later and in AWS Glue 3.0. In AWS Glue 2.0, you can enable it with the job parameter --enable-s3-parquet-optimized-committer. The committer uses Amazon S3 multipart uploads instead of renaming files, and it usually reduces the number of HEAD/LIST requests significantly.
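The effect of repartitioning before a partitioned write can be approximated with a simplified model. In this model, each write task emits one object per distinct partition value it holds, so the number of PUT requests depends on both the number of tasks and how rows are distributed among them. This is an assumption-laden sketch: real Spark output also depends on settings such as file size limits, and `split_by_value` uses a deterministic mapping instead of Spark's hash partitioning. All names are hypothetical.

```python
from typing import Iterable, List, Sequence


def estimate_output_files(task_partition_values: Sequence[Iterable[str]]) -> int:
    """Simplified model: each write task emits one object per distinct partition value it holds."""
    return sum(len(set(values)) for values in task_partition_values)


def round_robin_split(values: List[str], num_tasks: int) -> List[List[str]]:
    """Spread rows across tasks round-robin, roughly what an unkeyed repartition(n) does."""
    tasks: List[List[str]] = [[] for _ in range(num_tasks)]
    for i, value in enumerate(values):
        tasks[i % num_tasks].append(value)
    return tasks


def split_by_value(values: List[str], num_tasks: int) -> List[List[str]]:
    """Send all rows with the same partition value to one task, like repartitioning
    on the partition column (deterministic slot mapping instead of Spark's hash)."""
    slots = {v: i % num_tasks for i, v in enumerate(sorted(set(values)))}
    tasks: List[List[str]] = [[] for _ in range(num_tasks)]
    for value in values:
        tasks[slots[value]].append(value)
    return tasks


if __name__ == "__main__":
    # 300 rows spread evenly over 3 hypothetical daily partitions.
    rows = ["2015-05-01", "2015-05-02", "2015-05-03"] * 100
    print(estimate_output_files(round_robin_split(rows, 200)))  # 300
    print(estimate_output_files(round_robin_split(rows, 10)))   # 30
    print(estimate_output_files(split_by_value(rows, 10)))      # 3
```

Under this model, going from 200 tasks to 10 cuts the objects written (and therefore the write requests under each partition prefix) by 10x, and grouping rows by partition value cuts them further.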

The following are other techniques to adjust the Amazon S3 request rate in Amazon EMR and AWS Glue. These options have the net effect of reducing the parallelism of the Spark job, thereby reducing the probability of Amazon S3 Slow Down responses, although they can lead to longer job durations. We recommend testing and adjusting these values for your use case.

  • Reduce the number of concurrent jobs – Start with the most read/write heavy jobs. If you configured cross-account access for Amazon S3, keep in mind that other accounts might also be submitting jobs to the prefix.
  • Reduce the number of concurrent Spark tasks – You have several options:
    • For Amazon EMR, set the number of Spark executors (for example, the spark-submit option --num-executors and Spark parameter spark.executor.instances).
    • For AWS Glue, set the number of workers in the NumberOfWorkers parameter.
    • For AWS Glue, change the WorkerType parameter to a smaller one (for example, G.2X to G.1X).
    • Configure Spark parameters:
      • Decrease the value of spark.default.parallelism.
      • Decrease the value of spark.sql.shuffle.partitions.
      • Increase the value of spark.task.cpus (the default is 1) to allocate more CPU cores per Spark task.
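The executor and spark.task.cpus settings above combine into a simple upper bound on how many tasks can run, and so issue Amazon S3 requests, at the same time. This sketch assumes static allocation (dynamic allocation disabled). It also introduces executor_cores, which corresponds to the spark.executor.cores property; that property is not discussed in this post and is an assumption here.

```python
def max_concurrent_tasks(num_executors: int, executor_cores: int, task_cpus: int = 1) -> int:
    """Upper bound on Spark tasks running at once under static allocation.

    Each executor runs executor_cores // task_cpus tasks in parallel, so the job can have
    at most num_executors * (executor_cores // task_cpus) tasks in flight.
    """
    if num_executors < 1:
        raise ValueError("num_executors must be at least 1")
    if task_cpus < 1 or task_cpus > executor_cores:
        # Spark cannot schedule a task that needs more cores than an executor has.
        raise ValueError("task_cpus must be between 1 and executor_cores")
    return num_executors * (executor_cores // task_cpus)


if __name__ == "__main__":
    print(max_concurrent_tasks(10, 4))               # 40
    print(max_concurrent_tasks(10, 4, task_cpus=2))  # 20: doubling spark.task.cpus halves concurrency
    print(max_concurrent_tasks(5, 4))                # 20: halving executors has the same effect
```

Either lever halves peak task concurrency in this example. Which one to use depends on whether you also want to free cluster capacity (fewer executors) or keep executors for memory (higher spark.task.cpus).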

Conclusion

In this post, we described the best practices to optimize data access from Amazon EMR and AWS Glue to Amazon S3. With these best practices, you can easily run Amazon EMR and AWS Glue jobs by taking advantage of Amazon S3 horizontal scaling, and process data in a highly distributed way at a massive scale.

For further guidance, please reach out to AWS Premium Support.

Appendix A: Configure CloudWatch request metrics

To monitor Amazon S3 requests, you can enable request metrics in Amazon CloudWatch for the bucket. Then, define a filter for the prefix. For a list of useful metrics to monitor, see Monitoring metrics with Amazon CloudWatch. After you enable metrics, use the data in the metrics to determine which of the aforementioned options is best for your use case.
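As a sketch of the "enable request metrics, then filter by prefix" step, the following builds the keyword arguments for the S3 PutBucketMetricsConfiguration call as exposed by boto3's `put_bucket_metrics_configuration`. The bucket name, configuration ID, and prefix are hypothetical placeholders. The actual call is left as a comment because it needs boto3 and AWS credentials.

```python
def build_metrics_configuration_request(bucket: str, config_id: str, prefix: str) -> dict:
    """Build keyword arguments for PutBucketMetricsConfiguration that enable
    request metrics for objects under a single prefix."""
    return {
        "Bucket": bucket,
        "Id": config_id,
        "MetricsConfiguration": {
            "Id": config_id,              # must match the Id argument above
            "Filter": {"Prefix": prefix},  # only requests under this prefix are counted
        },
    }


if __name__ == "__main__":
    request = build_metrics_configuration_request(
        "EXAMPLE-BUCKET", "hot-prefix-metrics", "sales/2022/"  # hypothetical values
    )
    print(request)
    # To apply it (requires boto3 and AWS credentials):
    # import boto3
    # boto3.client("s3").put_bucket_metrics_configuration(**request)
```

Once metrics are flowing, a spike in the 5xx error count for the filtered prefix is a reasonable signal that one of the throttling mitigations above is needed there.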

Appendix B: Configure Spark parameters

To configure Spark parameters in Amazon EMR, there are several options:

  • spark-submit command – You can pass Spark parameters via the --conf option.
  • Job script – You can set Spark parameters in the SparkConf object in the job script codes.
  • Amazon EMR configurations – You can configure Spark parameters via API using Amazon EMR configurations. For more information, see Configure Spark.

To configure Spark parameters in AWS Glue, you can configure AWS Glue job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50.

To set multiple configs, configure your job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2.
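The following sketch puts several of the settings from this post into both forms. It builds an Amazon EMR configurations list (emrfs-site and spark-defaults classifications), which could be saved as JSON for `aws emr create-cluster --configurations file://...` or passed as the `Configurations` parameter when creating a cluster. It also builds an AWS Glue `--conf` value in the multi-config pattern shown above. The helper names and the listing-parallelism value of 1000 are illustrative assumptions, not recommendations.

```python
import json
from typing import Dict, List


def emr_configurations(emrfs_props: Dict[str, str], spark_props: Dict[str, str]) -> List[dict]:
    """Build an Amazon EMR configurations list with emrfs-site and spark-defaults classifications.

    Amazon EMR expects every property value as a string."""
    return [
        {"Classification": "emrfs-site", "Properties": dict(emrfs_props)},
        {"Classification": "spark-defaults", "Properties": dict(spark_props)},
    ]


def glue_conf_value(conf: Dict[str, str]) -> str:
    """Build the value for an AWS Glue job parameter whose key is --conf.

    The first key=value pair is written bare and every later pair is prefixed with --conf."""
    return " --conf ".join(f"{key}={value}" for key, value in conf.items())


if __name__ == "__main__":
    configs = emr_configurations(
        {"fs.s3.maxRetries": "20", "fs.s3.aimd.enabled": "true"},  # AIMD needs EMR 6.4.0 or later
        {
            "spark.sql.sources.parallelPartitionDiscovery.parallelism": "1000",  # illustrative value
            "spark.task.cpus": "2",
        },
    )
    print(json.dumps(configs, indent=2))
    print(glue_conf_value({"spark.hadoop.fs.s3.maxRetries": "50", "spark.task.cpus": "2"}))
    # spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2
```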


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about releasing AWS Glue connector custom blueprints and other software artifacts to help customers build their data lakes. In his spare time, he enjoys watching hermit crabs with his children.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

New features from Apache Hudi 0.9.0 on Amazon EMR

Post Syndicated from Kunal Gautam original https://aws.amazon.com/blogs/big-data/new-features-from-apache-hudi-0-9-0-on-amazon-emr/

Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. It does this by providing transaction support and record-level insert, update, and delete capabilities on data lakes on Amazon Simple Storage Service (Amazon S3) or Apache HDFS. Apache Hudi is integrated with open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino. Furthermore, Apache Hudi lets you maintain data in Amazon S3 or Apache HDFS in open formats such as Apache Parquet and Apache Avro.

Common use cases where we see customers use Apache Hudi are as follows:

  • To simplify data ingestion pipelines that deal with late-arriving or updated records from streaming and batch data sources.
  • To ingest data using Change Data Capture (CDC) from transactional systems.
  • To implement data-deletion pipelines to comply with data privacy regulations, such as the General Data Protection Regulation (GDPR). GDPR compliance is a requirement for today’s modern data architectures. It includes the “right to erasure” (or “right to be forgotten”), which can be implemented using Apache Hudi’s in-place delete and update capabilities.

We are excited to announce that Apache Hudi 0.9.0 is available on Amazon EMR 5.34 and EMR 6.5.0. This is a major release whose highlight is Spark SQL DML and DDL support, along with several other writer-side and reader-side improvements. The 3x query performance improvement that we observe over Hudi 0.6.0 is especially remarkable. If you are looking to implement a transactional data lake with record-level upserts and deletes, or are using an older version of Hudi, this is a great version to use. In this post, we focus on the following new features and improvements that come with the 0.9.0 release:

  • Spark SQL DML and DDL Support: Explore Spark SQL DML and DDL support.
  • Performance Improvements: Explore the performance improvements and new performance related features introduced on the writer and query side.
  • Additional Features: Explore additional useful features, such as Amazon DynamoDB-based locks for Optimistic Concurrency Control (OCC), delete partitions operation, etc.

Spark SQL DML and DDL support

The most exciting new feature is that Apache Hudi 0.9.0 adds support for DDL and DML using Spark SQL. This is a huge step toward making Hudi more accessible and operable by everyone, including non-engineers and analysts. It also makes it easy to migrate existing datasets to Apache Hudi tables. Using Spark SQL DML and DDL also moves closer to a low-code paradigm by eliminating the need to write Scala or Python code.

Users can now create tables using CREATE TABLE ... USING HUDI and CREATE TABLE ... AS SELECT SQL statements to directly manage tables in the AWS Glue Data Catalog.

Then, users can use INSERT, UPDATE, MERGE INTO, and DELETE SQL statements to manipulate data. The INSERT OVERWRITE statement can be used to overwrite existing data in the table or partition for existing batch ETL pipelines.

Let’s run through a quick example where we create a Hudi table amazon_customer_review_hudi resembling the schema of the Amazon Customer Reviews public dataset and perform the following activities:

  • Prerequisite: Create the Amazon S3 buckets s3://EXAMPLE-BUCKET and s3://EXAMPLE-BUCKET-1
  • Create a partitioned Hudi table amazon_customer_review_hudi
  • Create a source Parquet table amazon_customer_review_parquet_merge_source with contents that will be merged into the amazon_customer_review_hudi table
  • Insert data into amazon_customer_review_parquet_merge_source and amazon_customer_review_hudi, then perform a merge operation by reading the data from amazon_customer_review_parquet_merge_source and merging it into the Hudi table amazon_customer_review_hudi
  • Perform a delete operation on amazon_customer_review_hudi over the previously inserted records

Configure Spark Session

We use the following script in an EMR Studio notebook to configure the Spark session to work with Apache Hudi DML and DDL support. You can also work with Hudi on Amazon EMR from the interactive Spark shell, with spark-submit, or from Amazon EMR Notebooks. We recommend launching your EMR cluster with the following Apache Livy configuration:

[
    {
        "Classification": "livy-conf",
        "Properties": {
            "livy.file.local-dir-whitelist": "/usr/lib/hudi"
        }
    }
]

The above configuration lets you directly refer to the local /usr/lib/hudi/hudi-spark-bundle.jar on the EMR leader node while configuring the Spark session. Alternatively, you can also copy /usr/lib/hudi/hudi-spark-bundle.jar over to an HDFS location and refer to that while initializing Spark session. Here is a command for initializing the Spark session from a notebook:

%%configure -f
{
    "conf" : {
        "spark.jars":"file:///usr/lib/hudi/hudi-spark-bundle.jar",
        "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
        "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
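If you run the same job with spark-submit instead of a notebook, the three settings in the %%configure cell above can be passed as `--conf` flags. The following sketch only builds the argument list, and `my_hudi_job.py` is a hypothetical script name. Because `spark.jars` is an ordinary Spark property, passing it with `--conf` is an alternative to spark-submit's `--jars` option.

```python
from typing import Dict, List

# The same settings as the %%configure cell above.
HUDI_SPARK_CONF: Dict[str, str] = {
    "spark.jars": "file:///usr/lib/hudi/hudi-spark-bundle.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
}


def spark_submit_args(conf: Dict[str, str], app: str) -> List[str]:
    """Turn a Spark conf dict into spark-submit arguments, one --conf flag per property."""
    args = ["spark-submit"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app)  # the application script must come after all options
    return args


if __name__ == "__main__":
    print(" ".join(spark_submit_args(HUDI_SPARK_CONF, "my_hudi_job.py")))
```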

Create a Table

Let’s create the Apache Hudi table amazon_customer_review_hudi and the Parquet table amazon_customer_review_parquet_merge_source:

%%sql 

/****************************
Create a Hudi table with the same schema as the Amazon Customer Reviews table, using selected columns
*****************************/

-- Hudi 0.9.0 configuration https://hudi.apache.org/docs/configurations
-- Hudi configurations can be set in options block as hoodie.datasource.hive_sync.assume_date_partitioning = 'false',


create table if not exists amazon_customer_review_hudi
    ( marketplace string, 
      review_id string, 
      customer_id string,
      product_title string,
      star_rating int,
      timestamp long ,
      review_date date,
      year string,
      month string ,
      day string
      )
      using hudi
      location 's3://EXAMPLE-BUCKET/my-hudi-dataset/'
      options ( 
      type = 'cow',  
      primaryKey = 'review_id', 
      preCombineField = 'timestamp',
      hoodie.datasource.write.hive_style_partitioning = 'true'
      )
      partitioned by (year,month,day);
      

-- Change Location 's3://EXAMPLE-BUCKET/my-hudi-dataset/' to the appropriate S3 bucket you have created in your AWS account

%%sql 
/****************************
Create amazon_customer_review_parquet_merge_source to be used as the source for merging into amazon_customer_review_hudi.
The table contains deleteRecord column to track if deletion of record is needed
*****************************/


create table if not exists amazon_customer_review_parquet_merge_source 
       (
        marketplace string, 
        review_id string, 
        customer_id string,
        product_title string,
        star_rating int,
        review_date date,
        deleteRecord string
       )
       STORED AS PARQUET
       LOCATION 's3://EXAMPLE-BUCKET-1/toBeMergeData/'


-- Change Location 's3://EXAMPLE-BUCKET-1/toBeMergeData/' to the appropriate S3 bucket you have created in your AWS account

For comparison, if you were to create amazon_customer_review_hudi programmatically, the PySpark code would be as follows.

# Assumes `spark` is the SparkSession provided by the notebook, configured as shown earlier.

# Create a DataFrame with two sample reviews
inputDF = spark.createDataFrame(
    [
         ("Italy", "11", "1111", "table", 5, 1648126827, "2015/05/02", "2015", "05", "02"),
         ("Spain", "22", "2222", "chair", 5, 1648126827, "2015/05/02", "2015", "05", "02")
    ],
    ["marketplace", "review_id", "customer_id", "product_title", "star_rating", "timestamp", "review_date", "year", "month", "day"]
)

# Print the schema of inputDF
inputDF.printSchema()

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
    "hoodie.table.name": "amazon_customer_review_hudi",
    "hoodie.datasource.write.recordkey.field": "review_id",
    "hoodie.datasource.write.partitionpath.field": "year,month,day",
    # A multi-field partition path needs a key generator that supports composite paths
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": "amazon_customer_review_hudi",
    "hoodie.datasource.hive_sync.partition_fields": "year,month,day",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
}

# Create the Hudi table and insert data into amazon_customer_review_hudi at the S3 location specified
inputDF.write \
       .format("org.apache.hudi") \
       .option("hoodie.datasource.write.operation", "insert") \
       .options(**hudiOptions) \
       .mode("append") \
       .save("s3://EXAMPLE-BUCKET/my-hudi-dataset/")

Insert data into the Hudi tables

Let’s insert records into the table amazon_customer_review_parquet_merge_source to be used for the merge operation. This includes one row each for a fresh insert, an update, and a delete.

%%sql 

/****************************
 Insert a record into amazon_customer_review_parquet_merge_source for deletion 
*****************************/

-- The record will be deleted from amazon_customer_review_hudi after the merge because deleteRecord is set to yes

insert into amazon_customer_review_parquet_merge_source
    select
    'italy',
    '11',
    '1111',
    'table',
     5,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'yes' 
    
   

%%sql
/****************************
 Insert a record into amazon_customer_review_parquet_merge_source used for update
*****************************/

-- The record will be updated in amazon_customer_review_hudi with a new star_rating and product_title after the merge

insert into amazon_customer_review_parquet_merge_source
    select
    'spain',
    '22',
    '2222',
    'Relaxing chair',
     4,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'no' 


%%sql
/****************************
 Insert a record into amazon_customer_review_parquet_merge_source for insert 
*****************************/

-- The record will be inserted into amazon_customer_review_hudi after merge 

insert into amazon_customer_review_parquet_merge_source
    select
    'uk',
    '33',
    '3333',
    'hanger',
     3,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'no' 

Now let’s insert records into the amazon_customer_review_hudi table used as the destination table for the merge operation.

%%sql

/****************************
 Insert a record into amazon_customer_review_hudi table for deletion after merge 
*****************************/

-- Spark SQL date time functions https://spark.apache.org/docs/latest/api/sql/index.html#date_add

insert into amazon_customer_review_hudi 
    select 
    'italy',
    '11',
    '1111',
    'table',
     5,
    unix_timestamp(current_timestamp()) as timestamp,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  


%%sql
/****************************
 Insert a record into amazon_customer_review_hudi table for update after merge 
*****************************/

insert into  amazon_customer_review_hudi
    select 
    'spain',
    '22',
    '2222',
    'chair ',
     5,
    unix_timestamp(current_timestamp()) as timestamp,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  

Merge into

Let’s perform the merge from amazon_customer_review_parquet_merge_source into amazon_customer_review_hudi.

%%sql 

/*************************************
MergeInto : Merge Source Into Target 
**************************************/

-- Source amazon_customer_review_parquet_merge_source 
-- Target amazon_customer_review_hudi

merge into amazon_customer_review_hudi as target
using ( 
        select
        marketplace, 
        review_id, 
        customer_id,
        product_title,
        star_rating,
        review_date,
        deleteRecord,
        date_format(review_date, "yyyy") as year,
        date_format(review_date, "MM") as month,
        date_format(review_date, "dd") as day
        from amazon_customer_review_parquet_merge_source ) source
on target.review_id = source.review_id 
when matched and deleteRecord != 'yes' then 

update set target.timestamp = unix_timestamp(current_timestamp()),  
target.star_rating = source.star_rating, 
target.product_title = source.product_title

when matched and deleteRecord = 'yes' then delete

when not matched then insert 
      ( target.marketplace, 
        target.review_id, 
        target.customer_id,
        target.product_title,
        target.star_rating,
        target.timestamp ,
        target.review_date,
        target.year ,
        target.month  ,
        target.day
      ) 
      values
      (
        source.marketplace,
        source.review_id, 
        source.customer_id,
        source.product_title,
        source.star_rating,
        unix_timestamp(current_timestamp()),
        source.review_date,
        source.year , 
        source.month ,
        source.day 
       )
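The three `when` clauses above can be modeled in a few lines of plain Scala, with no Spark or Hudi involved. The model resolves the sample records from this walkthrough. It assumes that the first source record, whose values are cut off in this excerpt, has review_id '11' and deleteRecord = 'yes', matching the target record inserted for deletion. The `Review` case class and `mergeInto` function are hypothetical. They mirror only the SQL's matching logic, not Hudi's implementation, and they omit the timestamp column.

```scala
// Hypothetical, simplified row: only the columns the merge reads or changes (timestamp omitted).
final case class Review(marketplace: String, reviewId: String, productTitle: String, starRating: Int)

object MergeModel {
  // Applies the three WHEN clauses of the MERGE INTO statement, matching on review_id.
  // Each source row is paired with its deleteRecord flag ("yes" or "no").
  // Assumes each review_id appears at most once in the source, so applying rows in order
  // gives the same result as a set-based merge.
  def mergeInto(target: Seq[Review], source: Seq[(Review, String)]): Map[String, Review] =
    source.foldLeft(target.map(r => r.reviewId -> r).toMap) { case (acc, (src, deleteRecord)) =>
      acc.get(src.reviewId) match {
        // when matched and deleteRecord = 'yes' then delete
        case Some(_) if deleteRecord == "yes" => acc - src.reviewId
        // when matched and deleteRecord != 'yes' then update star_rating and product_title
        case Some(existing) =>
          acc.updated(src.reviewId, existing.copy(starRating = src.starRating, productTitle = src.productTitle))
        // when not matched then insert
        case None => acc.updated(src.reviewId, src)
      }
    }

  def main(args: Array[String]): Unit = {
    val target = Seq(Review("italy", "11", "table", 5), Review("spain", "22", "chair ", 5))
    val source = Seq(
      (Review("italy", "11", "table", 5), "yes"), // assumed values for the truncated first source record
      (Review("spain", "22", "Relaxing chair", 4), "no"),
      (Review("uk", "33", "hanger", 3), "no")
    )
    mergeInto(target, source).values.toSeq.sortBy(_.reviewId).foreach(println)
  }
}
```

Under these assumptions, record '11' is deleted, record '22' keeps its marketplace but takes the new title and star rating, and record '33' is inserted.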

Considerations and Limitations

  • Currently, the merge condition can be applied only on the primary key:
    -- The merge condition is possible only on primary keys
    on target.review_id = source.review_id
  • Partial updates are supported for Copy on Write (CoW) tables, but not for Merge on Read (MoR) tables.
  • For MoR tables, the target table's fields can't appear on the right-hand side of an update expression:
    -- The update will result in an error because target columns are present on the right-hand side of the expression
    update set target.star_rating = target.star_rating + 1

Delete a Record

Now let’s delete the inserted record.

%%sql

/*************************************
Delete the inserted record from amazon_customer_review_hudi table 
**************************************/
Delete from amazon_customer_review_hudi where review_id == '22'


%%sql 
/*************************************
Query the deleted record from amazon_customer_review_hudi table 
**************************************/
select * from amazon_customer_review_hudi where review_id == '22'

Schema Evolution

Hudi supports common schema evolution scenarios, such as adding a nullable field or promoting the data type of a field. Let's add a new column ssid (type int) to the existing amazon_customer_review_hudi table and insert a record with the extra column. Hudi allows querying both old and new data with the updated table schema.

%%sql

/*************************************
Adding a new column named ssid of type int to amazon_customer_review_hudi table
**************************************/

ALTER TABLE amazon_customer_review_hudi ADD COLUMNS (ssid int)

%%sql
/*************************************
Adding a new record to altered table amazon_customer_review_hudi 
**************************************/
insert into amazon_customer_review_hudi
    select 
    'germany',
    '55',
    '5555',
    'car',
     5,
    unix_timestamp(current_timestamp()) as timestamp,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    10 as ssid,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  

%%sql 
/*************************************
Promoting ssid type from int to long  
**************************************/
ALTER TABLE amazon_customer_review_hudi CHANGE COLUMN ssid ssid long


%%sql 
/*************************************
Querying data from amazon_customer_review_hudi table
**************************************/
select * from amazon_customer_review_hudi where review_id == '55'

Spark Performance Improvements

Query Side Improvements

Apache Hudi tables are now registered with the metastore as Spark Data Source tables. This enables Spark SQL queries on Hudi tables to use Spark's native Parquet reader for Copy on Write tables, and Hudi's custom MergeOnReadSnapshotRelation for Merge on Read tables. As a result, queries no longer depend on the Hive Input Format fallback within Spark, which isn't as well maintained or as efficient as Spark's native readers. This unlocks many optimizations, such as the use of Spark's native Parquet readers and Hudi's own Spark FileIndex implementation. The FileIndex improves file listing performance through optimized caching, support for partition pruning, and the ability to list files via the Hudi metadata table (instead of listing directly from Amazon S3). In addition, Hudi now supports time travel queries via the Spark data source, which let you query a snapshot of the dataset as of a historical time instant.

Other important things to note are:

  • Configurations such as spark.sql.hive.convertMetastoreParquet=false and mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter are no longer needed while querying via Spark SQL.
  • You can now use a non-globbed query path when querying Hudi datasets via the Data Source API. This lets you query the table via its base path without having to specify * in the query path.
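The following spark-shell sketch illustrates the two Data Source read paths described above. It assumes a `spark` session is in scope, as in the other Scala examples in this post. It loads a Hudi table from its base path without a glob pattern, then runs a time travel query. The `as.of.instant` read option and its `yyyyMMddHHmmss` instant format reflect our understanding of the Apache Hudi 0.9.0 documentation. The table path and instant value are placeholders, so verify both against the Hudi documentation for your release.

```scala
// Placeholder base path of an existing Hudi table.
val basePath = "s3://<bucket>/<prefix>/amazon_reviews_table"

// Snapshot query on the base path: no "/*/*" glob pattern is needed.
val snapshotDF = spark.read.format("hudi").load(basePath)

// Time travel query: read the table as of a historical commit instant (placeholder value).
val asOfDF = spark.read.format("hudi")
  .option("as.of.instant", "20211201120000")
  .load(basePath)

asOfDF.createOrReplaceTempView("reviews_as_of")
spark.sql("select count(*) from reviews_as_of").show()
```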

We ran a performance benchmark derived from the 3 TB scale TPC-DS benchmark to determine the query performance improvements for Hudi 0.9.0 on EMR 6.5.0, relative to Hudi 0.6.0 on EMR 6.2.0 (at the beginning of 2021) for Copy on Write tables. The queries were run on 5-node c5.9xlarge EMR clusters.

In terms of Geometric Mean, the queries with Hudi 0.9.0 are three times faster than they were with Hudi 0.6.0. The following graphs compare the total aggregate runtime and geometric mean of runtime for all of the queries in the TPC-DS 3 TB query dataset between the two Amazon EMR/Hudi releases (lower is better):

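Because the results above are summarized with both total runtime and geometric mean, a small sketch of the two metrics may help when interpreting them. This is generic arithmetic in plain Scala, not the benchmark harness used for these results. `BenchmarkMetrics` and its methods are hypothetical names.

```scala
object BenchmarkMetrics {
  // Aggregate runtime: the sum of all query runtimes, dominated by the longest queries.
  def totalRuntime(runtimesSec: Seq[Double]): Double = runtimesSec.sum

  // Geometric mean: the n-th root of the product of runtimes, computed as exp(mean(log t))
  // to avoid overflow. Every query's relative speedup carries the same weight.
  def geometricMean(runtimesSec: Seq[Double]): Double = {
    require(runtimesSec.nonEmpty && runtimesSec.forall(_ > 0), "runtimes must be positive")
    math.exp(runtimesSec.map(math.log).sum / runtimesSec.size)
  }

  // Speedup factor between two runs of the same query set under a given metric.
  def speedup(before: Seq[Double], after: Seq[Double], metric: Seq[Double] => Double): Double =
    metric(before) / metric(after)
}
```

If only a few short queries speed up, the geometric mean can improve noticeably while the total runtime barely changes. The reverse holds when only the longest queries speed up.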

Writer side improvements

Virtual Keys Support

Apache Hudi maintains metadata by adding additional columns to the datasets. This lets it support upsert/delete operations and various capabilities around it, such as incremental queries, compaction, etc. These metadata columns (namely _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name and _hoodie_commit_seqno) let Hudi uniquely identify a record, the partition/file in which a record exists, and the latest commit that updated a record.

However, generating and maintaining these metadata columns increases the storage footprint for Hudi tables on disk. Some of these columns, such as _hoodie_record_key and _hoodie_partition_path, can be constructed from other data columns already stored in the datasets. Apache Hudi 0.9.0 has introduced support for Virtual Keys. This lets users disable the generation of these metadata columns, and instead depend on actual data columns to construct the record key/partition paths dynamically using appropriate key generators. This helps in reducing the storage footprint, as well as improving ingestion time. However, this feature comes with the following caveats:

  • This is only meant to be used for append-only/immutable data. It can't be used for use cases requiring upserts and deletes, which require metadata columns such as _hoodie_record_key and _hoodie_partition_path for bloom indexes to work.
  • Incremental queries will not be supported, because they need _hoodie_commit_time to filter records written/updated at a specific time.
  • Once this feature is enabled, it can’t be turned off for an existing table.

The feature is turned off by default, and it can be enabled by setting hoodie.populate.meta.fields to false. We measured the write performance and storage footprint improvements using Bulk Insert with the public Amazon Customer Reviews dataset. Here is the code snippet that we used:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// Source: the public Amazon Customer Reviews dataset in Parquet format
val srcPath = "s3://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_table"
val tablePath = "s3://<bucket>/<prefix>/" + tableName

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.format("hudi")
 .option(HoodieWriteConfig.TABLE_NAME, tableName)
 .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
 .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id")
 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category") 
 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date")
 // "false" enables virtual keys (no _hoodie_* metadata columns); "true" is the default
 .option("hoodie.populate.meta.fields", "<true/false>")
 .mode(SaveMode.Overwrite)
 .save(tablePath)

The experiment was run on a four-node c4.2xlarge EMR cluster (one leader, three core). We observed a 10.63% improvement in write runtime performance and an 8.67% reduction in storage footprint with virtual keys enabled. The following graph compares the bulk insert runtime and table size with and without virtual keys (lower is better):


Timeline Server-based Marker Mechanism

Apache Hudi supports automatic cleanup of uncommitted data written during write operations. This cleanup relies on marker files, one corresponding to each data file, which track the data files of interest so that Hudi doesn't have to scan the entire table by listing all of its files. Although the existing marker mechanism is much more efficient than scanning the entire table for uncommitted data files, it can still affect performance for Amazon S3 data lakes. For example, writing a large number of marker files (one per data file) and then deleting them after a successful commit can take a non-trivial amount of time, sometimes on the order of several minutes. It can also hit Amazon S3 throttling limits when many data and marker files are written concurrently.

Apache Hudi 0.9.0 introduces a new timeline server based implementation of this marker mechanism. This makes it more efficient for Amazon S3 workloads by improving the overall write performance, as well as significantly decreasing the probability of hitting Amazon S3 throttle limits. The new mechanism uses Hudi’s timeline server component as a central place for processing all of the marker creation/deletion requests (from all executors), which allows for batching of these requests and reducing the number of requests to Amazon S3. Therefore, users with Amazon S3 data lakes can leverage this to improve write operations performance and avoid throttling due to marker files management. It would be especially impactful for scenarios where a significant number of data files (e.g., 10k or more) are being written.

This new mechanism is not enabled by default. To enable it for a write operation, set hoodie.write.markers.type to timeline_server_based. For more details about the feature, refer to this blog post by the Apache Hudi community.
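Here is a minimal sketch of enabling timeline server-based markers on a Spark Data Source write. It reuses the write options from the bulk insert example earlier in this post. Only hoodie.write.markers.type comes from the text above; the table name, path, and other options are placeholders. The snippet also assumes Hudi's embedded timeline server is running for the write, which we understand to be the default in the Spark writer.

```scala
// Run in spark-shell on Amazon EMR, where `spark` is already defined.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// Placeholders carried over from the bulk insert example.
val tableName = "amazon_reviews_table"
val tablePath = "s3://<bucket>/<prefix>/" + tableName
val inputDF = spark.read.format("parquet").load("s3://amazon-reviews-pds/parquet/")

inputDF.write.format("hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date")
  // Send marker create/delete requests to the timeline server, which batches them,
  // instead of writing one marker object per data file directly to Amazon S3.
  .option("hoodie.write.markers.type", "timeline_server_based")
  .mode(SaveMode.Append)
  .save(tablePath)
```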

Additional Improvements

DynamoDB-based Locking

Optimistic Concurrency Control was one of the major features introduced with Apache Hudi 0.8.0 to allow multiple concurrent writers to ingest data into the same Hudi table. The feature requires acquiring locks, for which either ZooKeeper (default on EMR) or Hive Metastore can be used. However, these lock providers require all of the writers to run on the same cluster as the ZooKeeper/Hive Metastore.

Apache Hudi 0.9.0 on Amazon EMR has introduced DynamoDB as a lock provider. This lets multiple writers running on different clusters ingest data into the same Hudi table. The feature was originally added to Hudi 0.9.0 on Amazon EMR and was contributed back to open source Hudi in version 0.10.0. To configure it, set the following properties:

  • hoodie.write.lock.provider – Value: org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider. Lock Provider implementation to be used. Required: Yes.
  • hoodie.write.lock.dynamodb.table – Value: <String>. DynamoDB table name to be used for acquiring locks. If the table doesn't exist, it will be created. The same table can be used across all of your Hudi jobs operating on the same or different tables. Required: Yes.
  • hoodie.write.lock.dynamodb.partition_key – Value: <String>. String value to be used for the locks table partition key attribute. It must be a string that uniquely identifies a Hudi table, such as the Hudi table name. Required: No. Default: Hudi table name.
  • hoodie.write.lock.dynamodb.region – Value: <String>. AWS Region in which the DynamoDB locks table exists, or must be created. Required: No. Default: us-east-1.
  • hoodie.write.lock.dynamodb.billing_mode – Value: <String>. DynamoDB billing mode to be used for the locks table when creating it. If the table already exists, this has no effect. Required: No. Default: PAY_PER_REQUEST.
  • hoodie.write.lock.dynamodb.read_capacity – Value: <Integer>. DynamoDB read capacity to be used for the locks table when creating it. If the table already exists, this has no effect. Required: No. Default: 20.
  • hoodie.write.lock.dynamodb.write_capacity – Value: <Integer>. DynamoDB write capacity to be used for the locks table when creating it. If the table already exists, this has no effect. Required: No. Default: 10.

Furthermore, Optimistic Concurrency Control must be enabled via the following:

hoodie.write.concurrency.mode = optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes = LAZY

You can configure these properties at the cluster level by using the EMR Configurations API with the hudi-defaults classification, so that you don't have to configure them with every job.
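This sketch gathers the lock and concurrency properties listed above into a single per-job option map. The DynamoDB table name `hudi-locks` and the Region are hypothetical placeholders. Using the Hudi table name as the partition key follows the description of hoodie.write.lock.dynamodb.partition_key.

```scala
object DynamoDbLockOptions {
  // Lock and concurrency settings from the properties above. The DynamoDB table name and
  // Region are hypothetical placeholders; replace them with your own values.
  def forTable(hudiTableName: String): Map[String, String] = Map(
    "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes" -> "LAZY",
    "hoodie.write.lock.provider" ->
      "org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table" -> "hudi-locks",           // hypothetical table name
    "hoodie.write.lock.dynamodb.partition_key" -> hudiTableName, // one lock key per Hudi table
    "hoodie.write.lock.dynamodb.region" -> "us-east-1"           // placeholder Region
  )

  def main(args: Array[String]): Unit =
    forTable("amazon_customer_review_hudi").toSeq.sorted.foreach { case (k, v) => println(s"$k=$v") }
}
```

In a Spark job, the map can be passed to the writer with `.options(DynamoDbLockOptions.forTable(tableName))`, because `DataFrameWriter.options` accepts a `scala.collection.Map[String, String]`. For cluster-wide defaults, put the same key/value pairs in the hudi-defaults classification instead.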

Delete partitions

Apache Hudi 0.9.0 introduces a DELETE_PARTITION operation for its Spark Data Source API that can be used to delete partitions. Here is a Scala example of how to use this operation:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// No records are needed: the partitions to delete are passed via PARTITIONS_TO_DELETE
val deletePartitionDF = spark.emptyDataFrame

deletePartitionDF.write.format("hudi")
 .option(HoodieWriteConfig.TABLE_NAME, "<table name>")
 .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
 .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), "<partition_value1>,<partition_value2>")
 .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "<record key(s)>")
 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "<partition field(s)>") 
 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "<precombine key>")
 .mode(SaveMode.Append)
 .save("<table path>")

However, there is a known issue:

  • Hive Sync fails when performed along with DELETE_PARTITION operation because of a bug. Hive Sync will succeed for any future insert/upsert/delete operation performed following the delete partition operation. This bug has been fixed in Hudi release 0.10.0.

Asynchronous Clustering

Apache Hudi 0.9.0 introduces support for asynchronous clustering via Spark structured streaming sink and Delta Streamer. This lets users continue ingesting data into the data lake, while the clustering service continues to run in the background to reorganize data for improved query performance and optimal file sizes. This is made possible with the Optimistic Concurrency Control feature introduced in Hudi 0.8.0. Currently, clustering can only be scheduled for partitions that aren’t receiving any concurrent updates. Additional details on how to get started with this feature can be found in this blog post.

Conclusion

In this post, we shared some of the new and exciting features in Hudi 0.9.0 available on Amazon EMR versions 5.34 and 6.5.0 and later. These new features make it possible to build data pipelines solely with SQL statements, making it easier to build transactional data lakes on Amazon S3.

As a next step, for hands-on experience with Hudi 0.9.0 on Amazon EMR, try out the notebook here on EMR Studio using Amazon EMR version 6.5.0 and let us know your feedback.


About the Authors

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having built his own startup and worked with enterprises, he brings a unique perspective on getting people, business, and technology to work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enabling them to build scalable data and advanced analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys marathons, tech meetups, and meditation retreats.

Gabriele Cacciola is a Senior Data Architect working for the Professional Service team with Amazon Web Services. Coming from a solid Startup experience, he currently helps enterprise customers across EMEA implement their ideas, innovate using the latest tech and build scalable data and analytics solutions to make critical business decisions. In his free time, Gabriele enjoys football and cooking.

Udit Mehrotra is a software development engineer at Amazon Web Services and an Apache Hudi PMC member/committer. He works on cutting-edge features of Amazon EMR and is also involved in open-source projects such as Apache Hudi, Apache Spark, Apache Hadoop, and Apache Hive. In his spare time, he likes to play guitar, travel, binge watch, and hang out with friends.

Up to 15 times improvement in Hive write performance with the Amazon EMR Hive zero-rename feature

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/up-to-15-times-improvement-in-hive-write-performance-with-the-amazon-emr-hive-zero-rename-feature/

Our customers use Apache Hive on Amazon EMR for large-scale data analytics and extract, transform, and load (ETL) jobs. Amazon EMR Hive uses Apache Tez as the default job execution engine, which creates Directed Acyclic Graphs (DAGs) to process data. Each DAG can contain multiple vertices from which tasks are created to run the application in parallel. Their final output is written to Amazon Simple Storage Service (Amazon S3).

Hive initially writes data to staging directories and then moves it to the final location after a series of rename operations. This rename-based design supports task failure recovery, such as rescheduling a failed task with another attempt, running speculative execution, and recovering from a failed job attempt. These move and rename operations don't have a significant performance impact on HDFS, where a rename is only a metadata operation. On Amazon S3, however, performance can degrade significantly as the number of files written grows.

This post discusses the new optimized committer for Hive in Amazon EMR and also highlights its impressive performance by running a TPCx-BB performance benchmark and comparing it with the Hive default commit logic.

How Hive commit logic works

By default, Apache Hive manages the task and job commit phase and doesn’t have support for pluggable Hadoop output committers, which you can use to customize Hive’s file commit behavior.

In its current state, the rename operation with Hive-managed and external tables happens in three places:

  • Task commit – Each task attempt stores its output in its own staging directory. In the task commit phase, that output is renamed and moved to a task-specific staging directory.
  • Job commit – In this phase, the final output is generated from the output of all committed tasks of a job attempt. Task-specific staging directories are renamed and moved to the job commit staging directory.
  • Move task – The job commit staging directory is renamed or moved to the final table directory.

The impact of these rename operations is more significant on Hive jobs writing a large number of files.

Hive EMRFS S3-optimized committer

To mitigate the slowdown in write performance due to renames, we added support for output committers in Hive. We developed a new output committer, the Hive EMRFS S3-optimized committer, to avoid Hive rename operations. This committer directly writes the data to the output location, and the file commit happens only at the end of the job to ensure that it is resilient to job failures.

It modifies the default Hive file naming convention from <task_id>_<attempt_id>_<copy_n> to <task_id>_<attempt_id>_<copy_n>-<query_id>. For example, after an insert query in a Hive table, the output file is generated as 000000_0-hadoop_20210714130459_ba7c23ec-5695-4947-9d98-8a40ef759222-1 instead of 000000_0, where the suffix is the combination of user_name, timestamp, and UUID, which forms the query ID.

Performance evaluation

We ran the TPCx-BB Express Benchmark tests with and without the new committer and evaluated the write performance improvement.

The following graph shows the performance improvement measured as total runtime of the queries. With the new committer, the runtime is better (lower).

This optimization is for Hive writes and hence the majority of improvement occurred in the load test, which is the writing phase of the benchmark. We observed an approximate 15-times reduction in runtime. However, we didn’t see much improvement in the power test and throughput test because each query is just writing a single file to the final table.

The benchmark used in this post is derived from the industry-standard TPCx-BB benchmark, and has the following characteristics:

  • The schema and data are used unmodified from TPCx-BB.
  • The scale factor used is 1000.
  • The queries are used unmodified from TPCx-BB.
  • The suite has three tests: the load test builds the test database and is write heavy; the power test measures the maximum speed at which the system runs all the queries; and the throughput test runs the queries in concurrent streams. The run elapsed times are used as the primary metric.
  • The power tests and throughput tests include 25 out of 30 queries. The five queries for machine learning workloads were excluded.

Note that this is derived from the TPCx-BB benchmark, and as such is not comparable to published TPCx-BB results, as the results of our tests do not comply with the specification.

Understanding performance impact with different data sizes and number of files

To benchmark the performance impact with variable data sizes and number of files, we also evaluated the following INSERT OVERWRITE query over the store_sales table from the TPC-DS dataset with additional variations, such as size of data (1 GB, 5 GB, 10 GB, 25 GB, 50 GB, 100 GB), number of files, and number of partitions:

SET partitions=100;
SET files_per_partition=10;

CREATE TABLE store_sales_simple_test
(ss_sold_time_sk int, ss_item_sk int, ss_customer_sk int,
ss_cdemo_sk int, ss_hdemo_sk int, ss_addr_sk int,
ss_store_sk int, ss_promo_sk int, ss_ticket_number bigint,
ss_quantity int, ss_wholesale_cost decimal(7,2),
ss_list_price decimal(7,2), ss_sales_price decimal(7,2),
ss_ext_discount_amt decimal(7,2),
ss_ext_sales_price decimal(7,2),
ss_ext_wholesale_cost decimal(7,2),
ss_ext_list_price decimal(7,2), ss_ext_tax decimal(7,2),
ss_coupon_amt decimal(7,2), ss_net_paid decimal(7,2),
ss_net_paid_inc_tax decimal(7,2),
ss_net_profit decimal(7,2), ss_sold_date_sk int)
PARTITIONED BY (part_key int)
STORED AS ORC
LOCATION 's3://<bucket>/<table_location>';

Insert overwrite table store_sales_simple_test
select * , FLOOR(RAND()*${partitions}) as part_key
from store_sales distribute by part_key, FLOOR(RAND()*${files_per_partition});

The results show that the number of files written is the critical factor for performance improvement when using this new committer in comparison to the default Hive commit logic.

In the following graph, the y-axis denotes the speedup (total time taken with rename / total time taken by query with committer), and the x-axis denotes the data size.

Enabling the feature

To enable Amazon EMR Hive to use HiveEMRFSOptimizedCommitter to commit data as the default for all Hive-managed and external tables, use the following hive-site configuration starting with EMR 6.5.0 or EMR 5.34.0 clusters:

[
  {
    "classification": "hive-site",
    "properties": {
      "hive.blobstore.use.output-committer": "true"
    }
  }
]

The new committer is not compatible with the hive.exec.parallel=true setting. Be sure to not enable both settings at the same time in Amazon EMR 6.5.0. In future EMR releases, parallel execution will automatically be disabled when the new Hive committer is used.

Limitations

This committer will not be used and default Hive commit logic will be applied in the following scenarios:

  • When merge small files (hive.merge.tezfiles) is enabled
  • When using Hive ACID tables
  • When partitions are distributed across file systems such as HDFS and Amazon S3

Summary

The Hive EMRFS S3-optimized committer improves write performance compared to the default Hive commit logic by eliminating Amazon S3 renames. You can use this feature starting with Amazon EMR 6.5.0 and Amazon EMR 5.34.0.

Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.


About the Authors

Suthan Phillips works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Aditya Shah is a Software Development Engineer at AWS. He is interested in databases and data warehouse engines and has worked on the distributed filesystem, ACID compliance, and metadata management of Apache Hive. When not thinking about data, he is browsing the internet to sate his appetite for random trivia and is a movie geek at heart.

Syed Shameerur Rahman is a software development engineer at Amazon EMR. He is interested in highly scalable, distributed computing. He is an active contributor to open source projects like Apache Hive, Apache Tez, and Apache ORC and has contributed important features and optimizations. During his free time, he enjoys exploring new places and food.

How Cynamics built a high-scale, near-real-time, streaming AI inference system using AWS

Post Syndicated from Aviv Yehezkel original https://aws.amazon.com/blogs/big-data/how-cynamics-built-a-high-scale-near-real-time-streaming-ai-inference-system-using-aws/

This post is co-authored by Dr. Yehezkel Aviv, Co-Founder and CTO of Cynamics and Sapir Kraus, Head of Engineering at Cynamics.

Cynamics provides a new paradigm of cybersecurity: predicting attacks long before they hit by collecting small network samples (less than 1%), inferring from them how the full network (100%) behaves, and predicting threats using unique AI breakthroughs. The sample approach allows Cynamics to be generic and agnostic, and to work for any client's network architecture, no matter how messy the mix between legacy, private, and public clouds. Furthermore, the solution is scalable and provides full coverage of the client's network, no matter how large it is in volume and size. Moreover, because any network gateway (physical or virtual, legacy or cloud) supports one of the standard sampling protocols and APIs, Cynamics doesn't require installing any appliances or agents, or making any network changes or modifications, and onboarding usually takes less than an hour.

In the crowded cybersecurity market, Cynamics is the first-ever solution based on small network samples, which has been considered a hard and unsolved challenge in academia (our academic paper “Network anomaly detection using transfer learning based on auto-encoders loss normalization” was recently presented in ACM CCS AISec 2021) and industry to this day.

The problem Cynamics faced

Early on, as our customer base grew, our unique AI algorithms needed to seamlessly support the increased scale and network throughput. We faced a few different challenges:

  • How can we perform near-real-time analysis on our streaming clients’ incoming data into our AI inference system to predict threats and attacks?
  • How can we seamlessly auto scale our solution to be cost-efficient with no impact on the platform ingestion rate?
  • Because many of our customers are from the public sector, how can we do this while supporting both AWS commercial and government environments (GovCloud)?

This post shows how we used AWS managed services and in particular Amazon Kinesis Data Streams and Amazon EMR to build a near-real-time streaming AI inference system serving hundreds of production customers in both AWS commercial and government environments, while seamlessly auto scaling.

Overview of solution

The following diagram illustrates our solution architecture:

To provide a cost-efficient, highly available solution that scales easily with user growth, while having no impact on near-real-time performance, we turned to Amazon EMR.

We currently process over 50 million records per day, which translates to just over 5 billion flows, and keeps growing on a daily basis. Using Amazon EMR along with Kinesis Data Streams provided the scalability we needed to achieve inference times of just a few seconds.

Although this technology was new to us, we minimized our learning curve by turning to the available guides from AWS for best practices on scale, partitioning, and resource management.

Workflow

Our workflow contains the following steps:

  1. Flow samples are sent by the client’s network devices directly to the Cynamics cloud. A network flow (or connection) is a set of packets with the same five-tuple ID: source-IP-address, destination-IP-address, source-port, destination-port, and protocol.
  2. The samples are received by Network Load Balancers, which forward them to an auto scaling group of stateless flow transformers running on Graviton-powered Amazon Elastic Compute Cloud (Amazon EC2) instances. With Graviton-based processors in the flow transformers, we reduced our operational costs by over 30%.
  3. The flows are transformed to the Cynamics data format and enriched with additional information from Cynamics’ databases and in-house sources such as IP resolutions, intelligence, and reputation.
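The five-tuple definition of a flow from step 1 can be illustrated with a short plain-Scala sketch that groups sampled packets by their five-tuple ID and sums the bytes seen per flow. The `PacketSample` and `FlowId` types and the byte aggregation are hypothetical. This is not Cynamics' flow transformer code.

```scala
// Hypothetical sampled packet: only the fields needed for the five-tuple, plus its size.
final case class PacketSample(srcIp: String, dstIp: String, srcPort: Int, dstPort: Int, protocol: String, bytes: Long)

// A flow is identified by its five-tuple ID.
final case class FlowId(srcIp: String, dstIp: String, srcPort: Int, dstPort: Int, protocol: String)

object Flows {
  def flowId(p: PacketSample): FlowId = FlowId(p.srcIp, p.dstIp, p.srcPort, p.dstPort, p.protocol)

  // Groups sampled packets into flows and sums the bytes observed per flow.
  def aggregate(packets: Seq[PacketSample]): Map[FlowId, Long] =
    packets.groupBy(flowId).map { case (id, ps) => id -> ps.map(_.bytes).sum }
}
```

Packets that differ in any one of the five fields (for example, only the source port) belong to different flows.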

The following figures show the network scale for a single flow transformer machine over a week. The first figure illustrates incoming network packets for a single flow transformer machine.

The following shows outgoing network packets for a single flow transformer machine.

The following shows incoming network bytes for a single flow transformer machine.

The following shows outgoing network bytes for a single flow transformer machine.

  4. The flows are sent through Amazon Kinesis Data Streams to the real-time analysis engine.
  5. The Amazon EMR-based real-time engine consumes records in batches of a few seconds using YARN and Spark. The sampling rate of each client is dynamically tuned according to its throughput to ensure a fixed incoming data rate for all clients. We scale the cluster using Amazon EMR Managed Scaling with a custom policy (available with Amazon EMR versions 5.30.1 and later), which allows us to scale EMR nodes in or out based on Amazon CloudWatch metrics, with separate rules for scale-out and scale-in. The metric we created is based on the batch running time on Amazon EMR, because our real-time AI threat detection runs on a sliding window interval of a few seconds.
    1. The scale-out policy tracks the average running time over a 10-minute period, and scales out the EMR nodes if it exceeds 95% of the required interval. This prevents processing delays.
    2. Similarly, the scale-in policy uses the same metric but measures the average over a 30-minute period, and scales in the cluster accordingly. This lets us optimize cluster costs and reduce the number of EMR nodes during off-hours.
  6. To optimize and seamlessly scale our AI inference calls, we made them available through an Application Load Balancer (ALB) and another auto scaling group of servers (the AI model service).
  7. We use Amazon DynamoDB as a fast and highly available state table.
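The per-client sampling and the scale-out and scale-in rules described above can be sketched as plain decision logic. This is a hedged Python illustration, not the actual Cynamics policy: the proportional sampling rule and the 50% scale-in threshold are assumptions (the post states neither), and on Amazon EMR these rules would be configured as CloudWatch-driven scaling rules rather than written as application code.

```python
from statistics import mean
from typing import Sequence


def sampling_rate(observed_flows_per_sec: float, target_flows_per_sec: float) -> float:
    """Fraction of a client's flows to keep so its ingest rate stays near the target.

    Assumption: a simple proportional rule; the post does not describe the tuning logic.
    """
    if observed_flows_per_sec <= target_flows_per_sec:
        return 1.0  # low-throughput clients are not down-sampled
    return target_flows_per_sec / observed_flows_per_sec


def scaling_decision(
    running_times_10m: Sequence[float],
    running_times_30m: Sequence[float],
    interval_sec: float,
    scale_out_ratio: float = 0.95,  # from the post: 95% of the required interval
    scale_in_ratio: float = 0.5,    # hypothetical: the post does not state this threshold
) -> str:
    """Return 'scale_out', 'scale_in', or 'hold' from batch running times in seconds."""
    # Scale-out rule: 10-minute average running time close to the window interval.
    if mean(running_times_10m) > scale_out_ratio * interval_sec:
        return "scale_out"
    # Scale-in rule: the same metric averaged over a longer, 30-minute period.
    if mean(running_times_30m) < scale_in_ratio * interval_sec:
        return "scale_in"
    return "hold"
```

Using a shorter averaging window for scale-out than for scale-in makes the cluster react quickly to processing delays while shrinking only after sustained low load, which avoids oscillating between the two actions.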

The following figure shows the number of records processed by the Kinesis data stream over a single day.

The following shows the Kinesis data stream’s record rate per minute.

AI predictions and threat detections are sent for further processing and alerting, and are saved in Amazon DocumentDB (with MongoDB compatibility).

Conclusion

With the approach described in this post, Cynamics has been providing threat prediction based on near-real-time analysis by its unique AI algorithms to a constantly growing customer base in a seamless and automatically scalable way. Since first implementing the solution, we’ve been able to scale our architecture easily and linearly, and we further optimized costs by transitioning the flow transformers to Graviton-based processors, which reduced our flow transformer costs by over 30%.

We’re considering the following next steps:

  • An automatic machine learning lifecycle using an Amazon SageMaker Studio pipeline, which includes the following steps:
  • Additional cost reduction by moving the EMR instances to Graviton-based instances as well, which we expect to yield an additional 20% reduction.

About the Authors

Dr. Yehezkel Aviv is the co-founder and CTO of Cynamics, leading the company’s innovation and technology. Aviv holds a PhD in Computer Science from the Technion, specializing in cybersecurity, AI, and ML.

Sapir Kraus is Head of Engineering at Cynamics, where his core focus is managing the software development lifecycle. His responsibilities also include software architecture and providing technical guidance to team members. Outside of work, he enjoys roasting coffee and barbecuing.

Omer Haim is a Startup Solutions Architect at Amazon Web Services. He helps startups with their cloud journey, and is passionate about containers and ML. In his spare time, Omer likes to travel, and occasionally game with his son.