Run fault tolerant and cost-optimized Spark clusters using Amazon EMR on EKS and Amazon EC2 Spot Instances


Amazon EMR on EKS is a deployment option in Amazon EMR that allows you to run Spark jobs on Amazon Elastic Kubernetes Service (Amazon EKS). Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances save you up to 90% over On-Demand Instances and are a great way to cost-optimize Spark workloads running on Amazon EMR on EKS. Because Spot capacity can be interrupted, moving or reusing the intermediate shuffle files improves the overall stability and SLA of the job. The latest versions of Amazon EMR on EKS have integrated Spark features that enable this capability.

In this post, we discuss these features—node decommissioning and Persistent Volume Claim (PVC) reuse—and how they increase the fault tolerance of Spark jobs on Amazon EMR on EKS when you cost-optimize with EC2 Spot Instances.

Amazon EMR on EKS and Spot

EC2 Spot Instances are spare EC2 capacity provided at a steep discount of up to 90% over On-Demand prices. Spot Instances are a great choice for stateless and flexible workloads. The caveat with this discount and spare capacity is that Amazon EC2 can interrupt an instance with a proactive (rebalance recommendation) or reactive (2-minute) warning when it needs the capacity back. You can provision Spot compute capacity in an EKS cluster through managed or self-managed node groups and cost-optimize your workloads.

Amazon EMR on EKS uses Amazon EKS to run jobs with the EMR runtime for Apache Spark, which can be cost-optimized by running the Spark executors on Spot. It provides up to 61% lower costs and up to 68% performance improvement for Spark workloads on Amazon EKS. A Spark application launches a driver and executors to run the computation. Spark is a semi-fault-tolerant framework that is resilient to executor loss due to an interruption and therefore can run on EC2 Spot. When the driver is interrupted, however, the job fails. Hence, we recommend running drivers on On-Demand Instances. Many of the best practices for running Spark on Amazon EKS also apply to Amazon EMR on EKS.
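
The following is a minimal sketch (not part of the original example) of one common way to apply this recommendation with Kubernetes pod templates: the driver is pinned to On-Demand capacity and the executors to Spot by selecting on the capacity-type label that EKS managed node groups add to their nodes. The bucket and file names are placeholders, and with Karpenter you would select on its karpenter.sh/capacity-type label instead.

# Sketch only: pod templates that keep the Spark driver on On-Demand nodes and
# the executors on Spot nodes, using the eks.amazonaws.com/capacityType label
# that EKS managed node groups apply to their nodes automatically.
driver_pod_template = """\
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND   # driver stays on On-Demand
"""

executor_pod_template = """\
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT        # executors tolerate interruption
"""

# After uploading the templates to Amazon S3 (placeholder locations), reference
# them in the job's Spark configuration:
pod_placement_conf = {
    "spark.kubernetes.driver.podTemplateFile": "s3://<bucket>/templates/driver.yaml",
    "spark.kubernetes.executor.podTemplateFile": "s3://<bucket>/templates/executor.yaml",
}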

EC2 Spot Instances also help with cost optimization by improving the overall throughput of the job. You can achieve this by auto scaling the cluster using Cluster Autoscaler (for managed node groups) or Karpenter.

Though Spark executors are resilient to Spot interruptions, the shuffle files and RDD data are lost when an executor is killed. The lost shuffle files need to be recomputed, which increases the overall runtime of the job. Apache Spark released two features (in versions 3.1 and 3.2) that address this issue. Amazon EMR on EKS released features such as node decommissioning (version 6.3) and PVC reuse (version 6.8) to simplify the recovery and reuse of shuffle files, which increases the overall resiliency of your application.

Node decommissioning

The node decommissioning feature works by preventing new tasks from being scheduled on the nodes that are about to be decommissioned. It also moves any shuffle files or cached data present on those nodes to other executors (peers). If no other executors are available, the shuffle files and cache are moved to remote fallback storage.

Fig 1: Node Decommissioning

Let’s look at the decommission steps in more detail.

If one of the nodes running executors is interrupted, the executor starts the decommissioning process and sends a message to the driver:

21/05/05 17:41:41 WARN KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Received executor 7 decommissioned message
21/05/05 17:41:41 DEBUG TaskSetManager: Valid locality levels for TaskSet 2.0: NO_PREF, ANY
21/05/05 17:41:41 INFO KubernetesClusterSchedulerBackend: Decommission executors: 7
21/05/05 17:41:41 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 10
21/05/05 17:41:41 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(7, 192.168.82.107, 39007, None)) as being decommissioning.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Decommission executor 1.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Will exit when finished decommissioning
21/05/05 20:22:17 INFO BlockManager: Starting block manager decommissioning process...
21/05/05 20:22:17 DEBUG FileSystem: Looking for FS supporting s3a

The executor looks for RDD or shuffle files and tries to replicate or migrate those files. It first tries to find a peer executor. If successful, it will move the files to the peer executor:

22/06/07 20:41:38 INFO ShuffleStatus: Updating map output for 46 to BlockManagerId(4, 192.168.13.235, 34737, None)
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle data block update for 0 46, ignore.
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 46, updating.

However, if it's not able to find a peer executor, it tries to move the files to fallback storage, if one is configured.

Fig 2: Fallback Storage

The executor is then decommissioned. When a new executor comes up, the shuffle files are reused:

22/06/07 20:42:50 INFO BasicExecutorFeatureStep: Adding decommission script to lifecycle
22/06/07 20:42:50 DEBUG ExecutorPodsAllocator: Requested executor with id 19 from Kubernetes.
22/06/07 20:42:50 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-bfd0a5813fd1b80f-exec-19, action ADDED
22/06/07 20:42:50 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 52, updating.
22/06/07 20:42:50 INFO ShuffleStatus: Recover 52 BlockManagerId(fallback, remote, 7337, None)

The key advantage of this process is that it migrates blocks and shuffle data, thereby reducing recomputation, which adds to the overall resiliency of the system and reduces runtime. The process can be triggered by a Spot interruption signal (SIGTERM) or by node draining, which may happen due to high-priority task scheduling or independently of any interruption.

When you use Amazon EMR on EKS with managed node groups or Karpenter, Spot interruption handling is automated: Amazon EKS gracefully drains and rebalances the Spot nodes to minimize application disruption when a Spot node is at elevated risk of interruption. With managed node groups or Karpenter, decommissioning is triggered when the nodes are drained, and because it's proactive, it gives you more time (at least 2 minutes) to move the files. In the case of self-managed node groups, we recommend installing the AWS Node Termination Handler to handle the interruption; decommissioning is then triggered when the reactive (2-minute) notification is received. We recommend using Karpenter with Spot Instances because it offers faster node scheduling with early pod binding and bin packing to optimize resource utilization.

The following code enables this configuration; more details are available on GitHub:

"spark.decommission.enabled": "true"
"spark.storage.decommission.rddBlocks.enabled": "true"
"spark.storage.decommission.shuffleBlocks.enabled" : "true"
"spark.storage.decommission.enabled": "true"
"spark.storage.decommission.fallbackStorage.path": "s3://<<bucket>>"

PVC reuse

Apache Spark enabled dynamic PVCs in version 3.1, which is useful with dynamic allocation because we don't have to pre-create the claims or volumes for the executors and delete them after completion. PVCs enable true decoupling of data and processing when running Spark jobs on Kubernetes, because we can also use them as local storage for spilling in-process files. Amazon EMR 6.8 has integrated the PVC reuse feature of Spark, wherein if an executor is terminated due to an EC2 Spot interruption or any other reason (such as a JVM failure), the PVC is not deleted; it's persisted and reattached to another executor. If there are shuffle files in that volume, they are reused.

As with node decommissioning, this reduces the overall runtime because we don't have to recompute the shuffle files. We also save the time required to request a new volume for an executor, and the shuffle files can be reused without moving them around.

The following diagram illustrates this workflow.

Fig 3: PVC Reuse

Let’s look at the steps in more detail.

If one or more of the nodes running executors are interrupted, the underlying pods are terminated and the driver is notified. Note that the driver is the owner of the executors' PVCs, and the PVCs are not deleted. See the following log output:

22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action MODIFIED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action MODIFIED

The ExecutorPodsAllocator tries to allocate new executor pods to replace the ones terminated due to interruption. During the allocation, it figures out how many of the existing PVCs have files and can be reused:

22/06/15 23:25:23 INFO ExecutorPodsAllocator: Found 2 reusable PVCs from 10 PVCs

The ExecutorPodsAllocator requests a new pod, and when it launches, the PVC is reused. In the following example, the PVC from executor 6 is reused for the new executor pod 11:

22/06/15 23:25:23 DEBUG ExecutorPodsAllocator: Requested executor with id 11 from Kubernetes.
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action ADDED
22/06/15 23:25:24 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/usr/lib/spark/conf) : log4j.properties,spark-env.sh,hive-site.xml,metrics.properties
22/06/15 23:25:24 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action MODIFIED
22/06/15 23:25:24 INFO ExecutorPodsAllocator: Reuse PersistentVolumeClaim amazon-reviews-word-count-9ee82b8169a75183-exec-6-pvc-0

The shuffle files, if present in the PVC, are reused.

The key advantage of this technique is that it allows us to reuse pre-computed shuffle files in their original location, thereby reducing the time of the overall job run.

This works for both static and dynamic PVCs. Amazon EKS offers three storage options, all of which can be encrypted: Amazon Elastic Block Store (Amazon EBS), Amazon Elastic File System (Amazon EFS), and Amazon FSx for Lustre. We recommend using dynamic PVCs with Amazon EBS, because with static PVCs you would need to create multiple PVCs up front.

The following code enables this configuration; more details are available on GitHub:

"spark.kubernetes.driver.ownPersistentVolumeClaim": "true"
"spark.kubernetes.driver.reusePersistentVolumeClaim": "true"

For this to work, we need to enable PVCs with Amazon EKS and specify the details in the Spark runtime configuration. For instructions, refer to How do I use persistent storage in Amazon EKS? The following code contains the Spark configuration details for using a PVC as local storage; other details are available on GitHub:

"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "spark-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "10Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/var/data/spill"

Conclusion

With Amazon EMR on EKS 6.9 and the features discussed in this post, you can further reduce the overall runtime of Spark jobs running on Spot Instances. This also improves the overall resiliency and flexibility of the job while cost-optimizing the workload on EC2 Spot.

Try out the EMR on EKS workshop to improve performance when running Spark workloads on Kubernetes and to cost-optimize using EC2 Spot Instances.


About the Author

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.