All posts by Kinnar Kumar Sen

Run fault tolerant and cost-optimized Spark clusters using Amazon EMR on EKS and Amazon EC2 Spot Instances

Post Syndicated from Kinnar Kumar Sen original https://aws.amazon.com/blogs/big-data/run-fault-tolerant-and-cost-optimized-spark-clusters-using-amazon-emr-on-eks-and-amazon-ec2-spot-instances/

Amazon EMR on EKS is a deployment option in Amazon EMR that allows you to run Spark jobs on Amazon Elastic Kubernetes Service (Amazon EKS). Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances save you up to 90% over On-Demand Instances, and is a great way to cost optimize the Spark workloads running on Amazon EMR on EKS. Because Spot is an interruptible service, if we can move or reuse the intermediate shuffle files, it improves the overall stability and SLA of the job. The latest versions of Amazon EMR on EKS have integrated Spark features to enable this capability.

In this post, we discuss these features—Node Decommissioning and Persistent Volume Claim (PVC) reuse—and their impact on increasing the fault tolerance of Spark jobs on Amazon EMR on EKS when cost optimizing using EC2 Spot Instances.

Amazon EMR on EKS and Spot

EC2 Spot Instances are spare EC2 capacity provided at a steep discount of up to 90% over On-Demand prices. Spot Instances are a great choice for stateless and flexible workloads. The caveat with this discount and spare capacity is that Amazon EC2 can interrupt an instance with a proactive or reactive (2-minute) warning when it needs the capacity back. You can provision compute capacity in an EKS cluster using Spot Instances using a managed or self-managed node group and provide cost optimization for your workloads.

Amazon EMR on EKS uses Amazon EKS to run jobs with the EMR runtime for Apache Spark, which can be cost optimized by running the Spark executors on Spot. It provides up to 61% lower costs and up to 68% performance improvement for Spark workloads on Amazon EKS. The Spark application launches a driver and executors to run the computation. Spark is a semi-fault tolerant framework that is resilient to executor loss due to an interruption and therefore can run on EC2 Spot. On the other hand, when the driver is interrupted, the job fails. Hence, we recommend running drivers on on-demand instances. Some of the best practices for running Spark on Amazon EKS are applicable with Amazon EMR on EKS.

EC2 Spot instances also helps in cost optimization by improving the overall throughput of the job. This can be achieved by auto-scaling the cluster using Cluster Autoscaler (for managed nodegroups) or Karpenter.

Though Spark executors are resilient to Spot interruptions, the shuffle files and RDD data is lost when the executor gets killed. The lost shuffle files need to be recomputed, which increases the overall runtime of the job. Apache Spark has released two features (in versions 3.1 and 3.2) that addresses this issue. Amazon EMR on EKS released features such as node decommissioning (version 6.3) and PVC reuse (version 6.8) to simplify recovery and reuse shuffle files, which increases the overall resiliency of your application.

Node decommissioning

The node decommissioning feature works by preventing scheduling of new jobs on the nodes that are to be decommissioned. It also moves any shuffle files or cache present in those nodes to other executors (peers). If there are no other available executors, the shuffle files and cache are moved to a remote fallback storage.

Node Decommissioning

Fig 1 : Node Decommissioning

Let’s look at the decommission steps in more detail.

If one of the nodes that is running executors is interrupted, the executor starts the process of decommissioning and sends the message to the driver:

21/05/05 17:41:41 WARN KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Received executor 7 decommissioned message
21/05/05 17:41:41 DEBUG TaskSetManager: Valid locality levels for TaskSet 2.0: NO_PREF, ANY
21/05/05 17:41:41 INFO KubernetesClusterSchedulerBackend: Decommission executors: 7
21/05/05 17:41:41 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 10
21/05/05 17:41:41 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(7, 192.168.82.107, 39007, None)) as being decommissioning.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Decommission executor 1.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Will exit when finished decommissioning
21/05/05 20:22:17 INFO BlockManager: Starting block manager decommissioning process...
21/05/05 20:22:17 DEBUG FileSystem: Looking for FS supporting s3a

The executor looks for RDD or shuffle files and tries to replicate or migrate those files. It first tries to find a peer executor. If successful, it will move the files to the peer executor:

22/06/07 20:41:38 INFO ShuffleStatus: Updating map output for 46 to BlockManagerId(4, 192.168.13.235, 34737, None)
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle data block update for 0 46, ignore.
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 46, updating.

However, if It is not able to find a peer executor, it will try to move the files to a fallback storage if available.

Fallback Storage

Fig 2: Fallback Storage

The executor is then decommissioned. When a new executor comes up, the shuffle files are reused:

22/06/07 20:42:50 INFO BasicExecutorFeatureStep: Adding decommission script to lifecycle
22/06/07 20:42:50 DEBUG ExecutorPodsAllocator: Requested executor with id 19 from Kubernetes.
22/06/07 20:42:50 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-bfd0a5813fd1b80f-exec-19, action ADDED
22/06/07 20:42:50 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 52, updating.
22/06/07 20:42:50 INFO ShuffleStatus: Recover 52 BlockManagerId(fallback, remote, 7337, None)

The key advantage of this process is that it enables migrates blocks and shuffle data, thereby reducing recomputation, which adds to the overall resiliency of the system and reduces runtime. This process can be triggered by a Spot interruption signal (Sigterm) and node draining. Node draining  may happen due to high-priority task scheduling or independently.

When you use Amazon EMR on EKS with managed node groups/Karpenter, the Spot interruption handling is automated, wherein Amazon EKS gracefully drains and rebalances the Spot nodes to minimize application disruption when a Spot node is at elevated risk of interruption. If you’re using managed node groups/Karpenter, the decommission gets triggered when the nodes are getting drained and because it’s proactive, it gives you more time (at least 2 minutes) to move the files. In the case of self-managed node groups, we recommend installing the AWS Node Termination Handler to handle the interruption, and the decommission is triggered when the reactive (2-minute) notification is received. We recommend to use Karpenter with Spot Instances as it has faster node scheduling with early pod binding and binpacking to optimize the resource utilization.

The following code enables this configuration; more details are available on GitHub:

"spark.decommission.enabled": "true"
"spark.storage.decommission.rddBlocks.enabled": "true"
"spark.storage.decommission.shuffleBlocks.enabled" : "true"
"spark.storage.decommission.enabled": "true"
"spark.storage.decommission.fallbackStorage.path": "s3://<<bucket>>"

PVC reuse

Apache Spark enabled dynamic PVC in version 3.1, which is useful with dynamic allocation because we don’t have to pre-create the claims or volumes for the executors and delete them after completion. PVC enables true decoupling of data and processing when we’re running Spark jobs on Kubernetes, because we can use it as a local storage to spill in-process files too. The latest version of Amazon EMR 6.8 has integrated the PVC reuse feature of Spark, wherein if an executor is terminated due to EC2 Spot interruption or any other reason (JVM), then the PVC is not deleted but persisted and reattached to another executor. If there are shuffle files in that volume, then they are reused.

As with node decommission, this reduces the overall runtime because we don’t have to recompute the shuffle files. We also save the time required to request a new volume for an executor, and shuffle files can be reused without moving the files round.

The following diagram illustrates this workflow.

PVC Reuse

Fig 3: PVC Reuse

Let’s look at the steps in more detail.

If one or more of the nodes that are running executors is interrupted, the underlying pods get terminated and the driver gets the update. Note that the driver is the owner of the PVC of the executors, and they are not terminated. See the following code:

22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action MODIFIED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action MODIFIED

The ExecutorPodsAllocator tries to allocate new executor pods to replace the ones terminated due to interruption. During the allocation, it figures out how many of the existing PVCs have files and can be reused:

22/06/15 23:25:23 INFO ExecutorPodsAllocator: Found 2 reusable PVCs from 10 PVCs

The ExecutorPodsAllocator requests for a pod and when it launches it, the PVC is reused. In the following example, the PVC from executor 6 is reused for new executor pod 11:

22/06/15 23:25:23 DEBUG ExecutorPodsAllocator: Requested executor with id 11 from Kubernetes.
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action ADDED
22/06/15 23:25:24 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/usr/lib/spark/conf) : log4j.properties,spark-env.sh,hive-site.xml,metrics.properties
22/06/15 23:25:24 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action MODIFIED
22/06/15 23:25:24 INFO ExecutorPodsAllocator: Reuse PersistentVolumeClaim amazon-reviews-word-count-9ee82b8169a75183-exec-6-pvc-0

The shuffle files, if present in the PVC are reused.

The key advantage of this technique is that it allows us to reuse pre-computed shuffle files in their original location, thereby reducing the time of the overall job run.

This works for both static and dynamic PVCs. Amazon EKS offers three different storage offerings, which can be encrypted too: Amazon Elastic Block Store (Amazon EBS), Amazon Elastic File System (Amazon EFS), and Amazon FSx for Lustre. We recommend using dynamic PVCs with Amazon EBS because with static PVCs, you would need to create multiple PVCs.

The following code enables this configuration; more details are available on GitHub:

"spark.kubernetes.driver.ownPersistentVolumeClaim": "true"
"spark.kubernetes.driver.reusePersistentVolumeClaim": "true"

For this to work, we need to enable PVC with Amazon EKS and mention the details in the Spark runtime configuration. For instructions, refer to How do I use persistent storage in Amazon EKS? The following code contains the Spark configuration details for using PVC as local storage; other details are available on GitHub:

"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "spark-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "10Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/var/data/spill"

Conclusion

With Amazon EMR on EKS (6.9) and the features discussed in this post, you can further reduce the overall runtime for Spark jobs when running with Spot Instances. This also improves the overall resiliency and flexibility of the job while cost optimizing the workload on EC2 Spot.

Try out the EMR on EKS workshop for improved performance when running Spark workloads on Kubernetes and cost optimize using EC2 Spot Instances.


About the Author

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.

Optimizing Cloud Infrastructure Cost and Performance with Starburst on AWS

Post Syndicated from Kinnar Kumar Sen original https://aws.amazon.com/blogs/architecture/optimizing-cloud-infrastructure-cost-and-performance-with-starburst-on-aws/

Amazon Web Services (AWS) Cloud is elastic, convenient to use, easy to consume, and makes it simple to onboard workloads. Because of this simplicity, the cost associated with onboarding workloads is sometimes overlooked.

There is a notion that when an organization moves its workload to the cloud, agility, scalability, performance, and cost issues will disappear. While this may be true for agility and scalability, you must optimize your workload. You can do this with services like Amazon EC2 Auto Scaling via Amazon Elastic Compute Cloud (Amazon EC2) and Amazon EC2 Spot Instances to realize the performance and cost benefits the cloud offers.

In this blog post, we show you how Starburst Enterprise (Starburst) addressed a sudden increase in cost for their data analytics platform as they grew their internal teams and scaled out their infrastructure. After reviewing their architecture and deployments with AWS specialist architects, Starburst and AWS concluded that they could take the following steps to greatly reduce costs:

  1. Use Spot Instances to run workloads.
  2. Add Amazon EC2 Auto Scaling into their training and demonstration environments, because the Starburst platform is designed to elastically scale up and down.

For analytics workloads, when you rein in costs, you typically rein in performance. Starburst and AWS worked together to balance the cost and performance of Starburst’s data analytics platform while also harnessing the flexibility, scalability, security, and performance of the cloud.

What is Starburst Enterprise?

Starburst provides a Massively Parallel Processing SQL (MPPSQL) engine based on open source Trino. It is an analytics platform that provides the cornerstone in customers’ intelligent data mesh and offers the following benefits and services:

  • The platform gives you a single point to access, monitor, and secure your data mesh.
  • The platform gives you options for your data compute. You no longer have to wait on data migrations or extract, transform, and load (ETL), there is no vendor lock-in, and there is no need to swap out your existing analytics tools.
  • Starburst Stargate (Stargate) ensures that large jobs are completed within each data domain of your data mesh. Only the result set is retrieved from the domain.
    • Stargate reduces data output, which reduces costs and increases performance.
    • Data governance policies can also be applied uniquely in each data domain, ensuring security compliance and federation.

As shown in Figure 1, there are many connectors for input and output that ensure you experience improved performance and security.

Starburst platform

Figure 1. Starburst platform

Integrating Starburst Enterprise with AWS

As shown in Figure 2, Starburst Enterprise uses AWS services to deliver elastic scaling and optimize cost. The platform is architected with decoupled storage and compute. This allows the platform to scale as needed to analyze petabytes of data.

The platform can be deployed via AWS CloudFormation or Amazon Elastic Kubernetes Service (Amazon EKS). Starburst on AWS allows you to run analytic queries across AWS data sources and on-premises systems such as Teradata and Oracle.

Deployment architecture of Starburst platform on AWS

Figure 2. Deployment architecture of Starburst platform on AWS

Amazon EC2 Auto Scaling

Enterprises have diverse analytic workloads; their compute and memory requirements vary with time. Starburst uses Amazon EKS and Amazon EC2 Auto Scaling to elastically scale compute resources to meet the demands of their analytics workloads.

  • Amazon EC2 Auto Scaling ensures that you have the compute capacity for your workloads to handle the load elastically. It is used to architect sophisticated, elastic, and resilient applications on the AWS Cloud.
    • Starburst uses the scheduled scaling feature of Amazon EC2 Auto Scaling to scale the cluster up/down based on time. Thus, they incur no costs when the cluster is not in use.
  • Amazon EKS is a fully managed Kubernetes service that allows you to run Kubernetes on AWS without needing to install, operate, and maintain your own Kubernetes control plane.

Scaling cloud resource consumption on demand has a major impact on controlling cloud costs. Starburst supports scaling down elastically, which means removing compute resources doesn’t impact the underlying processes.

Amazon EC2 Spot Instances

Spot Instances let you take advantage of unused EC2 capacity in the AWS Cloud. They are available at up to a 90% discount compared to On-Demand Instance prices. If EC2 needs capacity for On-Demand Instance usage, Spot Instances can be interrupted by Amazon EC2 with a two-minute notification. There are many ways to handle the interruption to ensure that the application is well architected for resilience and fault tolerance.

Starburst has integrated Spot Instances as a part of the Amazon EKS managed node groups to cost optimize the analytics workloads. This best practice of instance diversification is implemented by using the integration eksctl and instance selector with dry-run flag. This creates a list of instances of same size (vCPU/Mem ratio) and uses them in the underlying node groups.

Same size instances are required to make best use of Kubernetes Cluster Autoscaler, which is used to manage the size of the cluster.

Scaling down, handling interruptions, and provisioning compute

“Scaling in” an active application is tricky, but Starburst was built with resiliency in mind, and it can effectively manage shut downs.

Spot Instances are an ideal compute option because Starburst can handle potential interruptions natively. Starburst also uses Amazon EKS managed node groups to provision nodes in the cluster. This requires significantly less operational effort compared to using self-managed node groups. It allows Starburst to enforce best practices like capacity optimized allocation strategy, capacity rebalancing, and instance diversification.

When you need to “scale out” the deployment, Amazon EKS and Amazon EC2 Auto Scaling help to provision capacity, as depicted in Figure 3.

Depicting “scale out” in a Starburst deployment

Figure 3. Depicting “scale out” in a Starburst deployment

Benefits realized from using AWS services

In a short period, Starburst was able to increase the number of people working on AWS. They added five times the number of Solutions Architects, they have previously. Additionally, in their initial tests of their new deployment architecture, their Solutions Architects were able to complete up to three times the amount of work than they had been able to previously. Even after the workload increased more than 15 times, with two simple changes they only had a slight increase in total cost.

This cost and performance optimization allows Starburst to be more productive internally and realize value for each dollar spent. This further justified investing more into building out infrastructure footprint.

Conclusion

In building their architecture with AWS, Starburst realized the importance of having a robust and comprehensive cloud administration plan that they can implement and manage going forward. They are now able to balance the cloud costs with performance and stability, even after considering the SLA requirements. Starburst is planning to teach their customers about the Spot Instance and Amazon EC2 Auto Scaling best practices to ensure they maintain a cost and performance optimized cloud architecture.

If you want to see the Starburst data analytics platform in action, you can get a free trial in the AWS Marketplace: Starburst Data Free Trial.