All posts by Lotfi Mouhib

Cost monitoring for Amazon EMR on Amazon EKS

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/cost-monitoring-for-amazon-emr-on-amazon-eks/

Amazon EMR is the industry-leading cloud big data solution, providing a collection of open-source frameworks such as Spark, Hive, Hudi, and Presto, fully managed and with per-second billing. Amazon EMR on Amazon EKS is a deployment option allowing you to deploy Amazon EMR on the same Amazon Elastic Kubernetes Service (Amazon EKS) clusters that is multi-tenant and used by other applications, improving resource utilization, reducing cost, and simplifying infrastructure management. EMR on EKS provide you up to 5.37 times better performance than OSS Spark v3.3.1 with 76.8% cost savings. It also provides a wide variety of job submission methods, like an AWS API called StartJobRun, or through a declarative way with a Kubernetes controller through the AWS Controllers for Kubernetes for Amazon EMR on EKS.

This consolidation comes with a trade-off of increased difficulty measuring fine-grained costs for showback or chargeback by team or application. According to a CNCF and FinOps Foundation survey, 68% of Kubernetes users either rely on monthly estimates or don’t monitor Kubernetes costs at all. And for respondents reporting active Kubernetes cost monitoring, AWS Cost Explorer and Kubecost were ranked as the most popular tools being used.

Currently, you can distribute costs per tenant using a hard multi-tenancy with separate EKS clusters in dedicated AWS accounts or a soft multi-tenancy using separate node groups in a shared EKS cluster. To reduce costs and improve resource utilization, you can use namespace-based segregation, where nodes are shared across different namespaces. However, calculating and attributing costs to teams by workload or namespaces while taking into account compute optimization (like Saving Plans or Spot Instance cost) and the cost of AWS services like EMR on EKS is a challenging and non-trivial task.

In this post, we present a cost chargeback solution for EMR on EKS that combines the AWS-native capabilities of AWS Cost and Usage Reports (AWS CUR) alongside the in-depth Kubernetes cost visibility and insights using Kubecost on Amazon EKS.

Solution overview

A job in EMR on EKS incur costs mainly on two dimensions: compute resources and a marginal uplift charge for EMR on EKS usage. To track the cost associated with each of the dimensions, we use data from three sources:

  • AWS CUR – We use this to get the EMR on EKS cost uplift per job and for Kubecost to reconcile the compute cost with any saving plans or reserved instance used. The supporting infrastructure for CUR is deployed as defined in Setting up Athena using AWS CloudFormation templates.
  • Kubecost – We use this to get the compute cost incurred by the executor and driver pods.

The cost allocation process includes the following components:

  • The compute cost is provided by Kubecost. However, in order to do an in-depth analysis, we define an hourly Kubernetes CronJob on it that starts a pod to retrieve data from Kubecost and stores it in Amazon Simple Storage Service (Amazon S3).
  • CUR files are stored in an S3 bucket.
  • We use Amazon Athena to create a view and provide a consolidated view of the total cost to run an EMR on EKS job.
  • Finally, you can connect your preferred business intelligence tools using the JDBC or ODBC connections to Athena. In this post, we use Amazon QuickSight native integration for visualization purposes.

The following diagram shows the overall architecture as well as how the different components interact with each other.

emr-eks-cost-tracking-architecture

We provide a shell script to deploy our the tracking solution. The shell script configures the infrastructure using an AWS CloudFormation template, the AWS Command Line Interface (AWS CLI), and eksctl and kubectl commands. This script runs the following actions:

  1. Start the CloudFormation deployment.
  2. Create and configure an AWS Cost and Usage Report.
  3. Configure and deploy Kubecost backed by Amazon Managed Service for Prometheus.
  4. Deploy a Kubernetes CronJob.

Prerequisites

You need the following prerequisites:

This post assumes you already have an EKS cluster and run EMR on EKS jobs. If you don’t have an EKS cluster ready to test the solution, we suggest starting with a standard EMR on EKS blueprint that configures a cluster to submit EMR on EKS jobs.

Set up the solution

To run the shell script, complete the following steps:

  1. Clone the following GitHub repository.
  2. Go to the folder cost-tracking with the following command:

cd cost-tracking

  1. Run the script with following command :

sh deploy-emr-eks-cost-tracking.sh REGION KUBECOST-VERSION EKS-CLUSTER-NAME ACCOUNT-ID

After you run the script, you’re ready to use Kubecost and the CUR data to understand the cost associated with your EMR on EKS jobs.

Tracking cost

In this section, we show you how to analyze the compute cost that is retrieved from Kubecost, how to query EMR on EKS uplift data, and how to combine them to have a single consolidated view for the cost.

Compute cost

Kubecost offers various ways to track cost per Kubernetes object. For example, you can track cost by pod, controller, job, label, or deployment. It also allows you to understand the cost of idle resources, like Amazon Elastic Compute Cloud (Amazon EC2) instances that aren’t fully utilized by pods. In this post, we assume that no nodes are provisioned if no EMR on EKS job is running, and we use the Karpenter Cluster Autoscaler to provision nodes when jobs are submitted. Karpenter also does bin packing, which optimizes the EC2 resource utilization and in turn reduces the cost of idle resources.

To track compute cost associated with EMR on EKS pods, we query the Kubecost allocation API by passing pod and labels in the aggregate parameter. We use the emr-containers.amazonaws.com/job.id and emr-containers.amazonaws.com/virtual-cluster-id labels that are always present in executor and driver pods. The labels are used to filter Kubecost data to get only the cost associated with EMR on EKS pods. You can review various levels of granularity at the pod, job, and virtual cluster level to understand the cost of a driver vs. executor, or of using Spot Instances in jobs. You can also use the virtual cluster cost to understand the overall cost of a EMR on EMR when it’s used in a namespace that is used by applications other than EMR on EKS.

We also provide the instance_id, instance size, and capacity type (On-Demand or Spot) that was used to run the pod. This is retrieved through querying the Kubecost assets API. This data can be useful to understand how you run your jobs and which capacity you use more often.

The data about the cost of running the pods as well as the assets is retrieved with a Kubernetes CronJob that submits the request to the Kubecost API, joins the two data sources (allocation and assets data) on the instance_id, cleans the data, and stores it in Amazon S3 in CSV format.

The compute cost data has multiple fields that are of interest, including cpucost, ramcost (cost of memory), pvcost (cost of Amazon EBS storage), efficiency of use of CPU and RAM, as well as total cost, which represents the aggregate cost of all the resources used, either at pod, job, or virtual cluster level.

To view this data, complete the following steps:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and cost_data for the table.
  3. Run the following query:
SELECT job_id,
vc_id,
sum(totalcost) as cost
FROM "athenacurcfn_c_u_r"."compute_cost"
GROUP BY job_id, vc_id

The following screenshot shows the query results.

To query the data about information at the pod level, you can run the following SQL statement:

SELECT
split_part(name, '/', 1) as pod_name,
job_id,
vc_id,
totalcost,
instance_id,
"properties.labels.node_kubernetes_io_instance_type",
capacity_type
FROM "athenacurcfn_c_u_r"."compute_cost";

EMR on EKS uplift

The cost associated with EMR on EKS uplift is available through AWS CUT and is stored in an S3 bucket. The script you ran in the setup step created an Athena table associated to the data in the S3 bucket. The following steps take you through how you can query the data:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and cur_data for the table.
  3. Run the following query:
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost
FROM athenacurcfn_c_u_r.automated
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id

This query provides you with the cost per job. The following screenshot shows the results.

You will have to wait up to 24 hours for the CUR data to be available. As such, you should only run the preceding query after the CUR data is available and you have run the EMR on EKS jobs.

Overall cost

To view the overall cost and perform analysis on it, create a view in Athena as follows:

CREATE VIEW emr_eks_cost AS
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost,
'emr-uplift' as category
FROM athenacurcfn_c_u_r.cur_data
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id
UNION
SELECT
job_id,
vc_id,
sum(totalCost) as cost,
'compute' as category
FROM "athenacurcfn_c_u_r"."compute_cost"
group by job_id, vc_id

Now that the view is created, you can query and analyze the cost of running your EMR on EKS jobs:

SELECT sum(cost) as total_cost, job_id, vc_id
FROM "athenacurcfn_c_u_r"."emr_eks_cost"
GROUP BY job_id, vc_id;

The following screenshot shows an example output of the query on the created view.

Lastly, you can use QuickSight for a graphical high-level view on your EMR on EKS spend. The following screenshot shows an example dashboard.

emr-eks-compute-cost-quicksight-dashboard

You can now adapt this solution to your specific needs and build your custom analysis.

Clean up

Throughout this post, you deployed and configured the required infrastructure components to track cost for your EMR on EKS workloads. To avoid incurring additional charges for this solution, delete all the resources you created:

  1. Empty the S3 buckets cost-data-REGION-ACCOUNT_ID and aws-athena-query-results-cur-REGION-ACCOUNT_ID.
  2. Delete the Athena workgroup kubecost-cur-workgroup.
  3. Empty and delete the ECR repository emreks-compute-cost-exporter.
  4. Run the script destroy-emr-eks-cost-tracking.sh, which will delete the AWS CloudFormation deployment, uninstall Kubecost, delete the CronJob, and delete the Cost and Usage Reports.

Conclusion

In this post, we showed how you can use Kubecost capabilities alongside Cost and Usage Reports to closely monitor the costs for Amazon EMR on EKS per virtual cluster or per job. This solution allows you to achieve more granular costs for chargebacks using Athena, Amazon Managed Service for Prometheus, and QuickSight.

The solution presented steps to set up Cost and Usage Reports and Kubecost, and configure a CronJob on an hourly basis to get the cost of running pods spun by EMR on EKS. You can modify the presented solution to run at longer intervals or to collect data on different EKS clusters. You can also modify the Python script run by the CronJob to further clean data or reduce the amount of data stored by eliminating fields you don’t need. You can use the insights provided to drive cost optimization efforts over time, detect any increase of costs, and measure the impact of new deployments or particular events on resource usage and cost performance. For more information about integrating EMR on EKS in your existing Amazon EKS deployment, refer to Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Hamza Mimi Principal Solutions Architect in the French Public sector team at Amazon Web Services (AWS). With a long experience in the telecommunications industry. He is currently working as a customer advisor on topics ranging from digital transformation to architectural guidance.

Introducing Amazon EMR on EKS job submission with Spark Operator and spark-submit

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-job-submission-with-spark-operator-and-spark-submit/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. The EMR runtime provides up to 5.37 times better performance and 76.8% cost savings, when compared to using open-source Apache Spark on Amazon EKS.

Building on the success of Amazon EMR on EKS, customers have been running and managing jobs using the emr-containers API, creating EMR virtual clusters, and submitting jobs to the EKS cluster, either through the AWS Command Line Interface (AWS CLI) or Apache Airflow scheduler. However, other customers running Spark applications have chosen Spark Operator or native spark-submit to define and run Apache Spark jobs on Amazon EKS, but without taking advantage of the performance gains from running Spark on the optimized EMR runtime. In response to this need, starting from EMR 6.10, we have introduced a new feature that lets you use the optimized EMR runtime while submitting and managing Spark jobs through either Spark Operator or spark-submit. This means that anyone running Spark workloads on EKS can take advantage of EMR’s optimized runtime.

In this post, we walk through the process of setting up and running Spark jobs using both Spark Operator and spark-submit, integrated with the EMR runtime feature. We provide step-by-step instructions to assist you in setting up the infrastructure and submitting a job with both methods. Additionally, you can use the Data on EKS blueprint to deploy the entire infrastructure using Terraform templates.

Infrastructure overview

In this post, we walk through the process of deploying a comprehensive solution using eksctl, Helm, and AWS CLI. Our deployment includes the following resources:

  • A VPC, EKS cluster, and managed node group, set up with the eksctl tool
  • Essential Amazon EKS managed add-ons, such as the VPC CNI, CoreDNS, and KubeProxy set up with the eksctl tool
  • Cluster Autoscaler and Spark Operator add-ons, set up using Helm
  • A Spark job execution AWS Identity and Access Management (IAM) role, IAM policy for Amazon Simple Storage Service (Amazon S3) bucket access, service account, and role-based access control, set up using the AWS CLI and eksctl

Prerequisites

Verify that the following prerequisites are installed on your machine:

Set up AWS credentials

Before proceeding to the next step and running the eksctl command, you need to set up your local AWS credentials profile. For instructions, refer to Configuration and credential file settings.

Deploy the VPC, EKS cluster, and managed add-ons

The following configuration uses us-west-1 as the default Region. To run in a different Region, update the region and availabilityZones fields accordingly. Also, verify that the same Region is used in the subsequent steps throughout the post.

Enter the following code snippet into the terminal where your AWS credentials are set up. Make sure to update the publicAccessCIDRs field with your IP before you run the command below. This will create a file named eks-cluster.yaml:

cat <<EOF >eks-cluster.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: emr-spark-operator
  region: us-west-1 # replace with your region
  version: "1.25"
vpc:
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
  publicAccessCIDRs: ["YOUR-IP/32"]
availabilityZones: ["us-west-1a","us-west-1b"] # replace with your region
iam:
  withOIDC: true
  serviceAccounts:
  - metadata:
      name: cluster-autoscaler
      namespace: kube-system
    wellKnownPolicies:
      autoScaler: true
    roleName: eksctl-cluster-autoscaler-role
managedNodeGroups:
  - name: m5x
    instanceType: m5.xlarge
    availabilityZones: ["us-west-1a"]
    volumeSize: 100
    volumeType: gp3
    minSize: 2
    desiredCapacity: 2
    maxSize: 10
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned" 
addons:
  - name: vpc-cni
    version: latest
  - name: coredns
    version: latest
  - name: kube-proxy
    version: latest
cloudWatch:
  clusterLogging:
    enableTypes: ["*"]
EOF

Use the following command to create the EKS cluster : eksctl create cluster -f eks-cluster.yaml

Deploy Cluster Autoscaler

Cluster Autoscaler is crucial for automatically adjusting the size of your Kubernetes cluster based on the current resource demands, optimizing resource utilization and cost. Create an autoscaler-helm-values.yaml file and install the Cluster Autoscaler using Helm:

cat <<EOF >autoscaler-helm-values.yaml
---
autoDiscovery:
    clusterName: emr-spark-operator
    tags:
      - k8s.io/cluster-autoscaler/enabled
      - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }}
awsRegion: us-west-1 # Make sure the region same as the EKS Cluster
rbac:
  serviceAccount:
    create: false
    name: cluster-autoscaler
EOF
helm repo add autoscaler https://kubernetes.github.io/autoscaler
helm install nodescaler autoscaler/cluster-autoscaler \
--namespace kube-system \
--values autoscaler-helm-values.yaml --debug

You can also set up Karpenter as a cluster autoscaler to automatically launch the right compute resources to handle your EKS cluster’s applications. You can follow this blog on how to setup and configure Karpenter.

Deploy Spark Operator

Spark Operator is an open-source Kubernetes operator specifically designed to manage and monitor Spark applications running on Kubernetes. It streamlines the process of deploying and managing Spark jobs, by providing a Kubernetes custom resource to define, configure and run Spark applications, as well as manage the job life cycle through Kubernetes API. Some customers prefer using Spark Operator to manage Spark jobs because it enables them to manage Spark applications just like other Kubernetes resources.

Currently, customers are building their open-source Spark images and using S3a committers as part of job submissions with Spark Operator or spark-submit. However, with the new job submission option, you can now benefit from the EMR runtime in conjunction with EMRFS. Starting with Amazon EMR 6.10 and for each upcoming version of the EMR runtime, we will release the Spark Operator and its Helm chart to use the EMR runtime.

In this section, we show you how to deploy a Spark Operator Helm chart from an Amazon Elastic Container Registry (Amazon ECR) repository and submit jobs using EMR runtime images, benefiting from the performance enhancements provided by the EMR runtime.

Install Spark Operator with Helm from Amazon ECR

The Spark Operator Helm chart is stored in an ECR repository. To install the Spark Operator, you first need to authenticate your Helm client with the ECR repository. The charts are stored under the following path: ECR_URI/spark-operator.

Authenticate your Helm client and install the Spark Operator:

aws ecr get-login-password \
--region us-west-1 | helm registry login \
--username AWS \
--password-stdin 608033475327.dkr.ecr.us-west-1.amazonaws.com

You can authenticate to other EMR on EKS supported Regions by obtaining the AWS account ID for the corresponding Region. For more information, refer to how to select a base image URI.

Install Spark Operator

You can now install Spark Operator using the following command:

helm install spark-operator-demo \
oci://608033475327.dkr.ecr.us-west-1.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=us-west-1 \
--version 1.1.26-amzn-0 \
--set serviceAccounts.spark.create=false \
--namespace spark-operator \
--create-namespace

To verify that the operator has been installed correctly, run the following command:

helm list --namespace spark-operator -o yaml

Set up the Spark job execution role and service account

In this step, we create a Spark job execution IAM role and a service account, which will be used in Spark Operator and spark-submit job submission examples.

First, we create an IAM policy that will be used by the IAM Roles for Service Accounts (IRSA). This policy enables the driver and executor pods to access the AWS services specified in the policy. Complete the following steps:

  1. As a prerequisite, either create an S3 bucket (aws s3api create-bucket --bucket <ENTER-S3-BUCKET> --create-bucket-configuration LocationConstraint=us-west-1 --region us-west-1) or use an existing S3 bucket. Replace <ENTER-S3-BUCKET> in the following code with the bucket name.
  2. Create a policy file that allows read and write access to an S3 bucket:
    cat >job-execution-policy.json <<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*",
                    "arn:aws:s3:::aws-data-lake-workshop/*",
                    "arn:aws:s3:::nyc-tlc",
                    "arn:aws:s3:::nyc-tlc/*"
                ]
            }
        ]
    }
    EOL

  3. Create the IAM policy with the following command:
    aws iam create-policy --policy-name emr-job-execution-policy --policy-document file://job-execution-policy.json

  4. Next, create the service account named emr-job-execution-sa-role as well as the IAM roles. The following eksctl command creates a service account scoped to the namespace and service account defined to be used by the executor and driver. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    eksctl create iamserviceaccount \
    --cluster=emr-spark-operator \
    --region us-west-1 \
    --name=emr-job-execution-sa \
    --attach-policy-arn=arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:policy/emr-job-execution-policy \
    --role-name=emr-job-execution-irsa \
    --namespace=data-team-a \
    --approve

  5. Create an S3 bucket policy to allow only the execution role create in step 4 to write and read from the S3 bucket create in step 1. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    cat > bucketpolicy.json<<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ], "Principal": {
                    "AWS": "arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:role/emr-job-execution-irsa"
                },
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*"
                ]
            }
        ]
    }
    EOL
    
    aws s3api put-bucket-policy --bucket ENTER-S3-BUCKET-NAME --policy file://bucketpolicy.json

  6. Create a Kubernetes role and role binding required for the service account used in the Spark job run:
    cat <<EOF >emr-job-execution-rbac.yaml
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: emr-job-execution-sa-role
      namespace: data-team-a
    rules:
      - apiGroups: ["", "batch","extensions"]
        resources: ["configmaps","serviceaccounts","events","pods","pods/exec","pods/log","pods/portforward","secrets","services","persistentvolumeclaims"]
        verbs: ["create","delete","get","list","patch","update","watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: emr-job-execution-sa-rb
      namespace: data-team-a
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: emr-job-execution-sa-role
    subjects:
      - kind: ServiceAccount
        name: emr-job-execution-sa
        namespace: data-team-a
    EOF

  7. Apply the Kubernetes role and role binding definition with the following command:
kubectl apply -f emr-job-execution-rbac.yaml

So far, we have completed the infrastructure setup, including the Spark job execution roles. In the following steps, we run sample Spark jobs using both Spark Operator and spark-submit with the EMR runtime.

Configure the Spark Operator job with the EMR runtime

In this section, we present a sample Spark job that reads data from public datasets stored in S3 buckets, processes them, and writes the results to your own S3 bucket. Make sure that you update the S3 bucket in the following configuration by replacing <ENTER_S3_BUCKET> with the URI to your own S3 bucket refered in step 2 of the “Set up the Spark job execution role and service account section. Also, note that we are using data-team-a as a namespace and emr-job-execution-sa as a service account, which we created in the previous step. These are necessary to run the Spark job pods in the dedicated namespace, and the IAM role linked to the service account is used to access the S3 bucket for reading and writing data.

Most importantly, notice the image field with the EMR optimized runtime Docker image, which is currently set to emr-6.10.0. You can change this to a newer version when it’s released by the Amazon EMR team. Also, when configuring your jobs, make sure that you include the sparkConf and hadoopConf settings as defined in the following manifest. These configurations enable you to benefit from EMR runtime performance, AWS Glue Data Catalog integration, and the EMRFS optimized connector.

  1. Create the file (emr-spark-operator-example.yaml) locally and update the S3 bucket location so that you can submit the job as part of the next step:
    cat <<EOF >emr-spark-operator-example.yaml
    ---
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: taxi-example
      namespace: data-team-a
    spec:
      type: Scala
      mode: cluster
      # EMR optimized runtime image
      image: "483788554619.dkr.ecr.eu-west-1.amazonaws.com/spark/emr-6.10.0:latest"
      imagePullPolicy: Always
      mainClass: ValueZones
      mainApplicationFile: s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar
      arguments:
        - s3://nyc-tlc/csv_backup
        - "2017"
        - s3://nyc-tlc/misc/taxi _zone_lookup.csv
        - s3://<ENTER_S3_BUCKET>/emr-eks-results
        - emr_eks_demo
      hadoopConf:
        # EMRFS filesystem config
        fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
        fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
        fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
        fs.s3.buffer.dir: /mnt/s3
        fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
        mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
        mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
      sparkConf:
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: "s3://<ENTER_S3_BUCKET>/"
        spark.kubernetes.driver.pod.name: driver-nyc-taxi-etl
        # Required for EMR Runtime and Glue Catalogue
        spark.driver.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.driver.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        spark.executor.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.executor.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        # EMRFS commiter
        spark.sql.parquet.output.committer.class: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
        spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
        spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
        spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
        spark.driver.defaultJavaOptions:  -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
      sparkVersion: "3.3.1"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        memory: "4g"
        serviceAccount: emr-job-execution-sa
      executor:
        cores: 1
        instances: 4
        memory: "4g"
        serviceAccount: emr-job-execution-sa
    EOF

  2. Run the following command to submit the job to the EKS cluster:
    kubectl apply -f emr-spark-operator-example.yaml

    The job may take 4–5 minutes to complete, and you can verify the successful message in the driver pod logs.

  3. Verify the job by running the following command:
kubectl get pods -n data-team-a

Enable access to the Spark UI

The Spark UI is an important tool for data engineers because it allows you to track the progress of tasks, view detailed job and stage information, and analyze resource utilization to identify bottlenecks and optimize your code. For Spark jobs running on Kubernetes, the Spark UI is hosted on the driver pod and its access is restricted to the internal network of Kubernetes. To access it, we need to forward the traffic to the pod with kubectl. The following steps take you through how to set it up.

Run the following command to forward traffic to the driver pod:

kubectl port-forward <driver-pod-name> 4040:4040

You should see text similar to the following:

Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 → 4040

If you didn’t specify the driver pod name at the submission of the SparkApplication, you can get it with the following command:

kubectl get pods -l spark-role=driver,spark-app-name=<your-spark-app-name> -o jsonpath='{.items[0].metadata.name}'

Open a browser and enter http://localhost:4040 in the address bar. You should be able to connect to the Spark UI.

spark-ui-screenshot

Spark History Server

If you want to explore your job after its run, you can view it through the Spark History Server. The preceding SparkApplication definition has the event log enabled and stores the events in an S3 bucket with the following path: s3://YOUR-S3-BUCKET/. For instructions on setting up the Spark History Server and exploring the logs, refer to Launching the Spark history server and viewing the Spark UI using Docker.

spark-submit

spark-submit is a command line interface for running Apache Spark applications on a cluster or locally. It allows you to submit applications to Spark clusters. The tool enables simple configuration of application properties, resource allocation, and custom libraries, streamlining the deployment and management of Spark jobs.

Beginning with Amazon EMR 6.10, spark-submit is supported as a job submission method. This method currently only supports cluster mode as the submission mechanism. To submit jobs using the spark-submit method, we reuse the IAM role for the service account we set up earlier. We also use the S3 bucket used for the Spark Operator method. The steps in this section take you through how to configure and submit jobs with spark-submit and benefit from EMR runtime improvements.

  1. In order to submit a job, you need to use the Spark version that matches the one available in Amazon EMR. For Amazon EMR 6.10, you need to download the Spark 3.3 version.
  2. You also need to make sure you have Java installed in your environment.
  3. Unzip the file and navigate to the root of the Spark directory.
  4. In the following code, replace the EKS endpoint as well as the S3 bucket then run the script:
./bin/spark-submit \
--class ValueZones \
--master k8s://EKS-ENDPOINT \
--conf spark.kubernetes.namespace=data-team-a \
--conf spark.kubernetes.container.image=608033475327.dkr.ecr.us-west-1.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-job-execution-sa \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=emr-job-execution-sa \
--conf spark.driver.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.driver.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.executor.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.executor.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--conf spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3.EMRFSDelegate \
--conf spark.hadoop.fs.s3.buffer.dir=/mnt/s3 \
--conf spark.hadoop.fs.s3n.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--deploy-mode cluster \
s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar s3://nyc-tlc/csv_backup 2017 s3://nyc-tlc/misc/taxi_zone_lookup.csv s3://S3_BUCKET/emr-eks-results emr_eks_demo

The job takes about 7 minutes to complete with two executors of one core and 1 G of memory.

Using custom kubernetes schedulers

Customers running a large volume of jobs concurrently might face challenges related to providing fair access to compute capacity that they aren’t able to solve with the standard scheduling and resource utilization management Kubernetes offers. In addition, customers that are migrating from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) and are managing their scheduling with YARN queues will not be able to transpose them to Kubernetes scheduling capabilities.

To overcome this issue, you can use custom schedulers like Apache Yunikorn or Volcano.Spark Operator natively supports these schedulers, and with them you can schedule Spark applications based on factors such as priority, resource requirements, and fairness policies, while Spark Operator simplifies application deployment and management. To set up Yunikorn with gang scheduling and use it in Spark applications submitted through Spark Operator, refer to Spark Operator with YuniKorn.

Clean up

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment:

eksctl delete cluster -f eks-cluster.yaml

Conclusion

In this post, we introduced the EMR runtime feature for Spark Operator and spark-submit, and explored the advantages of using this feature on an EKS cluster. With the optimized EMR runtime, you can significantly enhance the performance of your Spark applications while optimizing costs. We demonstrated the deployment of the cluster using the eksctl tool, , you can also use the Data on EKS blueprints for deploying a production-ready EKS which you can use for EMR on EKS and leverage these new deployment methods in addition to the EMR on EKS API job submission method. By running your applications on the optimized EMR runtime, you can further enhance your Spark application workflows and drive innovation in your data processing pipelines.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Vara Bonthu is a dedicated technology professional and Worldwide Tech Leader for Data on EKS, specializing in assisting AWS customers ranging from strategic accounts to diverse organizations. He is passionate about open-source technologies, data analytics, AI/ML, and Kubernetes, and boasts an extensive background in development, DevOps, and architecture. Vara’s primary focus is on building highly scalable data and AI/ML solutions on Kubernetes platforms, helping customers harness the full potential of cutting-edge technology for their data-driven pursuits.

Accelerate your data exploration and experimentation with the AWS Analytics Reference Architecture library

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/accelerate-your-data-exploration-and-experimentation-with-the-aws-analytics-reference-architecture-library/

Organizations use their data to solve complex problems by starting small, running iterative experiments, and refining the solution. Although the power of experiments can’t be ignored, organizations have to be cautious about the cost-effectiveness of such experiments. If time is spent creating the underlying infrastructure for enabling experiments, it further adds to the cost.

Developers need an integrated development environment (IDE) for data exploration and debugging of workflows, and different compute profiles for running these workflows. If you choose Amazon EMR for such use cases, you can use an IDE called Amazon EMR Studio for data exploration, transformation, version control, and debugging, and run Spark jobs to process large volume of data. Deploying Amazon EMR on Amazon EKS simplifies management, reduces costs, and improves performance. However, a data engineer or IT administrator needs to spend time creating the underlying infrastructure, configuring security, and creating a managed endpoint for users to connect to. This means such projects have to wait until these experts create the infrastructure.

In this post, we show how a data engineer or IT administrator can use the AWS Analytics Reference Architecture (ARA) to accelerate infrastructure deployment, saving your organization both time and money spent on these data analytics experiments. We use the library to deploy an Amazon Elastic Kubernetes (Amazon EKS) cluster, configure it to use Amazon EMR on EKS, and deploy a virtual cluster and managed endpoints and EMR Studio. You can then either run jobs on the virtual cluster or run exploratory data analysis with Jupyter notebooks on Amazon EMR Studio and Amazon EMR on EKS. The architecture below represent the infrastructure you will deploy with the AWS Analytics Reference Architecture.

cdk-emr-eks-studio-architecture

Prerequisites

To follow along, you need to have an AWS account that is bootstrapped with the AWS Cloud Development Kit (AWS CDK). For instructions, refer to Bootstrapping. The following tutorial uses TypeScript, and requires version 2 or later of the AWS CDK. If you don’t have the AWS CDK installed, refer to Install the AWS CDK.

Set up an AWS CDK project

To deploy resources using the ARA, you first need to set up an AWS CDK project and install the ARA library. Complete the following steps:

  1. Create a folder named emr-eks-app:
    mkdir emr-eks-app && cd emr-eks-app

  2. Initialize an AWS CDK project in an empty directory and run the following command:
    cdk init app --language typescript

  3. Install the ARA library:
    npm install aws-analytics-reference-architecture --save

  4. In lib/emr-eks-app.ts, import the ARA library as follows. The first line calls the ARA library, the second one defines AWS Identity and Access Management (IAM) policies:
    import * as ara from 'aws-analytics-reference-architecture'; 
    import * as iam from 'aws-cdk-lib/aws-iam';

Create and define an EKS cluster and compute capacity

To create an EMR on EKS virtual cluster, you first need to deploy an EKS cluster. The ARA library defines a construct called EmrEksCluster. The construct provisions an EKS cluster, enables IAM roles for service accounts, and deploys a set of supporting controllers like certificate manager controller (needed by the managed endpoint that is used by Amazon EMR Studio) as well as a cluster auto scaler to have an elastic cluster and save on cost when no job is submitted to the cluster.

In lib/emr-eks-app.ts, add the following line:

const emrEks = ara.EmrEksCluster.getOrCreate(this,{ 
   eksAdminRoleArn:ROLE_ARN;, 
   eksClusterName:CLUSTER_NAME;
   autoscaling: Autoscaler.KARPENTER, 
});

To learn more about the properties you can customize, refer to EmrEksClusterProps. There are two mandatory parameters in EmrEksCluster construct: The first is eksAdminRoleArn role is mandatory and is the role you use to interact with the Kubernetes control plane. This role must have administrative permissions to create or update the cluster. The second parameter is autoscaling, this parameter allows you to select the autoscaling mechanism, either Karpenter or native Kubernetes Cluster Autoscaler. In this blog we will use Karpenter and we recommend its use due to faster autoscaling, simplified node management and provisioning. Now you’re ready to define the compute capacity.

One way to define worker nodes in Amazon EKS is to use managed node groups. We use one node group called tooling, which hosts the coredns, ingress controller, certificate manager, Karpenter and any other pod that is necessary for the running EMR on EKS jobs or ManagedEndpoint. We also define default Karpenter Provisioners that define capacity to be used for jobs submitted by EMR on EKS. These Provisioners are optimized for different Spark use cases (critical jobs, non-critical job, experimentation and interactive sessions). The construct also allows you to submit your own provisioner defined by a Kubernetes manifest through a method called addKarpenterProvisioner. Let’s discuss the predefined Provisioners.

Default Provisioners configurations

The default provisioners are set for rapid experimentation and are always created by default. However, if you don’t want to use them, you can set the defaultNodeGroups parameter to false in the EmrEksCluster properties at creation time. The Provisioners are defined as follows and are created in each of the subnets that are used by Amazon EKS:

  • Critical provisioner – It is dedicated to supporting jobs with aggressive SLAs and are time sensitive. The provisioner uses On-Demand Instances, which aren’t stopped, unlike Spot Instances, and their lifecycle follows through one of the jobs. The nodes use instance stores, which are NVMe disks physically attached to the host, which offer a high I/O throughput that allow better Spark performance, because it’s used as temporary storage for disk spill and shuffle. The instance types used in the node are of the m6gd family. The instances use the AWS Graviton processor, which offers better price/performance than x86 processors. To use this provisioner in your jobs, you can use the following sample configuration, which is referenced in the configuration override of the EMR on EKS job submission.
  • Non-critical provisioner – This Provisioner leverage Spot Instances to save costs for jobs that aren’t time sensitive or jobs that are used for experiments. This node use Spot Instances because the jobs aren’t critical and can be interrupted. These instances can be stopped if the instance is reclaimed. The instance types used in the node are of the m6gd family, the driver is On-Demand and executors are on spot instances.
  • Notebook provisioner – The Provisioner is for running managed endpoints that are used by Amazon EMR Studio for data exploration using Amazon EMR on EKS. The instances are of t3 family and are On-Demand for driver and Spot Instances for executors to keep the cost low. If the executor instances are stopped, new ones are started by Karpenter. If the executor instances are stopped too often, you can define your own that use On-Demand instances.

The following link provides more details about how each of the provisioner are defined. One import property that is defined in the default Provisioners is there is one for each AZ. This is important because it allows you to reduce inter-AZ network transfer cost when Spark runs a shuffle.

For this post, we use the default Provisioners, so you don’t need to add any lines of code for this section. If you want yo add your own Provisioners you can leverage the method addKarpenterProvisioner to apply your own manifests. You can use helper methods in Utils class like readYamlDocument to read YAML document and loadYaml load YAML files and pass them as arguments to addKarpenterProvisioner method.

Deploy the virtual cluster and an execution role

A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with; when you submit a job, the driver and executor pods are running in the associated namespace. The EmrEksCluster construct offers a method called addEmrVirtualCluster, which creates the virtual cluster for you. The method takes EmrVirtualClusterOptions as a parameter, which has the following attributes:

  • name – The name of your virtual cluster.
  • createNamespace – An optional field that creates the EKS namespace. This is of type Boolean and by default it doesn’t create a separate EKS namespace, so your virtual cluster is created in the default namespace.
  • eksNamespace – The name of the EKS namespace to be linked with the virtual EMR cluster. If no namespace is supplied, the construct uses the default namespace.
  1. In lib/emr-eks-app.ts, add the following line to create your virtual cluster:
    const virtualCluster = emrEks.addEmrVirtualCluster(this,{ 
       name:'my-emr-eks-cluster', 
       eksNamespace: ‘batchjob’, 
       createNamespace: true 
    });

    Now we create the execution role, which is an IAM role that is used by the driver and executor to interact with AWS services. Before we can create the execution role for Amazon EMR, we need to first create the ManagedPolicy. Note that in the following code, we create a policy to allow access to the Amazon Simple Storage Service (Amazon S3) bucket and Amazon CloudWatch logs.

  2. In lib/emr-eks-app.ts, add the following line to create the policy:
    const emrEksPolicy = new iam.ManagedPolicy(this,'managed-policy',
    { statements: [ 
       new iam.PolicyStatement({ 
           effect: iam.Effect.ALLOW, 
           actions:['s3:PutObject','s3:GetObject','s3:ListBucket'], 
           resources:['YOUR-DATA-S3-BUCKET']
        }), 
       new iam.PolicyStatement({ 
           effect: iam.Effect.ALLOW, 
           actions:['logs:PutLogEvents','logs:CreateLogStream','logs:DescribeLogGroups','logs:DescribeLogStreams'], 
           resources:['arn:aws:logs:*:*:*'] 
        })
       ] 
    });

    If you want to use the AWS Glue Data Catalog, add its permission in the preceding policy.

    Now we create the execution role for Amazon EMR on EKS using the policy defined in the previous step using the createExecutionRole instance method. The driver and executor pods can then assume this role to access and process data. The role is scoped in such a way that only pods in the virtual cluster namespace can assume it. To learn more about the condition implemented by this method to restrict access to the role to only pods that are created by Amazon EMR on EKS in the namespace of the virtual cluster, refer to Using job execution roles with Amazon EMR on EKS.

  3. In lib/emr-eks-app.ts, add the following line to create the execution role:
    const role = emrEks.createExecutionRole(this,'emr-eks-execution-role', emrEksPolicy, ‘batchjob’,’ execRoleJob’);

    The preceding code produces an IAM role called execRoleJob with the IAM policy defined in emrekspolicy and scoped to the namespace dataanalysis.

  4. Lastly, we output parameters that are important for the job run:
// Virtual cluster Id to reference in jobs
new cdk.CfnOutput(this, 'VirtualClusterId', { value: virtualCluster.attrId });

// Job config for each nodegroup
new cdk.CfnOutput(this, 'CriticalConfig', { value: emrEks.criticalDefaultConfig });

// Execution role arn
new cdk.CfnOutput(this, 'ExecRoleArn', { value: role.roleArn });

Deploy Amazon EMR Studio and provision users

To deploy an EMR Studio for data exploration and job authoring, the ARA library has a construct called NotebookPlatform. This construct allows you to deploy as many EMR Studios as you need (within the account limit) and set them up with the authentication mode that is suitable for you and assign users to them. To learn more about the authentication modes available in Amazon EMR Studio, refer to Choose an authentication mode for Amazon EMR Studio.

The construct creates all the necessary IAM roles and policies needed by Amazon EMR Studio. It also creates an S3 bucket where all the notebooks are stored by Amazon EMR Studio. The bucket is encrypted with a customer managed key (CMK) generated by the AWS CDK stack. The following steps show you how to create your own EMR Studio with the construct.

The notebook platform construct takes NotebookPlatformProps as a property, which allows you to define your EMR Studio, a namespace, the name of the EMR Studio, and its authentication mode.

  1. In lib/emr-eks-app.ts, add the following line:
    const notebookPlatform = new ara.NotebookPlatform(this, 'platform-notebook', {
    emrEks: emrEks,
    eksNamespace: 'dataanalysis',
    studioName: 'platform',
    studioAuthMode: ara.StudioAuthMode.IAM,
    });

    For this post, we use IAM users so that you can easily reproduce it in your own account. However, if you have IAM federation or single sign-on (SSO) already in place, you can use them instead of IAM users.To learn more about the parameters of NotebookPlatformProps, refer to NotebookPlatformProps.

    Next, we need to create and assign users to the Amazon EMR Studio. For this, the construct has a method called addUser that takes a list of users and either assigns them to Amazon EMR Studio in case of SSO or updates the IAM policy to allows access to Amazon EMR Studio for the provided IAM users. The user can also have multiple managed endpoints, and each user can have their Amazon EMR version defined. They can use a different set of Amazon Elastic Compute Cloud (Amazon EC2) instances and different permissions using job execution roles.

  2. In lib/emr-eks-app.ts, add the following line:
    notebookPlatform.addUser([{
    identityName:<NAME-OF-EXISTING-IAM-USER>,
    notebookManagedEndpoints: [{
    emrOnEksVersion: 'emr-6.8.0-latest',
    executionPolicy: emrEksPolicy,
    managedEndpointName: ‘myendpoint’
    }],
    }]);

    In the preceding code, for the sake of brevity, we reuse the same IAM policy that we created in the execution role.

    Note that the construct optimizes the number of managed endpoints that are created. If two endpoints have the same name, then only one is created.

  3. Now that we have defined our deployment, we can deploy it:
   npm run build && cdk deploy

You can find a sample project that contains all the steps of the walk through in the following GitHub repository.

When the deployment is complete, the output contains the S3 bucket containing the assets for podTemplate, the link for the EMR Studio, and the EMR Studio virtual cluster ID. The following screenshot shows the output of the AWS CDK after the deployment is complete.

CDK output
Submit jobs

Because we’re using the default Provisioners, we will use the podTemplate that is defined by the construct available on the ARA GitHub repository. These are uploaded for you by the construct to an S3 bucket called <clustername>-emr-eks-assets; you only need to refer to them in your Spark job. In this job, you also use the job parameters in the output at the end of the AWS CDK deployment. These parameters allow you to use the AWS Glue Data Catalog and implement Spark on Kubernetes best practices like dynamicAllocation and pod collocation. At the end of cdk deploy ARA will output job sample configurations with the best practices listed before that you can use to submit a job. You can submit a job as follows.

A job run is a unit of work such as a Spark JAR file that is submitted to the EMR on EKS cluster. We start a job using the start-job-run command. Note you can use SparkSubmitParameters to specify the Amazon S3 path to the pod template, as shown in the following command:

aws emr-containers start-job-run \

--virtual-cluster-id <CLUSTER-ID>\

--name <SPARK-JOB-NAME>\

--execution-role-arn <ROLE-ARN> \

--release-label emr-6.8.0-latest \

--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": ""<S3URI-SPARK-JOB>"
}
}' --configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",

"spark.sql.catalogImplementation": "hive",

"spark.dynamicAllocation.enabled":"true",

"spark.dynamicAllocation.minExecutors": "8",

"spark.dynamicAllocation.maxExecutors": "40",

"spark.kubernetes.allocation.batch.size": "8",

"spark.executor.cores": "8",

"spark.kubernetes.executor.request.cores": "7",

"spark.executor.memory": "28G",

"spark.driver.cores": "2",

"spark.kubernetes.driver.request.cores": "2",

"spark.driver.memory": "6G",

"spark.dynamicAllocation.executorAllocationRatio": "1",

"spark.dynamicAllocation.shuffleTracking.enabled": "true",

"spark.dynamicAllocation.shuffleTracking.timeout": "300s",

"spark.kubernetes.driver.podTemplateFile": s3://<EKS-CLUSTER-NAME>-emr-eks-assets-<ACCOUNT-ID>-<REGION> /<EKS-CLUSTER-NAME>/pod-template/critical-driver.yaml ",

"spark.kubernetes.executor.podTemplateFile": s3://<EKS-CLUSTER-NAME>-emr-eks-assets-<ACCOUNT-ID>-<REGION> /<EKS-CLUSTER-NAME>/pod-template/critical-executor.yaml "
}
}
],
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": ""<Log_Group_Name>",
"logStreamNamePrefix": "<Log_Stream_Prefix>"
}
}'

The code takes the following values:

  • <CLUSTER-ID> – The EMR virtual cluster ID
  • <SPARK-JOB-NAME> – The name of your Spark job
  • <ROLE-ARN> – The execution role you created
  • <S3URI-SPARK-JOB> – The Amazon S3 URI of your Spark job
  • <S3URI-CRITICAL-DRIVER> – The Amazon S3 URI of the driver pod template, which you get from the AWS CDK output
  • <S3URI-CRITICAL-EXECUTOR> – The Amazon S3 URI of the executor pod template
  • <Log_Group_Name> – Your CloudWatch log group name
  • <Log_Stream_Prefix> – Your CloudWatch log stream prefix

You can go to the Amazon EMR console to check the status of your job and to view logs. You can also check the status by running the describe-job-run command:

aws emr-containers describe-job-run --<CLUSTER-ID> cluster-id --id <JOB-RUN-ID>

Explore data using Amazon EMR Studio

In this section, we show how you can create a workspace in Amazon EMR Studio and connect to the Amazon EKS managed endpoint from the workspace. From the output, use the link to Amazon EMR Studio to navigate to the EMR Studio deployment. You must sign in with the IAM username you provided in the addUser method.

Create a Workspace

To create a Workspace, complete the following steps:

  1. Log in to the EMR Studio created by the AWS CDK.
  2. Choose Create Workspace.
  3. Enter a workspace name and an optional description.
  4. Select Allow Workspace Collaboration if you want to work with other Studio users in this Workspace in real time.
  5. Choose Create Workspace.

create-emr-studio-workspace

After you create the Workspace, choose it from the list of Workspaces to open the JupyterLab environment.
emr studio workspace running

The following screenshot shows what the terminal looks like. For more information about the user interface, refer to Understand the Workspace user interface.

EMR Studio workspace view

Connect to an EMR on EKS managed endpoint

You can easily connect to the EMR on EKS managed endpoint from the Workspace.

  1. In the navigation pane, on the Clusters menu, select EMR Cluster on EKS for Cluster type.
    The virtual clusters appear on the EMR Cluster on EKS drop-down menu, and the endpoint appears on the Endpoint drop-down menu. If there are multiple endpoints, they appear here, and you can easily switch between endpoints from the Workspace.
  2. Select the appropriate endpoint and choose Attach.
    attach to managedendpoint

Work with a notebook

You can now open a notebook and connect to a preferred kernel to do your tasks. For instance, you can select a PySpark kernel, as shown in the following screenshot.
select-kernel

Explore your data

The first step of our data exploration exercise is to create a Spark session and then load the New York taxi dataset from the S3 bucket into a data frame. Use the following code block to load the data into a data frame. Copy the Amazon S3 URI for the location where the dataset resides in Amazon S3.

	from pyspark.sql import SparkSession
	from pyspark.sql.functions import *
	from datetime import datetime
	spark = SparkSession.builder.appName("SparkEDAA").getOrCreate()

After we load the data into a data frame, we replace the data of the current_date column with the actual current date, count the number of rows, and save the data into a Parquet file:

print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.parquet("<YOUR-S3-PATH>")

The following screenshot shows the result of our notebook running on Amazon EMR Studio and with PySpark running on Amazon EMR on EKS.
notebook execution

Clean up

To clean up after this post, run cdk destroy.

Conclusion

In this post, we showed how you can use the ARA to quickly deploy a data analytics infrastructure and start experimenting with your data. You can find the full example referenced in this post in the GitHub repository. The AWS Analytics Reference Architecture implements common Analytics pattern and AWS best practices to offer you ready to use constructs to for your experiments. One of the patterns is the data mesh, which you can consult how to use in this blog post.

You can also explore other constructs offered in this library to experiment with AWS Analytics services before transitioning your workload for production.


About the Authors

co-author-1Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Sandipan Bhaumik is a Senior Analytics Specialist Solutions Architect based in London. He has worked with customers in different industries like Banking & Financial Services, Healthcare, Power & Utilities, Manufacturing and Retail helping them solve complex challenges with large-scale data platforms. At AWS he focuses on strategic accounts in the UK and Ireland and helps customers to accelerate their journey to the cloud and innovate using AWS analytics and machine learning services. He loves playing badminton, and reading books.

Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/design-considerations-for-amazon-emr-on-eks-in-a-multi-tenant-amazon-eks-environment/

Many AWS customers use Amazon Elastic Kubernetes Service (Amazon EKS) in order to take advantage of Kubernetes without the burden of managing the Kubernetes control plane. With Kubernetes, you can centrally manage your workloads and offer administrators a multi-tenant environment where they can create, update, scale, and secure workloads using a single API. Kubernetes also allows you to improve resource utilization, reduce cost, and simplify infrastructure management to support different application deployments. This model is beneficial for those running Apache Spark workloads, for several reasons. For example, it allows you to have multiple Spark environments running concurrently with different configurations and dependencies that are segregated from each other through Kubernetes multi-tenancy features. In addition, the same cluster can be used for various workloads like machine learning (ML), host applications, data streaming and thereby reducing operational overhead of managing multiple clusters.

AWS offers Amazon EMR on EKS, a managed service that enables you to run your Apache Spark workloads on Amazon EKS. This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. When you run Spark jobs on EMR on EKS and not on self-managed Apache Spark on Kubernetes, you can take advantage of automated provisioning, scaling, faster runtimes, and the development and debugging tools that Amazon EMR provides

In this post, we show how to configure and run EMR on EKS in a multi-tenant EKS cluster that can used by your various teams. We tackle multi-tenancy through four topics: network, resource management, cost management, and security.

Concepts

Throughout this post, we use terminology that is either specific to EMR on EKS, Spark, or Kubernetes:

  • Multi-tenancy – Multi-tenancy in Kubernetes can come in three forms: hard multi-tenancy, soft multi-tenancy and sole multi-tenancy. Hard multi-tenancy means each business unit or group of applications gets a dedicated Kubernetes; there is no sharing of the control plane. This model is out of scope for this post. Soft multi-tenancy is where pods might share the same underlying compute resource (node) and are logically separated using Kubernetes constructs through namespaces, resource quotas, or network policies. A second way to achieve multi-tenancy in Kubernetes is to assign pods to specific nodes that are pre-provisioned and allocated to a specific team. In this case, we talk about sole multi-tenancy. Unless your security posture requires you to use hard or sole multi-tenancy, you would want to consider using soft multi-tenancy for the following reasons:
    • Soft multi-tenancy avoids underutilization of resources and waste of compute resources.
    • There is a limited number of managed node groups that can be used by Amazon EKS, so for large deployments, this limit can quickly become a limiting factor.
    • In sole multi-tenancy there is high chance of ghost nodes with no pods scheduled on them due to misconfiguration as we force pods into dedicated nodes with label, taints and tolerance and anti-affinity rules.
  • Namespace – Namespaces are core in Kubernetes and a pillar to implement soft multi-tenancy. With namespaces, you can divide the cluster into logical partitions. These partitions are then referenced in quotas, network policies, service accounts, and other constructs that help isolate environments in Kubernetes.
  • Virtual cluster – An EMR virtual cluster is mapped to a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace on an EKS cluster. Virtual clusters don’t create any active resources that contribute to your bill or require lifecycle management outside the service.
  • Pod template – In EMR on EKS, you can provide a pod template to control pod placement, or define a sidecar container. This pod template can be defined for executor pods and driver pods, and stored in an Amazon Simple Storage Service (Amazon S3) bucket. The S3 locations are then submitted as part of the applicationConfiguration object that is part of configurationOverrides, as defined in the EMR on EKS job submission API.

Security considerations

In this section, we address security from different angles. We first discuss how to protect IAM role that is used for running the job. Then address how to protect secrets use in jobs and finally we discuss how you can protect data while it is processed by Spark.

IAM role protection

A job submitted to EMR on EKS needs an AWS Identity and Access Management (IAM) execution role to interact with AWS resources, for example with Amazon S3 to get data, with Amazon CloudWatch Logs to publish logs, or use an encryption key in AWS Key Management Service (AWS KMS). It’s a best practice in AWS to apply least privilege for IAM roles. In Amazon EKS, this is achieved through IRSA (IAM Role for Service Accounts). This mechanism allows a pod to assume an IAM role at the pod level and not at the node level, while using short-term credentials that are provided through the EKS OIDC.

IRSA creates a trust relationship between the EKS OIDC provider and the IAM role. This method allows only pods with a service account (annotated with an IAM role ARN) to assume a role that has a trust policy with the EKS OIDC provider. However, this isn’t enough, because it would allow any pod with a service account within the EKS cluster that is annotated with a role ARN to assume the execution role. This must be further scoped down using conditions on the role trust policy. This condition allows the assume role to happen only if the calling service account is the one used for running a job associated with the virtual cluster. The following code shows the structure of the condition to add to the trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": <OIDC provider ARN >
            },
            "Action": "sts:AssumeRoleWithWebIdentity"
            "Condition": { "StringLike": { “<OIDC_PROVIDER>:sub": "system:serviceaccount:<NAMESPACE>:emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>”} }
        }
    ]
}

To scope down the trust policy using the service account condition, you need to run the following the command with AWS CLI:

aws emr-containers update-role-trust-policy \
–cluster-name cluster \
–namespace namespace \
–role-name iam_role_name_for_job_execution

The command will the add the service account that will be used by the spark client, Jupyter Enterprise Gateway, Spark kernel, driver or executor. The service accounts name have the following structure emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>.

In addition to the role segregation offered by IRSA, we recommend blocking access to instance metadata because a pod can still inherit the rights of the instance profile assigned to the worker node. For more information about how you can block access to metadata, refer to Restrict access to the instance profile assigned to the worker node.

Secret protection

Sometime a Spark job needs to consume data stored in a database or from APIs. Most of the time, these are protected with a password or access key. The most common way to pass these secrets is through environment variables. However, in a multi-tenant environment, this means any user with access to the Kubernetes API can potentially access the secrets in the environment variables if this access isn’t scoped well to the namespaces the user has access to.

To overcome this challenge, we recommend using a Secrets store like AWS Secrets Manager that can be mounted through the Secret Store CSI Driver. The benefit of using Secrets Manager is the ability to use IRSA and allow only the role assumed by the pod access to the given secret, thereby improving your security posture. You can refer to the best practices guide for sample code showing the use of Secrets Manager with EMR on EKS.

Spark data encryption

When a Spark application is running, the driver and executors produce intermediate data. This data is written to the node local storage. Anyone who is able to exec into the pods would be able to read this data. Spark supports encryption of this data, and it can be enabled by passing --conf spark.io.encryption.enabled=true. Because this configuration adds performance penalty, we recommend enabling data encryption only for workloads that store and access highly sensitive data and in untrusted environments.

Network considerations

In this section we discuss how to manage networking within the cluster as well as outside the cluster. We first address how Spark handle cross executors and driver communication and how to secure it. Then we discuss how to restrict network traffic between pods in the EKS cluster and allow only traffic destined to EMR on EKS. Last, we discuss how to restrict traffic of executors and driver pods to external AWS service traffic using security groups.

Network encryption

The communication between the driver and executor uses RPC protocol and is not encrypted. Starting with Spark 3 in the Kubernetes backed cluster, Spark offers a mechanism to encrypt communication using AES encryption.

The driver generates a key and shares it with executors through the environment variable. Because the key is shared through the environment variable, potentially any user with access to the Kubernetes API (kubectl) can read the key. We recommend securing access so that only authorized users can have access to the EMR virtual cluster. In addition, you should set up Kubernetes role-based access control in such a way that the pod spec in the namespace where the EMR virtual cluster runs is granted to only a few selected service accounts. This method of passing secrets through the environment variable would change in the future with a proposal to use Kubernetes secrets.

To enable encryption, RPC authentication must also be enabled in your Spark configuration. To enable encryption in-transit in Spark, you should use the following parameters in your Spark config:

--conf spark.authenticate=true

--conf spark.network.crypto.enabled=true

Note that these are the minimal parameters to set; refer to Encryption from the complete list of parameters.

Additionally, applying encryption in Spark has a negative impact on processing speed. You should only apply it when there is a compliance or regulation need.

Securing Network traffic within the cluster

In Kubernetes, by default pods can communicate over the network across different namespaces in the same cluster. This behavior is not always desirable in a multi-tenant environment. In some instances, for example in regulated industries, to be compliant you want to enforce strict control over the network and send and receive traffic only from the namespace that you’re interacting with. For EMR on EKS, it would be the namespace associated to the EMR virtual cluster. Kubernetes offers constructs that allow you to implement network policies and define fine-grained control over the pod-to-pod communication. These policies are implemented by the CNI plugin; in Amazon EKS, the default plugin would be the VPC CNI. A policy is defined as follows and is applied with kubectl:

Kind: NetworkPolicy
metadata:
  name: default-np-ns1
  namespace: <EMR-VC-NAMESPACE>
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          nsname: <EMR-VC-NAMESPACE>

Network traffic outside the cluster

In Amazon EKS, when you deploy pods on Amazon Elastic Compute Cloud (Amazon EC2) instances, all the pods use the security group associated with the node. This can be an issue if your pods (executor pods) are accessing a data source (namely a database) that allows traffic based on the source security group. Database servers often restrict network access only from where they are expecting it. In the case of a multi-tenant EKS cluster, this means pods from other teams that shouldn’t have access to the database servers, would be able to send traffic to it.

To overcome this challenge, you can use security groups for pods. This feature allows you to assign a specific security group to your pods, thereby controlling the network traffic to your database server or data source. You can also refer to the best practices guide for a reference implementation.

Cost management and chargeback

In a multi-tenant environment, cost management is a critical subject. You have multiple users from various business units, and you need to be able to precisely chargeback the cost of the compute resource they have used. At the beginning of the post, we introduced three models of multi-tenancy in Amazon EKS: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy is out of scope because the cost tracking is trivial; all the resources are dedicated to the team using the cluster, which is not the case for sole multi-tenancy and soft multi-tenancy. In the next sections, we discuss these two methods to track the cost for each of model.

Soft multi-tenancy

In a soft multi-tenant environment, you can perform chargeback to your data engineering teams based on the resources they consumed and not the nodes allocated. In this method, you use the namespaces associated with the EMR virtual cluster to track how much resources were used for processing jobs. The following diagram illustrates an example.

Diagram to illustrate soft multi-tenancy

Diagram -1 Soft multi-tenancy

Tracking resources based on the namespace isn’t an easy task because jobs are transient in nature and fluctuate in their duration. However, there are partner tools available that allow you to keep track of the resources used, such as Kubecost, CloudZero, Vantage, and many others. For instructions on using Kubecost on Amazon EKS, refer to this blog post on cost monitoring for EKS customers.

Sole multi-tenancy

For sole multi-tenancy, the chargeback is done at the instance (node) level. Each member on your team uses a specific set of nodes that are dedicated to it. These nodes aren’t always running, and are spun up using the Kubernetes auto scaling mechanism. The following diagram illustrates an example.

Diagram to illustrate Sole tenancy

Diagram -2 Sole tenancy

With sole multi-tenancy, you use a cost allocation tag, which is an AWS mechanism that allows you to track how much each resource has consumed. Although the method of sole multi-tenancy isn’t efficient in terms of resource utilization, it provides a simplified strategy for chargebacks. With the cost allocation tag, you can chargeback a team based on all the resources they used, like Amazon S3, Amazon DynamoDB, and other AWS resources. The chargeback mechanism based on the cost allocation tag can be augmented using the recently launched AWS Billing Conductor, which allows you to issue bills internally for your team.

Resource management

In this section, we discuss considerations regarding resource management in multi-tenant clusters. We briefly discuss topics like sharing resources graciously, setting guard rails on resource consumption, techniques for ensuring resources for time sensitive and/or critical jobs, meeting quick resource scaling requirements and finally cost optimization practices with node selectors.

Sharing resources

In a multi-tenant environment, the goal is to share resources like compute and memory for better resource utilization. However, this requires careful capacity management and resource allocation to make sure each tenant gets their fair share. In Kubernetes, resource allocation is controlled and enforced by using ResourceQuota and LimitRange. ResourceQuota limits resources on the namespace level, and LimitRange allows you to make sure that all the containers are submitted with a resource requirement and a limit. In this section, we demonstrate how a data engineer or Kubernetes administrator can set up ResourceQuota as a LimitRange configuration.

The administrator creates one ResourceQuota per namespace that provides constraints for aggregate resource consumption:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: compute-resources
  namespace: teamA
spec:
  hard:
    requests.cpu: "1000"
    requests.memory: 4000Gi
    limits.cpu: "2000"
    limits.memory: 6000Gi

For LimitRange, the administrator can review the following sample configuration. We recommend using default and defaultRequest to enforce the limit and request field on containers. Lastly, from a data engineer perspective while submitting the EMR on EKS jobs, you need to make sure the Spark parameters of resource requirements are within the range of the defined LimitRange. For example, in the following configuration, the request for spark.executor.cores=7 will fail because the max limit for CPU is 6 per container:

apiVersion: v1
kind: LimitRange
metadata:
  name: cpu-min-max
  namespace: teamA
spec:
  limits:
  - max:
      cpu: "6"
    min:
      cpu: "100m"
    default:
      cpu: "500m"
    defaultRequest:
      cpu: "100m"
    type: Container

Priority-based resource allocation

Diagram Illustrates an example of resource allocation with priority

Diagram – 3 Illustrates an example of resource allocation with priority.

As all the EMR virtual clusters share the same EKS computing platform with limited resources, there will be scenarios in which you need to prioritize jobs in a sensitive timeline. In this case, high-priority jobs can utilize the resources and finish the job, whereas low-priority jobs that are running gets stopped and any new pods must wait in the queue. EMR on EKS can achieve this with the help of pod templates, where you specify a priority class for the given job.

When a pod priority is enabled, the Kubernetes scheduler orders pending pods by their priority and places them in the scheduling queue. As a result, the higher-priority pod may be scheduled sooner than pods with lower priority if its scheduling requirements are met. If this pod can’t be scheduled, the scheduler continues and tries to schedule other lower-priority pods.

The preemptionPolicy field on the PriorityClass defaults to PreemptLowerPriority, and the pods of that PriorityClass can preempt lower-priority pods. If preemptionPolicy is set to Never, pods of that PriorityClass are non-preempting. In other words, they can’t preempt any other pods. When lower-priority pods are preempted, the victim pods get a grace period to finish their work and exit. If the pod doesn’t exit within that grace period, that pod is stopped by the Kubernetes scheduler. Therefore, there is usually a time gap between the point when the scheduler preempts victim pods and the time that a higher-priority pod is scheduled. If you want to minimize this gap, you can set a deletion grace period of lower-priority pods to zero or a small number. You can do this by setting the terminationGracePeriodSeconds option in the victim Pod YAML.

See the following code samples for priority class:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 100
globalDefault: false
description: " High-priority Pods and for Driver Pods."

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: low-priority
value: 50
globalDefault: false
description: " Low-priority Pods."

One of the key considerations while templatizing the driver pods, especially for low-priority jobs, is to avoid the same low-priority class for both driver and executor. This will save the driver pods from getting evicted and lose the progress of all its executors in a resource congestion scenario. In this low-priority job example, we have used a high-priority class for driver pod templates and low-priority classes only for executor templates. This way, we can ensure the driver pods are safe during the eviction process of low-priority jobs. In this case, only executors will be evicted, and the driver can bring back the evicted executor pods as the resource becomes freed. See the following code:

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "high-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND
  containers:
  - name: spark-kubernetes-driver # This will be interpreted as Spark driver container

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "low-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
  containers:
  - name: spark-kubernetes-executors # This will be interpreted as Spark executor container

Overprovisioning with priority

Diagram Illustrates an example of overprovisioning with priority

Diagram – 4 Illustrates an example of overprovisioning with priority.

As pods wait in a pending state due to resource availability, additional capacity can be added to the cluster with Amazon EKS auto scaling. The time it takes to scale the cluster by adding new nodes for deployment has to be considered for time-sensitive jobs. Overprovisioning is an option to mitigate the auto scaling delay using temporary pods with negative priority. These pods occupy space in the cluster. When pods with high priority are unschedulable, the temporary pods are preempted to make the room. This causes the auto scaler to scale out new nodes due to overprovisioning. Be aware that this is a trade-off because it adds higher cost while minimizing scheduling latency. For more information about overprovisioning best practices, refer to Overprovisioning.

Node selectors

EKS clusters can span multiple Availability Zones in a VPC. A Spark application whose driver and executor pods are distributed across multiple Availability Zones can incur inter- Availability Zone data transfer costs. To minimize or eliminate the data transfer cost, you should configure the job to run on a specific Availability Zone or even specific node type with the help of node labels. Amazon EKS places a set of default labels to identify capacity type (On-Demand or Spot Instance), Availability Zone, instance type, and more. In addition, we can use custom labels to meet workload-specific node affinity.

EMR on EKS allows you to choose specific nodes in two ways:

  • At the job level. Refer to EKS Node Placement for more details.
  • In the driver and executor level using pod templates.

When using pod templates, we recommend using on demand instances for driver pods. You can also consider including spot instances for executor pods for workloads that are tolerant of occasional periods when the target capacity is not completely available. Leveraging spot instances allow you to save cost for jobs that are not critical and can be terminated. Please refer Define a NodeSelector in PodTemplates.

Conclusion

In this post, we provided guidance on how to design and deploy EMR on EKS in a multi-tenant EKS environment through different lenses: network, security, cost management, and resource management. For any deployment, we recommend the following:

  • Use IRSA with a condition scoped on the EMR on EKS service account
  • Use a secret manager to store credentials and the Secret Store CSI Driver to access them in your Spark application
  • Use ResourceQuota and LimitRange to specify the resources that each of your data engineering teams can use and avoid compute resource abuse and starvation
  • Implement a network policy to segregate network traffic between pods

Lastly, if you are considering migrating your spark workload to EMR on EKS you can further learn about design patterns to manage Apache Spark workload in EMR on EKS in this blog and about migrating your EMR transient cluster to EMR on EKS in this blog.


About the Authors

author - lotfiLotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

author - peter ajeebAjeeb Peter is a Senior Solutions Architect with Amazon Web Services based in Charlotte, North Carolina, where he guides global financial services customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings over 20 years of technology experience on Software Development, Architecture and Analytics from industries like finance and telecom.