Optimize Amazon EMR costs for legacy and Spark workloads with managed scaling and node labels

Post Syndicated from Ramesh Raghupathy original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-costs-for-legacy-and-spark-workloads-with-managed-scaling-and-node-labels/

Customers migrating from large on-premises Hadoop clusters to Amazon EMR want to reduce their operational costs while running resilient applications. On-premises customers typically use inelastic, large, fixed-size Hadoop clusters, which incur high capital expenditure. You can now migrate your mixed workloads to Amazon EMR with managed scaling, which saves costs without compromising performance.

This solution can benefit those running a mixed workload of legacy MapReduce applications concurrently with Spark applications. MapReduce applications such as Apache Sqoop jobs need to use Amazon Elastic Compute Cloud (Amazon EC2) On-Demand Instances for resilience, whereas Apache Spark job workers can use EC2 Spot Instances due to built-in resilience. Therefore, it’s critical that you can run your workloads on either On-Demand or Spot Instances as needed, while keeping the elasticity and resiliency you need to achieve cost savings.

This post walks through a mixed workload scenario to illustrate the use of Amazon EMR managed scaling, node labels, and capacity scheduler configuration to create an EMR cluster that is elastic and can still run resilient applications.

Solution overview

For this post, we use the following Apache Sqoop and Apache Spark workloads to demonstrate the scenario and the results:

  • Sqoop workload – A simple Sqoop job to extract data from Amazon Redshift and write data to Amazon Simple Storage Service (Amazon S3)
  • Spark workload – A Python script that unions Amazon S3 data and writes it back to Amazon S3

The following diagram illustrates the two workloads used for this demonstration.

To build the solution, you must complete the following high-level steps:

  1. Determine the EMR cluster configuration for managed scaling with minimum and maximum capacity, and core and task nodes.
  2. For the workloads to run, identify the capacity scheduler queues required, queue capacity as % of total capacity, and Spot or On-Demand Instances used to meet the queue capacity.
  3. Assign YARN node labels to the On-Demand and Spot Instances in the capacity scheduler configuration to ensure the appropriate instance types are allocated to the queues.
  4. Create bootstrap and step scripts to automate the configuration process during EMR cluster creation.
  5. Validate the cluster elasticity and application resilience by running the Sqoop and Spark applications.

The solution offers the following benefits:

  • Significantly reduces the time to migrate applications to Amazon EMR because you’re no longer struggling to implement cost-optimization techniques as well as application resilience while migrating from on-premises to the AWS Cloud
  • Offers cost savings when compared to running similar workloads on inelastic, large on-premises Hadoop clusters
  • Enables you to run a mixed workload on EMR clusters without significantly redesigning your on-premises applications

Prerequisites

You need to complete the following steps before you can configure your EMR cluster and run the workloads.

Launch an Amazon Redshift cluster

We first launch an Amazon Redshift cluster. For instructions, refer to Create a sample Amazon Redshift cluster. We use Amazon Redshift as the relational database management service for the Sqoop job.

Create and associate an IAM role for loading the Amazon Redshift cluster

We create an AWS Identity and Access Management (IAM) role that allows the Amazon Redshift cluster to call AWS services on its behalf.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Use case, choose Redshift and Redshift-Customizable.
  4. Choose Next.
  5. For Permissions policies, choose the policy AmazonS3ReadOnlyAccess.
  6. Choose Next.
  7. For Role name, enter load_tpch_redshift.
  8. Choose Create role.
    Now you attach this role to the Amazon Redshift cluster.
  9. On the Properties tab, choose Manage IAM roles.
  10. Associate the IAM role.

Load test data into the Amazon Redshift cluster

We create a table called SQOOP_LOAD_TBL and load it with mock data to test the Sqoop job. The following code shows the CREATE TABLE and COPY statements. The COPY statement should load around 1,000,000 rows into the SQOOP_LOAD_TBL table, which we use to run a large Sqoop data movement job.

CREATE TABLE EMRBLOG.SQOOP_LOAD_TBL
(
ID BIGINT NOT NULL,
NAME VARCHAR(25),
REGIONKEY BIGINT,
COMMENT   VARCHAR(150),
TS1 TIMESTAMP 
)
DISTSTYLE EVEN;

copy EMRBLOG.SQOOP_LOAD_TBL
from 's3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/sqoop_input/redshift_manifest'
IAM_ROLE 'arn:aws:iam::xxxxxxxxxx:role/load_tpch_redshift' format parquet 
manifest
;

Create an Amazon RDS for PostgreSQL instance

We create an Amazon Relational Database Service (Amazon RDS) for PostgreSQL instance to use as the metastore for Sqoop.

The following configuration uses a small instance. We use Sqoop as the master user name and postgres as the database name. Note the database name, user ID, and password; we use these to connect Sqoop running on the EMR cluster to this metastore.

Use the Amazon EMR automation scripts while creating the cluster

We use three automation scripts from the S3 folder s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/ while creating the EMR cluster.

The first script is a node label script used by YARN to determine if each instance is Spot or On-Demand:

getNodeLabels.py 

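#!/usr/bin/python3
import json

# Instance metadata file available on each Amazon EMR node
k = '/mnt/var/lib/info/extraInstanceData.json'
with open(k) as f:
    response = json.load(f)
    # Label only core and task nodes; nothing is printed for the primary node
    if response['instanceRole'] in ['core', 'task']:
        print(f"NODE_PARTITION:{response['marketType'].upper()}")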

This script runs during cluster creation to assign the node label SPOT or ON_DEMAND to each core and task node, based on the instance's market type (the marketType field).
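To make the script's behavior concrete, the following sketch applies the same logic to in-memory dictionaries instead of the file on the node. The function name node_partition_line is hypothetical, and the sample field values ("spot", "on_demand", "master") are assumptions about what extraInstanceData.json contains.

```python
from typing import Optional


def node_partition_line(instance_data: dict) -> Optional[str]:
    """Return the line the label script would print, or None for other roles."""
    # Only core and task nodes receive a label, mirroring getNodeLabels.py.
    if instance_data.get("instanceRole") in ("core", "task"):
        return f"NODE_PARTITION:{instance_data['marketType'].upper()}"
    return None


# Sample inputs (assumed field values, for illustration only).
print(node_partition_line({"instanceRole": "task", "marketType": "spot"}))
print(node_partition_line({"instanceRole": "core", "marketType": "on_demand"}))
print(node_partition_line({"instanceRole": "master", "marketType": "on_demand"}))
```

Under these assumptions, a Spot task node reports the SPOT partition, an On-Demand core node reports ON_DEMAND, and the primary node reports nothing.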

The next is a bootstrap script to copy the node label script to the /home/hadoop directory on all cluster nodes:

getNodeLabels_bootstrap.sh

#!/bin/bash
set -vx
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/getNodeLabels.py /home/hadoop
chmod +x /home/hadoop/getNodeLabels.py

This script is used during the bootstrap process to copy getNodeLabels.py from the S3 folder to /home/hadoop on the EMR cluster.

The last is a step script that registers the SPOT and ON_DEMAND node labels with YARN and marks SPOT as exclusive, so that the Spot nodes are used only by the assigned capacity queue:

addNodeLabels.sh

#!/bin/bash
sudo -u yarn yarn rmadmin -addToClusterNodeLabels "SPOT(exclusive=true),ON_DEMAND(exclusive=false)"

There are two kinds of node partitions:

  • Exclusive – Containers are allocated only to nodes whose partition exactly matches the request. For example, a request for partition=“x” is allocated to a node with partition=“x”, and a request for the DEFAULT partition is allocated to DEFAULT partition nodes.
  • Non-exclusive – A non-exclusive partition shares its idle resources with containers that request the DEFAULT partition.

We use exclusive labels for SPOT to ensure only Spark workloads can use them and non-exclusive labels for ON_DEMAND so that they can be used both by Spark and Sqoop workloads. For more details on the types of labels, refer to YARN Node Labels.
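The difference between the two partition types can be sketched as a small eligibility check. This is a toy model of the rules described above, not YARN's scheduler code; the function can_allocate and its parameters are hypothetical, and queue capacities are ignored.

```python
DEFAULT = ""  # stands in for YARN's DEFAULT (unlabeled) partition


def can_allocate(requested: str, node_label: str, exclusive: bool,
                 node_has_idle_resources: bool = True) -> bool:
    """Toy rule: may a container requesting `requested` land on this node?"""
    if requested == node_label:
        return True  # an exact partition match always qualifies
    # A non-exclusive partition lends idle resources to DEFAULT requests.
    if requested == DEFAULT and not exclusive and node_has_idle_resources:
        return True
    return False


# Exclusivity as registered by addNodeLabels.sh.
EXCLUSIVE = {"SPOT": True, "ON_DEMAND": False}

print(can_allocate("SPOT", "SPOT", EXCLUSIVE["SPOT"]))              # True
print(can_allocate(DEFAULT, "SPOT", EXCLUSIVE["SPOT"]))             # False
print(can_allocate(DEFAULT, "ON_DEMAND", EXCLUSIVE["ON_DEMAND"]))   # True
print(can_allocate("SPOT", "ON_DEMAND", EXCLUSIVE["ON_DEMAND"]))    # False
```

In this model, only requests that explicitly ask for SPOT reach Spot nodes, while idle On-Demand capacity remains available to unlabeled requests.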

We’re now ready to run our solution.

Launch an EMR cluster

Complete the following steps to launch an EMR cluster:

  1. Determine the managed scaling EMR cluster configuration. We choose instance fleets, which let us select up to 30 instance types, and set the minimum and maximum capacity for core and task nodes while enabling scaling on the task nodes.
    We suggest the following EMR cluster configuration for instance fleets and EMR managed scaling with core and task nodes for this demonstration. The right number and types of nodes need to be chosen based on the workload needs of your use case.

    1. Minimum – 4
    2. Maximum – 64
    3. On-demand limit – 4
    4. Maximum core nodes – 4
  2. On the Amazon EMR console, choose Create cluster.
  3. In the Advanced Options section, for Software Configuration, select Hadoop, Sqoop, Oozie, and Spark.
  4. In the Edit software settings section, choose Enter configuration.
  5. Enter the following code, which includes yarn-site, capacity-scheduler, and sqoop-site properties, and adds a property to spark-defaults. In the sqoop-site section, update the metastore URL, user ID, and password.
    [
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.resourcemanager.scheduler.class": "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler",
          "yarn.node-labels.enabled": "true",
          "yarn.node-labels.am.default-node-label-expression": "ON_DEMAND",
          "yarn.nodemanager.node-labels.provider": "script",
          "yarn.nodemanager.node-labels.provider.script.path": "/home/hadoop/getNodeLabels.py"
        },
        "configurations": []
      },
      {
        "classification": "capacity-scheduler",
        "properties": {
          "yarn.scheduler.capacity.root.queues": "default,Sqoop",
          "yarn.scheduler.capacity.root.Sqoop.capacity": "40",
          "yarn.scheduler.capacity.root.default.capacity": "60",
          "yarn.scheduler.capacity.root.default.accessible-node-labels": "*",
          "yarn.scheduler.capacity.root.Sqoop.accessible-node-labels": "ON_DEMAND",
          "yarn.scheduler.capacity.root.default.accessible-node-labels.ON_DEMAND.capacity": "45",
          "yarn.scheduler.capacity.root.Sqoop.accessible-node-labels.ON_DEMAND.capacity": "55",
          "yarn.scheduler.capacity.root.default.accessible-node-labels.SPOT.capacity": "100"
        },
        "configurations": []
      },
      {
        "classification": "sqoop-site",
        "properties": {
          "sqoop.metastore.client.enable.autoconnect": "true",
          "sqoop.metastore.client.autoconnect.url": "jdbc:postgresql://<sqoop-metastore-alias>:5432/postgres",
          "sqoop.metastore.client.autoconnect.username": "xxxxxx",
          "sqoop.metastore.client.autoconnect.password": "xxxxxx",
          "sqoop.metastore.client.record.password": "true"
        },
        "configurations": []
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.yarn.executor.nodeLabelExpression": "SPOT"
        }
      }
    ]

    The configuration is created with two queues (Sqoop and default). The Sqoop queue has access only to the On-Demand nodes, and the default queue has access to both On-Demand and Spot nodes.

    In the spark-defaults section, the property "spark.yarn.executor.nodeLabelExpression": "SPOT" enables quick scaling of the Spot nodes and use of the Spot nodes by the Spark executors as soon as the Spark job starts. If this property isn’t used, Spot node scaling is triggered only after the On-Demand nodes are consumed. This causes a longer runtime for the job due to delayed scaling as well as the Spark executor’s inability to use the scaled-up Spot nodes.

  6. Add the addNodeLabels.sh script as a step, which is run using script-runner.jar.
  7. Under Cluster Composition, select Instance fleets, which provides options to choose the nodes from up to 30 instance types.
  8. Choose one primary, four core, and 0 task nodes.
  9. Under Cluster scaling, choose the Amazon EMR managed scaling policy option to define the core and task units (minimum 4, maximum 64, On-Demand limit 4, max core nodes 4).
  10. For Bootstrap Actions, add the getNodeLabels_bootstrap.sh script from Amazon S3 as a bootstrap action.
    This script copies getNodeLabels.py from the Amazon S3 location to the /home/hadoop directory on every cluster node.
  11. Use an existing EC2 key pair or create a new one if none exists, and download it to be used for logging onto the primary node.
  12. Choose existing security groups for the primary node and core and task nodes.
  13. Choose Create cluster and wait for cluster creation to complete.
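Before launching, it can help to sanity-check the capacity scheduler percentages. In the YARN capacity scheduler, the capacities of sibling queues should add up to 100 for the default partition and for each node label. The following sketch sums the values per partition for the queues directly under root. The helper capacity_totals is hypothetical, it handles a single queue level only, and it assumes the accessible-node-labels.<label>.capacity property form from the YARN documentation.

```python
from collections import defaultdict

PREFIX = "yarn.scheduler.capacity.root."
SUFFIX = ".capacity"
LABEL_MARKER = ".accessible-node-labels."


def capacity_totals(props):
    """Sum capacities of the queues directly under root, per partition.

    Returns (totals, unrecognized_keys).
    """
    totals = defaultdict(float)
    unrecognized = []
    for key, value in props.items():
        if not (key.startswith(PREFIX) and key.endswith(SUFFIX)):
            continue  # not a per-queue capacity setting
        body = key[len(PREFIX):-len(SUFFIX)]
        if LABEL_MARKER in body:
            _queue, label = body.split(LABEL_MARKER, 1)
            totals[label] += float(value)
        elif "." not in body:
            totals["<default>"] += float(value)
        else:
            unrecognized.append(key)  # for example, a misspelled property name
    return dict(totals), unrecognized


capacity_scheduler = {
    "yarn.scheduler.capacity.root.queues": "default,Sqoop",
    "yarn.scheduler.capacity.root.Sqoop.capacity": "40",
    "yarn.scheduler.capacity.root.default.capacity": "60",
    "yarn.scheduler.capacity.root.default.accessible-node-labels": "*",
    "yarn.scheduler.capacity.root.Sqoop.accessible-node-labels": "ON_DEMAND",
    "yarn.scheduler.capacity.root.default.accessible-node-labels.ON_DEMAND.capacity": "45",
    "yarn.scheduler.capacity.root.Sqoop.accessible-node-labels.ON_DEMAND.capacity": "55",
    "yarn.scheduler.capacity.root.default.accessible-node-labels.SPOT.capacity": "100",
}

totals, unrecognized = capacity_totals(capacity_scheduler)
for partition, total in sorted(totals.items()):
    print(f"{partition}: {total:g}% {'OK' if total == 100 else 'CHECK'}")
print("Unrecognized keys:", unrecognized)
```

With the values from this post, the default partition (40 + 60), the ON_DEMAND partition (45 + 55), and the SPOT partition (100) each total 100.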

Configure proxy settings to view websites hosted on the primary node

To configure your proxy settings, follow the instructions in Option 2, part 1: Set up an SSH tunnel to the master node using dynamic port forwarding and Option 2, part 2: Configure proxy settings to view websites hosted on the master node.

After the proxy settings are configured, run the following command on your terminal window (Mac):

ssh -i ec2-login-keypair.pem hadoop@ec2-xxxxxxxx.compute-1.amazonaws.com -ND 8157

Open the Resource Manager UI (found on the Application User Interfaces tab on the Amazon EMR console, similar to http://ec2-xxxxxxxx.compute-1.amazonaws.com:8088/) and choose the scheduler option to monitor the jobs and use of capacity scheduler queues.

Run the Sqoop job

To run the Sqoop job and monitor YARN, complete the following steps:

  1. Connect to the primary node and run the sqoop job --list command.
    This command creates the Sqoop metadata tables in the metastore. If you can’t connect Sqoop to Amazon RDS, make sure the Amazon RDS security group allows an inbound PostgreSQL TCP connection on port 5432 from both the EMR primary and secondary security groups. Follow the same procedure while connecting to Amazon Redshift to open the Amazon Redshift 5439 port for connections from both the EMR primary and secondary security groups.
  2. Create a test Sqoop job that reads the data from the Amazon Redshift table and writes to Amazon S3.
  3. Replace <last_value> with a timestamp older than the values in the TS1 column of the table EMRBLOG.SQOOP_LOAD_TBL, and replace <Target S3 folder> with your target S3 folder.
    sqoop job -Dmapred.job.queue.name=Sqoop \
    --create Sqoop_redshift_extract \
    -- import \
    --connect 'jdbc:postgresql://redshift-clusterxx.xxxxxxxx.us-east-1.redshift.amazonaws.com:5439/dev?user=awsuser&password=xxxxxxxx' \
    --fields-terminated-by '|' \
    --null-non-string '' \
    --null-string '' \
    --target-dir s3n://<Target S3 folder> \
    --query "Select * from EMRBLOG.SQOOP_LOAD_TBL where \$CONDITIONS" \
    -m 1 \
    --append \
    --check-column "ts1" \
    --incremental lastmodified \
    --last-value "<last_value>"

  4. List the job to verify if it was created correctly, and run the Sqoop job using the following command:
    # list the job to confirm creation
    sqoop job --list
    # run the job
    sqoop job --exec Sqoop_redshift_extract

  5. Monitor the job and the use of the application queues using the scheduler option on the Resource Manager UI.

You should notice that the Sqoop job uses only the Sqoop queue and the On-Demand nodes. There is no usage of Spot nodes.
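If you script the job creation instead of typing it, one way to avoid shell quoting and line-continuation pitfalls is to build the command as an argument list. The following is a minimal sketch, not part of the original solution: build_sqoop_job_args is a hypothetical helper, the connection string and folder are placeholders, and the flags mirror the Sqoop command shown earlier.

```python
import shlex


def build_sqoop_job_args(jdbc_url, target_dir, last_value,
                         job_name="Sqoop_redshift_extract", queue="Sqoop"):
    """Assemble the saved-job definition as a list of arguments."""
    return [
        "sqoop", "job", f"-Dmapred.job.queue.name={queue}",
        "--create", job_name,
        "--", "import",
        "--connect", jdbc_url,
        "--fields-terminated-by", "|",
        "--null-non-string", "",
        "--null-string", "",
        "--target-dir", target_dir,
        # No shell is involved, so $CONDITIONS needs no backslash escape.
        "--query", "Select * from EMRBLOG.SQOOP_LOAD_TBL where $CONDITIONS",
        "-m", "1",
        "--append",
        "--check-column", "ts1",
        "--incremental", "lastmodified",
        "--last-value", last_value,
    ]


args = build_sqoop_job_args(
    jdbc_url="jdbc:postgresql://<redshift-endpoint>:5439/dev?user=<user>&password=<password>",
    target_dir="s3n://<Target S3 folder>",
    last_value="<last_value>",
)
# Print a shell-safe rendering for review; on the primary node,
# subprocess.run(args, check=True) would submit the same arguments.
print(shlex.join(args))
```

Passing a list keeps each value as a single argument, so characters such as |, &, and $ in the values are not interpreted by a shell.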

Create the Spark job and monitor YARN

The Spark job (emr_union_job.py) reads a mock Parquet dataset from Amazon S3. It uses an argument count to union multiple copies of the dataset, and sorts the result data before writing it to the Amazon S3 output location. Because this job consumes a large amount of memory for performing the union and sort operations, it triggers the Spark executors to scale up Spot nodes. When the job is complete, the EMR cluster should scale down the Spot nodes.

The count value can be varied from 1 to 8 to achieve varying cluster scaling and runtimes for the job based on the volume of data being unioned.
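As written in the appendix, the script unions the DataFrame with itself on every pass of its loop, so the data volume doubles with each increment of the count rather than growing linearly. The following sketch reproduces that arithmetic without Spark; copies_after is a hypothetical helper name.

```python
def copies_after(count: int) -> int:
    """Mirror the loop in emr_union_job.py: each pass unions the data with itself."""
    copies = 1
    for _ in range(count):
        copies = copies + copies  # df_union.union(df_union) doubles the rows
    return copies


for count in (1, 4, 6, 8):
    print(count, copies_after(count))
```

A count of 6, as used in the sample command in this post, therefore writes 64 copies of the input, and a count of 8 writes 256.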

  1. Run the Spark job as a step on the EMR cluster using command-runner.jar, as shown in the following screenshot.
  2. Use the following sample command to submit the Spark job (emr_union_job.py).
    It takes in three arguments:

    • <input_full_path> – The Amazon S3 location of the data file that is read in by the Spark job. The path should not be changed. The input_full_path is s3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet.
    • <output_path> – The Amazon S3 folder where the results are written to.
    • <number of copies to be unioned> – By varying this argument, we can use the Spark job to trigger varying job runtimes and varying scaling of Spot nodes.
    spark-submit --deploy-mode cluster s3://aws-blogs-artifacts-public/artifacts/BDB-1737/scripts/emr_union_job.py s3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet s3://mydatarr/union_out/ 6

  3. Review the Resource Manager UI and choose the Scheduler tab.
    You should see that only the resources under the default queue are being used both in ON_DEMAND and SPOT partitions.
  4. On the Hardware tab of the EMR cluster, verify that the cluster has scaled up the Spot Instances of the task nodes. Because Spark job executors are configured to use Spot nodes exclusively, the Spot nodes should scale up while the Spark job is running.
  5. After the job is complete, verify if the Spot nodes have scaled down to 0, indicating that the workload has run successfully.
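The same check can be made programmatically. The following sketch summarizes the provisioned task capacity from a response shaped like the one returned by the Amazon EMR ListInstanceFleets API. The sample responses are hand-written with made-up values, and provisioned_capacity is a hypothetical helper.

```python
def provisioned_capacity(response, fleet_type="TASK"):
    """Return (on_demand_units, spot_units) provisioned for the given fleet type."""
    on_demand = spot = 0
    for fleet in response.get("InstanceFleets", []):
        if fleet.get("InstanceFleetType") == fleet_type:
            on_demand += fleet.get("ProvisionedOnDemandCapacity", 0)
            spot += fleet.get("ProvisionedSpotCapacity", 0)
    return on_demand, spot


# Hand-written samples (values are illustrative, not measured results).
while_running = {"InstanceFleets": [
    {"InstanceFleetType": "MASTER", "ProvisionedOnDemandCapacity": 1, "ProvisionedSpotCapacity": 0},
    {"InstanceFleetType": "CORE", "ProvisionedOnDemandCapacity": 4, "ProvisionedSpotCapacity": 0},
    {"InstanceFleetType": "TASK", "ProvisionedOnDemandCapacity": 0, "ProvisionedSpotCapacity": 24},
]}
after_job = {"InstanceFleets": [
    {"InstanceFleetType": "MASTER", "ProvisionedOnDemandCapacity": 1, "ProvisionedSpotCapacity": 0},
    {"InstanceFleetType": "CORE", "ProvisionedOnDemandCapacity": 4, "ProvisionedSpotCapacity": 0},
    {"InstanceFleetType": "TASK", "ProvisionedOnDemandCapacity": 0, "ProvisionedSpotCapacity": 0},
]}

print("Task fleet while running:", provisioned_capacity(while_running))
print("Task fleet after the job:", provisioned_capacity(after_job))

# With AWS credentials configured, a live response could be fetched with:
#   import boto3
#   response = boto3.client("emr").list_instance_fleets(ClusterId="<cluster-id>")
```

A Spot value of 0 for the task fleet after the job finishes corresponds to the scale-down described in the last step.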

Clean up

To help prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this walkthrough:

  • Amazon Redshift cluster
  • Amazon RDS database
  • EMR cluster

Conclusion

In this post, you learned how to configure an EMR cluster with managed scaling, assign node labels, and use a capacity scheduler to run mixed workload jobs on the EMR cluster. You created an Amazon Redshift cluster for sourcing data, Amazon RDS for Sqoop metadata, a Sqoop job to extract data from Amazon Redshift to Amazon S3, and a Spark job to test managed scaling and the usage of the capacity scheduler queues.

You also observed how Sqoop jobs run on On-Demand nodes for resilience, while Spark jobs use inexpensive Spot nodes, which scale up and down based on the workload.

We used a sample capacity scheduler queue configuration for this post; you should adjust it for your specific workload requirements. Furthermore, you can create additional scheduler queues to meet more complex requirements.

We also showed how you can automate the configuration during EMR cluster creation.

For more information about managed scaling and optimizing EC2 Spot usage, refer to Introducing Amazon EMR Managed Scaling – Automatically Resize Clusters to Lower Cost and Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR.

Appendix

The following code is the emr_union_job.py script:

#----------------------------------------------------------------------------------------
# Author: Ramesh Raghupathy
# Date: 06/15/2022
# Description: This pyspark script reads in 3 arguments (input file name, outputPath (S3),
# and multiply count). It reads the input file and unions the data with
# itself once per multiply count. By varying multiply count it is
# easy to generate different sizes of the workload required for testing
# managed scaling of an EMR cluster.
#----------------------------------------------------------------------------------------
from __future__ import print_function
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("yarn") \
        .appName("Generic Union") \
        .getOrCreate()

import sys
import time
from pyspark.sql import DataFrame
from functools import reduce

# Expect the script name plus three arguments: input path, output path, multiply count
if len(sys.argv) < 4:
    print("Insufficient args")
    sys.exit(1)

ip_full_path = sys.argv[1]
outputPath = sys.argv[2].strip()
multiply_count = int(sys.argv[3].strip())
delimiter = '|'

ip_file = ip_full_path


print ("----------- Args Start -------------")
print (multiply_count)
print (ip_full_path)
print (outputPath)
print ("----------- Args Done -------------")

ip_df  = spark.read.parquet(ip_file)

ip_df.show(20)

df_union = ip_df

# Each pass unions the DataFrame with itself, so the row count doubles per iteration
for i in range(multiply_count):
    df_union = df_union.union(df_union)
    print(df_union.count())

df_union.show(30)
df_union.sort("custkey", "orderkey","comment1").show(50)

df_union.write.format("csv").mode("overwrite").option("compression", "bzip2").option("delimiter", delimiter).option("ignoreTrailingWhiteSpace", False).option("ignoreLeadingWhiteSpace", False).option("nullValue", "").option("emptyValue","").option("multiline","True").save(outputPath)


About the Authors

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Kiran Guduguntla is a WW Go-to-Market Specialist for Amazon EMR at AWS. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data analytics solutions.