All posts by Francisco Oliveira

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 2

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

In Part 1 of this post series, you learned how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows.

This post guides you through deploying the AWS CloudFormation templates, configuring Genie, and running an example workflow authored in Apache Airflow.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Solution overview

This solution uses an AWS CloudFormation template to create the necessary resources.

Users access the Apache Airflow Web UI and the Genie Web UI via SSH tunnel to the bastion host.

The Apache Airflow deployment uses Amazon ElastiCache for Redis as a Celery backend, Amazon EFS as a mount point to store DAGs, and Amazon RDS PostgreSQL for database services.

Genie uses Apache Zookeeper for leader election, an Amazon S3 bucket to store configurations (binaries, application dependencies, cluster metadata), and Amazon RDS PostgreSQL for database services. Genie submits jobs to an Amazon EMR cluster.

The architecture in this post is for demo purposes. In a production environment, the Apache Airflow and the Genie instances should be part of an Auto Scaling Group. For more information, see Deployment on the Genie Reference Guide.

The following diagram illustrates the solution architecture.

Creating and storing admin passwords in AWS Systems Manager Parameter Store

This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With AWS Systems Manager Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS KMS to encrypt and decrypt the parameter values of secure string parameters.

Before deploying the AWS CloudFormation templates, execute the following AWS CLI commands. These commands create AWS Systems Manager Parameter Store parameters to store the passwords for the RDS master user, the Airflow DB administrator, and the Genie DB administrator.

aws ssm put-parameter --name "/GenieStack/RDS/Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/AirflowSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/GenieSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region
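The configuration scripts later read these values back from Parameter Store. The following is a minimal sketch of how that lookup could look with boto3; the helper names are illustrative and not part of the stack, but the `get_parameter` call with `WithDecryption=True` is the standard way to read a SecureString:

```python
def param_name(component: str) -> str:
    """Build the hierarchical parameter name used by this stack."""
    return "/GenieStack/RDS/" + component

def get_password(ssm_client, component: str) -> str:
    # WithDecryption=True asks SSM to decrypt the SecureString value via AWS KMS.
    resp = ssm_client.get_parameter(Name=param_name(component), WithDecryption=True)
    return resp["Parameter"]["Value"]

# Example usage (requires boto3 and AWS credentials):
# import boto3
# rds_master_password = get_password(boto3.client("ssm"), "Settings")
```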

Creating an Amazon S3 Bucket for the solution and uploading the solution artifacts to S3

This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the AWS CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution from this link.

Unzip the artifacts required by the solution and upload the airflow and genie directories to the Amazon S3 bucket you just created. Keep a record of the Amazon S3 root path because you add it as a parameter to the AWS CloudFormation template later.

As an example, the following screenshot uses the root location geniestackbucket.

Use the value of the Amazon S3 Bucket you created for the AWS CloudFormation parameters GenieS3BucketLocation and AirflowBucketLocation.

Deploying the AWS CloudFormation stack

To launch the entire solution, choose Launch Stack.

The following table explains the parameters that the template requires. You can accept the default values for any parameters not in the table. For the full list of parameters, see the AWS CloudFormation template.

Location of the configuration artifacts
  • GenieS3BucketLocation – The S3 bucket with Genie artifacts and Genie’s installation scripts. For example: geniestackbucket.
  • AirflowBucketLocation – The S3 bucket with the Airflow artifacts. For example: geniestackbucket.

Networking
  • SSHLocation – The IP address range to SSH to the Genie, Apache Zookeeper, and Apache Airflow EC2 instances.

Security
  • BastionKeyName – An existing EC2 key pair to enable SSH access to the bastion host instance.
  • AirflowKeyName – An existing EC2 key pair to enable SSH access to the Apache Airflow instance.
  • ZKKeyName – An existing EC2 key pair to enable SSH access to the Apache Zookeeper instance.
  • GenieKeyName – An existing EC2 key pair to enable SSH access to the Genie instance.
  • EMRKeyName – An existing EC2 key pair to enable SSH access to the Amazon EMR cluster.

Logging
  • emrLogUri – The S3 location to store Amazon EMR cluster logs. For example: s3://replace-with-your-bucket-name/emrlogs/

Post-deployment steps

To access the Apache Airflow and Genie Web Interfaces, set up an SSH tunnel and configure a SOCKS proxy for your browser. Complete the following steps:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the public DNS of the bastion host instance. The following screenshot shows the instance this post uses.
  4. Set up an SSH tunnel to the master node using dynamic port forwarding.
    Instead of using the master public DNS name of your cluster and the username hadoop, which the walkthrough references, use the public DNS of the bastion host instance and replace the user hadoop with the user ec2-user.
  5. Configure the proxy settings to view websites hosted on the master node.
    You do not need to modify any of the steps in the walkthrough.

This process configures a SOCKS proxy management tool that allows you to automatically filter URLs based on text patterns and limit the proxy settings to domains that match the form of the Amazon EC2 instance’s public DNS name.

Accessing the Web UI for Apache Airflow and Genie

To access the Web UI for Apache Airflow and Genie, complete the following steps:

  1. On the CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the URLs for the Apache Airflow and Genie Web UI. The following screenshot shows the URLs that this post uses.
  4. Open two tabs in your web browser. You will use the tabs for the Apache Airflow UI and the Genie UI.
  5. For the FoxyProxy proxy you configured previously, choose the FoxyProxy icon at the top right of your browser and choose Use proxies based on their predefined patterns and priorities. The following screenshot shows the proxy options.
  6. Enter the URL for the Apache Airflow Web UI and for the Genie Web UI on their respective tabs.

You are now ready to run a workflow in this solution.

Preparing application resources

The first step as a platform admin engineer is to prepare the binaries and configurations of the big data applications that the platform supports. In this post, the Amazon EMR clusters use release 5.26.0. Because Amazon EMR release 5.26.0 has Hadoop 2.8.5 and Spark 2.4.3 installed, those are the applications you want to support in the big data platform. If you decide to use a different EMR release, prepare binaries and configurations for those versions. The following sections guide you through the steps to prepare binaries should you wish to use a different EMR release version.

To prepare a Genie application resource, create a YAML file with fields that are sent to Genie in a request to create an application resource.

This file defines metadata information about the application, such as the application name, type, version, tags, the location on S3 of the setup script, and the location of the application binaries. For more information, see Create an Application in the Genie REST API Guide.

Tag structure for application resources

This post uses the following tags for application resources:

  • type – The application type, such as Spark, Hadoop, Hive, Sqoop, or Presto.
  • version – The version of the application, such as 2.8.5 for Hadoop.

The next section shows how the tags are defined in the YAML file for an application resource. You can add an arbitrary number of tags to associate with Genie resources. Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files.

Preparing the Hadoop 2.8.5 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: hadoop-2.8.5
name: hadoop
user: hadoop
status: ACTIVE
description: Hadoop 2.8.5 Application
setupFile: s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh
configs: []
version: 2.8.5
type: hadoop
tags:
  - type:hadoop
  - version:2.8.5
dependencies:
  - s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml.

NOTE: The following steps are for reference only; they apply if you complete this step manually rather than using the automation option provided.

The S3 objects referenced by the setupFile and dependencies labels are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download hadoop-2.8.5.tar.gz from https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/.
  2. Upload hadoop-2.8.5.tar.gz to s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/.
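For orientation, the YAML above maps one-to-one onto the JSON body that the registration script sends to Genie's Create Application endpoint (POST /api/v3/applications in the Genie v3 REST API). The following is a hypothetical sketch of building that body, not the actual script's code:

```python
import json

def application_payload(bucket: str) -> dict:
    """Assemble a Create Application request body matching the YAML above."""
    prefix = "s3://{}/genie/applications/hadoop-2.8.5".format(bucket)
    return {
        "id": "hadoop-2.8.5",
        "name": "hadoop",
        "user": "hadoop",
        "status": "ACTIVE",
        "description": "Hadoop 2.8.5 Application",
        "version": "2.8.5",
        "type": "hadoop",
        "tags": ["type:hadoop", "version:2.8.5"],
        "setupFile": prefix + "/setup.sh",
        "configs": [],
        "dependencies": [prefix + "/hadoop-2.8.5.tar.gz"],
    }

# Serialized body that would be POSTed to http://genie-host:8080/api/v3/applications
body = json.dumps(application_payload("Your_Bucket_Name"))
```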

Preparing the Spark 2.4.3 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3
name: spark
user: hadoop
status: ACTIVE
description: Spark 2.4.3 Application
setupFile: s3://Your_Bucket_Name/genie/applications/spark-2.4.3/setup.sh
configs: []
version: 2.4.3
type: spark
tags:
  - type:spark
  - version:2.4.3
  - version:2.4
dependencies:
  - s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml.

NOTE: The following steps are for reference only; they apply if you complete this step manually rather than using the automation option provided.

The objects in setupFile and dependencies are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download spark-2.4.3-bin-hadoop2.7.tgz from https://archive.apache.org/dist/spark/spark-2.4.3/.
  2. Upload spark-2.4.3-bin-hadoop2.7.tgz to s3://Your_Bucket_Name/genie/applications/spark-2.4.3/.

Because spark-2.4.3-bin-hadoop2.7.tgz uses Hadoop 2.7 and not Hadoop 2.8.5, you need to extract the EMRFS libraries for Hadoop 2.7 from an EMR cluster running Hadoop 2.7 (release 5.11.3). These libraries are already available in your S3 bucket. For reference, the steps to extract them are as follows:

  1. Deploy an EMR cluster with release 5.11.3.
  2. SSH into the master node and run the following command:
aws s3 cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.20.0.jar s3://Your_Bucket_Name/genie/applications/spark-2.4.3/hadoop-2.7/aws/emr/emrfs/lib/

Preparing a command resource

The next step as a platform admin engineer is to prepare the Genie commands that the platform supports.

In this post, the workflows use Apache Spark. This section shows the steps to prepare a command resource of type Apache Spark.

To prepare a Genie command resource, create a YAML file with fields that are sent to Genie in a request to create a command resource.

This file defines metadata information about the command, such as the command name, type, version, tags, the location on S3 of the setup script, and the parameters to use during command execution. For more information, see Create a Command in the Genie REST API Guide.

Tag structure for command resources

This post uses the following tag structure for command resources:

  • type – The command type, for example, spark-submit.
  • version – The version of the command, for example, 2.4.3 for Spark.

The next section shows how the tags are defined in the YAML file for a command resource. Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files.

Preparing the spark-submit command resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3_spark-submit
name: Spark Submit
user: hadoop
description: Spark Submit Command
status: ACTIVE
setupFile: s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/setup.sh
configs: []
executable: ${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client
version: 2.4.3
tags:
  - type:spark-submit
  - version:2.4.3
checkDelay: 5000

The file is also available at s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml.

The object referenced by setupFile is available in your S3 bucket.

Preparing cluster resources

This post also automates the step to prepare cluster resources; it follows a process similar to the one described previously, but applied to cluster resources.

During the startup of the Amazon EMR cluster, a custom script creates a YAML file with the metadata details about the cluster and uploads the file to S3. For more information, see Create a Cluster in the Genie REST API Guide.

The script also extracts all Amazon EMR libraries and uploads them to S3. The next section discusses the process of registering clusters with Genie.

The script is available at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

Tag structure for cluster resources

This post uses the following tag structure for cluster resources:

  • cluster.release – The Amazon EMR release name. For example, emr-5.26.0.
  • cluster.id – The Amazon EMR cluster ID. For example, j-xxxxxxxx.
  • cluster.name – The Amazon EMR cluster name.
  • cluster.role – The role associated with this cluster. For this post, the role is batch. Other possible roles would be ad hoc or Presto, for example.
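Putting the tag structure above together, the registration script could assemble cluster metadata along these lines. This is a hypothetical sketch, not the actual genie_register_cluster.sh code, and the field names are illustrative:

```python
def cluster_resource(cluster_id, cluster_name, release, role):
    """Build illustrative cluster metadata using the tag structure above."""
    return {
        "id": cluster_id,
        "name": cluster_name,
        "status": "UP",
        "tags": [
            "cluster.release:" + release,
            "cluster.id:" + cluster_id,
            "cluster.name:" + cluster_name,
            "cluster.role:" + role,
        ],
    }

# A batch-role cluster as used later in this post (cluster ID is a placeholder).
batch_cluster = cluster_resource("j-xxxxxxxx", "demo-cluster", "emr-5.26.0", "batch")
```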

You can add new tags for a cluster resource or change the values of existing tags by editing s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

You could also use other combinations of tags, such as a tag to identify the application lifecycle environment or required custom jars.

Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files. If multiple clusters share the same tag, Genie by default distributes jobs randomly across the clusters associated with that tag. For more information, see Cluster Load Balancing in the Genie Reference Guide.
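The routing behavior can be pictured as follows: a cluster is a candidate for a job when it carries every tag the job requested, and with several candidates the default is effectively a random pick. This is an illustrative sketch only, not Genie's actual implementation:

```python
import random

def route_job(requested_tags, clusters):
    """Pick a cluster whose tags contain every requested tag (random among ties)."""
    wanted = set(requested_tags)
    candidates = [c for c in clusters if wanted <= set(c["tags"])]
    if not candidates:
        raise RuntimeError("no cluster matches tags: " + ", ".join(sorted(wanted)))
    return random.choice(candidates)

clusters = [
    {"id": "j-AAA", "tags": {"cluster.role:batch", "cluster.release:emr-5.26.0"}},
    {"id": "j-BBB", "tags": {"cluster.role:adhoc", "cluster.release:emr-5.26.0"}},
]

# Only j-AAA carries the batch role, so it is always chosen here.
chosen = route_job(["cluster.role:batch"], clusters)
```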

Registering resources with Genie

Up to this point, all the configuration activities mentioned in the previous sections have been prepared for you.

The following sections show how to register resources with Genie. For these steps, you connect to the bastion host via SSH to run configuration commands.

Registering application resources

To register the application resources you prepared in the previous section, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_application_resources.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the resource information, navigate to the Genie Web UI and choose the Applications tab. The following screenshot shows two application resources: one for Apache Spark (version 2.4.3) and one for Apache Hadoop (version 2.8.5).

Registering commands and associating them with applications

The next step is to register the Genie command resources with specific applications. For this post, because spark-submit depends on Apache Hadoop and Apache Spark, associate the spark-submit command with both applications.

The order you define for the applications in file genie_register_command_resources_and_associate_applications.py is important. Because Apache Spark depends on Apache Hadoop, the file first references Apache Hadoop and then Apache Spark. See the following code:

commands = [{'command_name' : 'spark-2.4.3_spark-submit', 'applications' : ['hadoop-2.8.5', 'spark-2.4.3']}]

To register the command resources and associate them with the application resources registered in the previous step, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_command_resources_and_associate_applications.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the command you registered plus the applications it is linked to, navigate to the Genie Web UI and choose the Commands tab.

The following screenshot shows the command details and the applications it is linked to.

Registering Amazon EMR clusters

As previously mentioned, the Amazon EMR cluster deployed in this solution registers the cluster when the cluster starts via an Amazon EMR step. You can access the script that Amazon EMR clusters use at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh. The script also automates deregistering the cluster from Genie when the cluster terminates.

In the Genie Web UI, choose the Clusters tab. This page shows you the current cluster resources. You can also find the location of the configuration files that were uploaded to the cluster’s S3 location during the registration step.

The following screenshot shows the cluster details and the location of configuration files (yarn-site.xml, core-site.xml, mapred-site.xml).

Linking commands to clusters

You have now registered all applications, commands, and clusters, and associated commands with the applications on which they depend. The final step is to link a command to a specific Amazon EMR cluster that is configured to run it.

Complete the following steps:

  1. SSH into the bastion host.
  2. Open /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py with your preferred text editor.
  3. Look for the following lines in the code:
    # Change cluster_name below
    clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' :
    ['spark-2.4.3_spark-submit']}]
  4. Replace j-xxxxxxxx in the file with your cluster name.
    To see the name of the cluster, navigate to the Genie Web UI and choose Clusters.
  5. To link the command to a specific Amazon EMR cluster, run the following command:
    python /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

The command is now linked to a cluster.

In the Genie Web UI, choose the Commands tab. This page shows you the current command resources. Select spark-2.4.3_spark-submit and see the clusters associated with the command.

The following screenshot shows the command details and the clusters it is linked to.

You have configured Genie with all resources; it can now receive job requests.

Running an Apache Airflow workflow

A detailed description of the workflow code and dataset is out of scope for this post. This section provides details of how Apache Airflow submits jobs to Genie via the GenieOperator that this post provides.

The GenieOperator for Apache Airflow

The GenieOperator allows the data engineer to define the combination of tags to identify the commands and the clusters in which the tasks should run.

In the following code example, the cluster tag is 'emr.cluster.role:batch' and the command tags are 'type:spark-submit' and 'version:2.4.3'.

spark_transform_to_parquet_movies = GenieOperator(
    task_id='transform_to_parquet_movies',
    cluster_tags=['emr.cluster.role:batch'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/{} s3://{}/demo/output/parquet/{}/".format(bucket, 'movies.csv', bucket, 'movies'),
    dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    description='Demo Spark Submit Job',
    job_name="Genie Demo Spark Submit Job",
    tags=['transform_to_parquet_movies'],
    xcom_vars=dict(),
    retries=3,
    dag=extraction_dag)

The property command_arguments defines the arguments to the spark-submit command, and dependencies defines the location of the code for the Apache Spark Application (PySpark).

You can find the code for the GenieOperator in the following location: s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py.
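Conceptually, an operator like this translates the task's tags and arguments into a Genie job request (POST /api/v3/jobs). The following is a simplified, illustrative sketch of that mapping, with field names following the Genie v3 job-request model; it is not the plugin's actual code, and the submitting user is an assumption:

```python
def genie_job_request(job_name, cluster_tags, command_tags,
                      command_arguments, dependencies):
    """Build an illustrative Genie v3 job request from operator parameters."""
    return {
        "name": job_name,
        "user": "hadoop",  # assumed submitting user for this sketch
        "version": "1.0",
        # Genie matches these tag sets against registered clusters and commands.
        "clusterCriterias": [{"tags": cluster_tags}],
        "commandCriteria": command_tags,
        "commandArgs": command_arguments,
        "dependencies": dependencies,
    }

req = genie_job_request(
    "Genie Demo Spark Submit Job",
    ["emr.cluster.role:batch"],
    ["type:spark-submit", "version:2.4.3"],
    "transform_to_parquet.py s3://bucket/in s3://bucket/out",
    ["s3://bucket/airflow/dag_artifacts/transforms/transform_to_parquet.py"],
)
```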

One of the arguments to the DAG is the Genie connection ID (genie_conn_id). This connection was created during the automated setup of the Apache Airflow Instance. To see this and other existing connections, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Connections.

The following screenshot shows the connection details.

The Airflow variable s3_location_genie_demo referenced in the DAG was set during the installation process. To see all configured Apache Airflow variables, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Variables.

The following screenshot shows the Variables page.

Triggering the workflow

You can now trigger the execution of the movie_lens_transfomer_to_parquet DAG. Complete the following steps:

  1. In the Apache Airflow Web UI, choose the DAGs tab.
  2. Next to your DAG, change Off to On.

The following screenshot shows the DAGs page.

For this example DAG, this post uses a small subset of the MovieLens dataset. This dataset is a popular open-source dataset that you can use to explore data science algorithms. Each dataset file is a comma-separated values (CSV) file with a single header row. All files are available in your solution S3 bucket under s3://Your_Bucket_Name/airflow/demo/input/csv.

movie_lens_transfomer_to_parquet is a simple workflow that triggers a Spark job that converts input files from CSV to Parquet.

The following screenshot shows a graphical representation of the DAG.

In this example DAG, after transform_to_parquet_movies concludes, you can potentially execute four tasks in parallel. Because the DAG concurrency is set to 3, as seen in the following code example, only three tasks can run at the same time.

# Initialize the DAG
# Concurrency --> Number of tasks allowed to run concurrently
extraction_dag = DAG(dag_name,
          default_args=dag_default_args,
          start_date=start_date,
          schedule_interval=schedule_interval,
          concurrency=3,
          max_active_runs=1
          )
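The effect of concurrency=3 can be illustrated with a toy admission check: when four tasks become runnable at once, the scheduler only admits as many as there are free slots. This is a simplification of Airflow's scheduler for illustration only:

```python
def admit(ready_tasks, concurrency, running):
    """Return the subset of ready tasks that fit in the free concurrency slots."""
    free_slots = max(concurrency - len(running), 0)
    return ready_tasks[:free_slots]

# Four tasks ready, concurrency of 3, nothing running yet:
started = admit(["t1", "t2", "t3", "t4"], concurrency=3, running=[])
# "t4" must wait until one of the three running tasks finishes.
```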

Visiting the Genie job UI

The GenieOperator for Apache Airflow submitted the jobs to Genie. To see job details, in the Genie Web UI, choose the Jobs tab. You can see details such as the jobs submitted, their arguments, the cluster each job ran on, and the job status.

The following screenshot shows the Jobs page.

You can now experiment with this architecture by provisioning a new Amazon EMR cluster, registering it with a new value (for example, production) for the Genie tag emr.cluster.role, linking the cluster to a command resource, and updating the tag combination in the GenieOperator used by some of the tasks in the DAG.

Cleaning up

To avoid incurring future charges, delete the resources and the S3 bucket created for this post.

Conclusion

This post showed how to deploy an AWS CloudFormation template that sets up a demo environment for Genie, Apache Airflow, and Amazon EMR. It also demonstrated how to configure Genie and use the GenieOperator for Apache Airflow.

About the Authors

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

Jelez Raditchkov is a practice manager with AWS.

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS enterprise and strategic customers. His interests extend to various technologies, such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

This post introduces an architecture that helps centralized platform teams maintain a big data platform to service thousands of concurrent ETL workflows, and simplifies the operational tasks required to accomplish that.

Architecture components

At a high level, the architecture uses two open source technologies with Amazon EMR to provide a big data platform for ETL workflow authoring, orchestration, and execution. Genie provides a centralized REST API for concurrent big data job submission, dynamic job routing, central configuration management, and abstraction of the Amazon EMR clusters. Apache Airflow provides a platform for job orchestration that allows you to programmatically author, schedule, and monitor complex data pipelines. Amazon EMR provides a managed cluster platform that can run and scale Apache Hadoop, Apache Spark, and other big data frameworks.

The following diagram illustrates the architecture.

Apache Airflow

Apache Airflow is an open source tool for authoring and orchestrating big data workflows.

With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as DAGs that group tasks that are executed independently. The DAG keeps track of the relationships and dependencies between tasks.

An operator defines a template for a single task in the workflow. Airflow provides operators for common tasks, and you can also define custom operators. This post discusses a custom operator (GenieOperator) to submit tasks to Genie.

A task is a parameterized instance of an operator. After an operator is instantiated, it’s referred to as a task. A task instance represents a specific run of a task. A task instance has an associated DAG, task, and point in time.

You can run DAGs and tasks on demand or schedule them to run at a specific time defined as a cron expression in the DAG.

For additional details on Apache Airflow, see Concepts in the Apache Airflow documentation.

Genie

Genie is an open source tool by Netflix that provides configuration-management capabilities and dynamic routing of jobs by abstracting access to the underlying Amazon EMR clusters.

Genie provides a REST API to submit jobs from big data applications such as Apache Hadoop MapReduce or Apache Spark. Genie manages the metadata of the underlying clusters and the commands and applications that run on them.

Genie abstracts access to the processing clusters by associating one or more tags with the clusters. You can also associate tags with the metadata details for the applications and commands that the big data platform supports. As Genie receives job submissions for specific tags, it uses a combination of the cluster/command tag to route each job to the correct EMR cluster dynamically.

Genie’s data model

Genie provides a data model to capture the metadata associated with resources in your big data environment.

An application resource is a reusable set of binaries, configuration files, and setup files to install and configure applications supported by the big data platform on the Genie node that submits the jobs to the clusters. When Genie receives a job, the Genie node downloads all dependencies, configuration files, and setup files associated with the applications and stores them in a job working directory. Applications are linked to commands because they represent the binaries and configurations needed before a command runs.

A command resource represents the parameters used when submitting work to a cluster from the command line, and identifies which applications need to be available on the PATH to run the command. Command resources glue metadata components together. For example, a command resource representing a Hive command would include a hive-site.xml and be associated with a set of application resources that provide the Hive and Hadoop binaries needed to run the command. Moreover, a command resource is linked to the clusters it can run on.

A cluster resource identifies the details of an execution cluster, including connection details, cluster status, tags, and additional properties. A cluster resource can register with Genie during startup and deregister during termination automatically. Clusters are linked to one or more commands that can run on them. After a command is linked to a cluster, Genie can start submitting jobs to the cluster.

Lastly, there are three job resource types: job request, job, and job execution. A job request resource represents the request submission with details to run a job. Based on the parameters submitted in the request, a job resource is created. The job resource captures details such as the command, cluster, and applications associated with the job. Additionally, information on status, start time, and end time is also available on the job resource. A job execution resource provides administrative details so you can understand where the job ran.

For more information, see Data Model on the Genie Reference Guide.

Amazon EMR and Amazon S3

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.

Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on S3, see Amazon S3 as the Data Lake Storage Platform.

Architecture deep dive

Two main actors interact with this architecture: platform admin engineers and data engineers.

Platform admin engineers have administrator access to all components. They can add or remove clusters, and configure the applications and the commands that the platform supports.

Data engineers focus on writing big data applications with their preferred frameworks (Apache Spark, Apache Hadoop MR, Apache Sqoop, Apache Hive, Apache Pig, and Presto) and authoring Python scripts to represent DAGs.

At a high level, the team of platform admin engineers prepares the supported big data applications and their dependencies and registers them with Genie. The team also launches Amazon EMR clusters that register with Genie during startup.

The team of platform admin engineers associates each Genie metadata resource (applications, commands, and clusters) with Genie tags. For example, you can associate a cluster resource with a tag named environment and the value can be “Production Environment”, “Test Environment”, or “Development Environment”.

Data engineers author workflows as Airflow DAGs and use a custom Airflow Operator—GenieOperator—to submit tasks to Genie. They can use a combination of tags to identify the type of tasks they are running plus where the tasks should run. For example, you might need to run Apache Spark 2.4.3 tasks in the environment identified by the “Production Environment” tag. To do this, set the cluster and command tags in the Airflow GenieOperator as the following code:

(cluster_tags=['emr.cluster.environment:production'],command_tags=['type:spark-submit','ver:2.4.3'])
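As a rough illustration of what those tags translate to on the wire, the following sketch builds a Genie 3.x job request body from cluster and command tags. The field names (clusterCriterias, commandCriteria) follow the Genie v3 REST data model, but the build_genie_job_request helper itself is hypothetical and not part of the GenieOperator:

```python
# Illustrative mapping from Airflow GenieOperator-style tags to a Genie 3.x
# job request body. build_genie_job_request is a sketch, not Genie client code.
def build_genie_job_request(name, user, cluster_tags, command_tags, command_args=""):
    """Build the JSON body for POST /api/v3/jobs from a set of Genie tags."""
    return {
        "name": name,
        "user": user,
        "version": "1.0",
        # Genie routes the job to a cluster whose tags match one of these criteria.
        "clusterCriterias": [{"tags": cluster_tags}],
        # The selected command must carry every tag listed here.
        "commandCriteria": command_tags,
        "commandArgs": command_args,
    }

request = build_genie_job_request(
    name="spark-wordcount",
    user="data-engineer",
    cluster_tags=["emr.cluster.environment:production"],
    command_tags=["type:spark-submit", "ver:2.4.3"],
)
```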

The following diagram illustrates this architecture.

The workflow, as it corresponds to the numbers in this diagram, is as follows:

  1. A platform admin engineer prepares the binaries and dependencies of the supported applications (Spark-2.4.5, Spark-2.1.0, Hive-2.3.5, etc.). The platform admin engineer also prepares commands (spark-submit, hive). The platform admin engineer registers applications and commands with Genie. Moreover, the platform admin engineer associates commands with applications and links commands to a set of clusters after step 2 (below) is concluded.
  2. Amazon EMR cluster(s) register with Genie during startup.
  3. A data engineer authors Airflow DAGs and uses the Genie tag to reference the environment, application, command or any combination of the above. In the workflow code, the data engineer uses the GenieOperator. The GenieOperator submits jobs to Genie.
  4. A schedule triggers workflow execution or a data engineer manually triggers the workflow execution. The jobs that compose the workflow are submitted to Genie for execution with a set of Genie tags that specify where the job should be run.
  5. The Genie node, working as the client gateway, sets up a working directory with all binaries and dependencies. Genie dynamically routes the jobs to the cluster(s) associated with the provided Genie tags. The Amazon EMR clusters run the jobs.

For details on the authorization and authentication mechanisms supported by Apache Airflow and Genie see Security in the Apache Airflow documentation and Security in the Genie documentation.  This architecture pattern does not expose SSH access to the Amazon EMR clusters. For details on providing different levels of access to data in Amazon S3 through EMR File System (EMRFS), see Configure IAM Roles for EMRFS Requests to Amazon S3.

Use cases enabled by this architecture

The following use cases demonstrate the capabilities this architecture provides.

Managing upgrades and deployments with no downtime and adopting the latest open source release

In a large organization, teams that use the data platform use heterogeneous frameworks and different versions. You can use this architecture to support upgrades with no downtime and offer the latest version of open source frameworks in a short amount of time.

Genie and Amazon EMR are the key components to enable this use case. As the Amazon EMR service team strives to add the latest versions of open source frameworks to Amazon EMR in short release cycles, you can keep up with your internal teams’ needs for the latest features of their preferred open source framework.

When a new version of the open source framework is available, you need to test it, add the new supported version and its dependencies to Genie, and move tags in the old cluster to the new one. The new cluster takes new job submissions, and the old cluster concludes jobs it is still running.

Moreover, because Genie centralizes the location of application binaries and their dependencies, upgrading binaries and dependencies in Genie also upgrades any upstream client automatically. Using Genie removes the need to upgrade all upstream clients.

Managing a centralized configuration, job and cluster status, and logging

In a universe of thousands of jobs and multiple clusters, you need to identify where a specific job is running and access logging details quickly. This architecture gives you visibility into jobs running on the data platform, logging of jobs, clusters, and their configurations.

Having programmatic access to the big data platform

This architecture enables a single point of job submissions by using Genie’s REST API. Access to the underlying cluster is abstracted through a set of APIs that enable administration tasks plus submitting jobs to the clusters. A REST API call submits jobs into Genie asynchronously. If accepted, a job-id is returned that you can use to get job status and outputs programmatically via API or web UI. A Genie node sets up the working directory and runs the job on a separate process.
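To illustrate this asynchronous pattern, the following sketch polls a job’s status until it reaches a terminal state. The status values follow Genie 3.x (INIT, RUNNING, SUCCEEDED, KILLED, FAILED, INVALID); is_terminal and poll_job are illustrative stand-ins for calls to GET /api/v3/jobs/{id}/status, not Genie client code:

```python
import time

# Terminal job states in the Genie 3.x data model (assumption for this sketch).
TERMINAL_STATUSES = {"SUCCEEDED", "KILLED", "FAILED", "INVALID"}

def is_terminal(status):
    return status in TERMINAL_STATUSES

def poll_job(fetch_status, interval_s=5.0, max_polls=120):
    """Poll fetch_status() until the job reaches a terminal state.

    In practice fetch_status would wrap GET /api/v3/jobs/{id}/status
    using the job-id returned by the submission call.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if is_terminal(status):
            return status
        time.sleep(interval_s)
    raise TimeoutError("job did not finish within the polling budget")

# Simulated sequence of responses a Genie node might return.
responses = iter(["INIT", "RUNNING", "RUNNING", "SUCCEEDED"])
final = poll_job(lambda: next(responses), interval_s=0.0)
```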

You can also integrate this architecture with continuous integration and continuous delivery (CI/CD) pipelines for big data application and Apache Airflow DAGs.

Enabling scalable client gateways and concurrent job submissions

The Genie node acts as a client gateway (edge node) and can scale horizontally to make sure the client gateway resources used to submit jobs to the data platform meet demand. Moreover, Genie allows the submission of concurrent jobs.

When to use this architecture

This architecture is recommended for organizations that use multiple large, multi-tenant processing clusters instead of transient clusters. It is out of the scope of this post to address when organizations should consider always-on clusters versus transient clusters (you can use an EMR Airflow Operator to spin up Amazon EMR clusters that register with Genie, run a job, and tear them down). You should use Reserved Instances with this architecture. For more information, see Using Reserved Instances.

This architecture is especially recommended for organizations that choose to have a central platform team to administer and maintain a big data platform that supports many internal teams that require thousands of jobs to run concurrently.

This architecture might not make sense for organizations that are not as large or don’t expect to grow to that scale. The benefits of cluster abstraction and centralized configuration management are ideal in bringing structured access to a potentially chaotic environment of thousands of concurrent workflows and hundreds of teams.

This architecture is also recommended for organizations that support a high percentage of multi-hour or overlapping workflows and heterogeneous frameworks (Apache Spark, Apache Hive, Apache Pig, Apache Hadoop MapReduce, Apache Sqoop, or Presto).

If your organization relies solely on Apache Spark and is aligned with the recommendations discussed previously, this architecture might still apply. For organizations that don’t have the scale to justify the need for centralized REST API for job submission, cluster abstraction, dynamic job routing, or centralized configuration management, Apache Livy plus Amazon EMR might be the appropriate option. Genie has its own scalable infrastructure that acts as the edge client. This means that Genie does not compete with Amazon EMR master instance resources, whereas Apache Livy does.

If the majority of your organization’s workflows are a few short-lived jobs, opting for a serverless processing layer, serverless ad hoc querying layer, or using dedicated transient Amazon EMR clusters per workflow might be more appropriate. If the majority of your organization’s workflows are composed of thousands of short-lived jobs, the architecture still applies because it removes the need to spin up and down clusters.

This architecture is recommended for organizations that require full control of the processing platform to optimize component performance. Moreover, this architecture is recommended for organizations that need to enforce centralized governance on their workflows via CI/CD pipelines.

It is out of the scope of this post to evaluate different orchestration options or the benefits of adopting Airflow as the orchestration layer. When considering adopting an architecture, also consider the existing skillset and time to adopt tooling. The open source nature of Genie may allow you to integrate other orchestration tools. Evaluating that route might be an option if you wish to adopt this architecture with another orchestration tool.

Conclusion

This post presented how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows. The post described the architecture components, the use cases the architecture supports, and when to use it. The second part of this post deploys a demo environment and walks you through the steps to configure Genie and use the GenieOperator for Apache Airflow.

 


About the Author

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

 

 

Jelez Raditchkov is a practice manager with AWS.

Migrate to Apache HBase on Amazon S3 on Amazon EMR: Guidelines and Best Practices

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/migrate-to-apache-hbase-on-amazon-s3-on-amazon-emr-guidelines-and-best-practices/

This blog post provides guidance and best practices about how to migrate from Apache HBase on HDFS to Apache HBase on Amazon S3 on Amazon EMR.

Apache HBase on Amazon S3 on Amazon EMR

Amazon EMR version 5.2.0 or later lets you run Apache HBase on Amazon S3. By using Amazon S3 as a data store for Apache HBase, you can separate your cluster’s storage and compute nodes. This saves costs because you’re sizing your cluster for your compute requirements. You’re not paying to store your entire dataset with 3x replication in the on-cluster HDFS.

Many customers have taken advantage of the benefits of running Apache HBase on Amazon S3 for data storage. These benefits include lower costs, data durability, and more efficient scalability. Customers, such as the Financial Industry Regulatory Authority (FINRA), have lowered their costs by 60% by moving to an Apache HBase on Amazon S3 architecture. They have also experienced operational benefits that come with decoupling storage from compute and using Amazon S3 as the storage layer.

Whitepaper on Migrating to Apache HBase on Amazon S3 on Amazon EMR

This whitepaper walks you through the stages of a migration. It also helps you determine when to choose Apache HBase on Amazon S3 on Amazon EMR, plan for platform security, tune Apache HBase and EMRFS to support your application SLA, identify options to migrate and restore your data, and manage your cluster in production.

For more information, see Migrating to Apache HBase on Amazon S3 on Amazon EMR.


Additional Reading

If you found this post useful, be sure to check out Setting up Read Replica Clusters with HBase on Amazon S3, and Tips for Migrating to Apache HBase on Amazon S3 from HDFS.

 


About the Author

Francisco Oliveira is a Senior Big Data Engineer with AWS Professional Services. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

Submitting User Applications with spark-submit

Post Syndicated from Francisco Oliveira original https://blogs.aws.amazon.com/bigdata/post/Tx578UTQUV7LRP/Submitting-User-Applications-with-spark-submit

Francisco Oliveira is a consultant with AWS Professional Services

Customers starting their big data journey often ask for guidelines on how to submit user applications to Spark running on Amazon EMR. For example, customers ask for guidelines on how to size memory and compute resources available to their applications and the best resource allocation model for their use case.

In this post, I show how to set spark-submit flags to control the memory and compute resources available to your application submitted to Spark running on EMR. I discuss when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.

Spark execution model

At a high level, each application has a driver program that distributes work in the form of tasks among executors running on several nodes of the cluster.

The driver is the application code that defines the transformations and actions applied to the data set. At its core, the driver instantiates an object of the SparkContext class. This object allows the driver to acquire a connection to the cluster, request resources, split the application actions into tasks, and schedule and launch tasks in the executors.

The executors not only perform tasks sent by the driver but also store data locally. As the executors are created and destroyed (see the “Enabling dynamic allocation of executors” section later), they register and deregister with the driver. The driver and the executors communicate directly.

To execute your application, the driver organizes the work to be accomplished in jobs. Each job is split into stages and each stage consists of a set of independent tasks that run in parallel. A task is the smallest unit of work in Spark and executes the same code, each on a different partition.

Spark programming model

An important abstraction in Spark is the resilient distributed dataset (RDD). This abstraction is key to performing in-memory computations. An RDD is a collection of read-only, immutable partitions of data distributed across the nodes of the cluster. Partitions in Spark allow the parallel execution of subsets of the data. Spark applications create RDDs and apply operations to them. Although Spark partitions RDDs automatically, you can also set the number of partitions.

RDDs support two types of operations: transformations and actions. Transformations are operations that generate a new RDD, and actions are operations that write data to external storage or return a value to the driver after running a computation on the dataset. Common transformations include operations that filter, sort, and group by key. Common actions include operations that collect the results of tasks and ship them to the driver, save an RDD, or count the number of elements in an RDD.

spark-submit

A common way to launch applications on your cluster is by using the spark-submit script. This script offers several flags that allow you to control the resources used by your application.

Setting the spark-submit flags is one of the ways to dynamically supply configurations to the SparkContext object that is instantiated in the driver. spark-submit can also read configuration values set in the conf/spark-defaults.conf file, which you can set using EMR configuration options when creating your cluster and, although not recommended, hardcode in the application. An alternative to changing conf/spark-defaults.conf is to use the --conf prop=value flag. I present both the spark-submit flag and the property name to use in the spark-defaults.conf file or with the --conf flag.
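As a rough sketch of how these sources combine, the precedence (per the Spark configuration documentation) is: properties set programmatically on SparkConf in the application, then spark-submit flags and --conf, then spark-defaults.conf. The effective_conf helper below illustrates that merge order; it is not Spark’s implementation:

```python
# Merge configuration sources in increasing order of precedence:
# spark-defaults.conf < spark-submit flags / --conf < SparkConf set in code.
def effective_conf(spark_defaults, submit_flags, app_conf):
    merged = dict(spark_defaults)   # lowest precedence
    merged.update(submit_flags)     # spark-submit flags override the defaults file
    merged.update(app_conf)         # SparkConf set in the driver wins
    return merged

conf = effective_conf(
    spark_defaults={"spark.executor.memory": "4g", "spark.executor.cores": "2"},
    submit_flags={"spark.executor.memory": "20g"},  # e.g. --executor-memory 20g
    app_conf={},
)
```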

Spark applications running on EMR

Any application submitted to Spark running on EMR runs on YARN, and each Spark executor runs as a YARN container. When running on YARN, the driver can run in one YARN container in the cluster (cluster mode) or locally within the spark-submit process (client mode).

When running in cluster mode, the driver runs on ApplicationMaster, the component that submits YARN container requests to the YARN ResourceManager according to the resources needed by the application. A simplified and high-level diagram of the application submission process is shown below.

When running in client mode, the driver runs outside ApplicationMaster, in the spark-submit script process from the machine used to submit the application.

Setting the location of the driver

With spark-submit, the flag --deploy-mode can be used to select the location of the driver.

Submitting applications in client mode is advantageous when you are debugging and wish to quickly see the output of your application. For applications in production, the best practice is to run the application in cluster mode. This mode offers you a guarantee that the driver is always available during application execution. However, if you do use client mode and you submit applications from outside your EMR cluster (such as locally, on a laptop), keep in mind that the driver is running outside your EMR cluster and there will be higher latency for driver-executor communication.

Setting the driver resources

The size of the driver depends on the calculations the driver performs and on the amount of data it collects from the executors. When running the driver in cluster mode, spark-submit provides you with the option to control the number of cores (--driver-cores) and the memory (--driver-memory) used by the driver. In client mode, the default value for the driver memory is 1024 MB and one core.

Setting the number of cores and the number of executors

The number of executor cores (--executor-cores or spark.executor.cores) selected defines the number of tasks that each executor can execute in parallel. The best practice is to leave one core for the OS and allocate about 4-5 cores per executor. The number of cores requested is constrained by the configuration property yarn.nodemanager.resource.cpu-vcores, which controls the number of cores available to all YARN containers running on one node and is set in the yarn-site.xml file.

The number of executors per node can be calculated using the following formula:

number of executors per node = (number of cores on node – 1 for the OS) / number of tasks per executor

The total number of executors (--num-executors or spark.executor.instances) for a Spark job is:

total number of executors = (number of executors per node * number of core instances) – 1, where one container is reserved for the driver when running in cluster mode.

Setting the memory of each executor

The memory space of each executor container is subdivided into two major areas: the Spark executor memory and the memory overhead.

Note that the maximum memory that can be allocated to an executor container depends on the yarn.nodemanager.resource.memory-mb property available in yarn-site.xml. The executor memory (--executor-memory or spark.executor.memory) defines the amount of memory each executor process can use. The memory overhead (spark.yarn.executor.memoryOverhead) is off-heap memory and is automatically added to the executor memory. Its default value is executorMemory * 0.10.

Executor memory unifies sections of the heap for storage and execution purposes. These two subareas can now borrow space from one another if usage is exceeded. The relevant properties are spark.memory.fraction and spark.memory.storageFraction. For more information, see the Unified Memory Management in Spark 1.6 whitepaper.

The memory of each executor can be calculated using the following formula:

 memory of each executor = max container size on node / number of executors per node
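Putting the three formulas together, the following sketch sizes executors for the cluster used in the next section (m3.2xlarge core nodes with 8 vCPUs and roughly 23 GB available to YARN containers). The helper functions are illustrative, not part of Spark or EMR:

```python
# Worked sizing example using the formulas above.
def executors_per_node(cores_per_node, cores_per_executor, reserve_for_os=1):
    # Leave one core for the OS, then fit executors of cores_per_executor each.
    return (cores_per_node - reserve_for_os) // cores_per_executor

def total_executors(per_node, core_node_count, reserve_for_driver=1):
    # One container is reserved for the driver/ApplicationMaster in cluster mode.
    return per_node * core_node_count - reserve_for_driver

def executor_memory_gb(max_container_gb, per_node, overhead_fraction=0.10):
    # Leave room for spark.yarn.executor.memoryOverhead (10% by default).
    raw = max_container_gb / per_node
    return int(raw / (1 + overhead_fraction))

per_node = executors_per_node(cores_per_node=8, cores_per_executor=5)  # 1
total = total_executors(per_node, core_node_count=6)                   # 5
mem = executor_memory_gb(max_container_gb=23, per_node=per_node)       # 20
```

These values match the flags used in the example that follows: --num-executors 5 and --executor-memory 20g.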

A quick example

To show how you can set the flags I have covered so far, I submit the wordcount example application and then use the Spark history server for a graphical view of the execution.

First, I submit a modified word count sample application as an EMR step to my existing cluster. The code can be seen below:

from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount <input> <output>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

The cluster has six m3.2xlarge instances plus one instance for the master, each with 8 vCPU and 30 GB of memory. The default value of yarn.nodemanager.resource.memory-mb for this instance type is 23 GB.

According to the formulas above, the spark-submit command would be as follows:

spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g --conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/

I submit the application as an EMR step with the following command:

aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE

Note that I am also setting the property spark.yarn.submit.waitAppCompletion with the step definitions. When this property is set to false, the client submits the application and exits, not waiting for the application to complete. This setting allows you to submit multiple applications to be executed simultaneously by the cluster and is only available in cluster mode.

I use the default values for --driver-memory and --driver-cores, as the sample application is writing directly to Amazon S3 and the driver is not receiving any data from the executors.

Enabling dynamic allocation of executors

Spark on YARN has the ability to dynamically scale up and down the number of executors. This feature can be valuable when you have multiple applications being processed simultaneously as idle executors are released and an application can request additional executors on demand.

To enable this feature, please see the steps in the EMR documentation.

Spark provides granular control to the dynamic allocation mechanism by providing the following properties:

Initial number of executors (spark.dynamicAllocation.initialExecutors)

Minimum number of executors to be used by the application (spark.dynamicAllocation.minExecutors)

Maximum executors that can be requested (spark.dynamicAllocation.maxExecutors)

When to remove an idle executor (spark.dynamicAllocation.executorIdleTimeout)

When to request new executors to process waiting tasks (spark.dynamicAllocation.schedulerBacklogTimeout and spark.dynamicAllocation.sustainedSchedulerBacklogTimeout)
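As an illustration, the following sketch collects dynamic-allocation properties like these into spark-submit --conf flags. The property names are Spark’s; the conf_flags helper and the specific values are assumptions for the example:

```python
# Turn a dict of Spark properties into "--conf key=value" flags for spark-submit.
def conf_flags(props):
    return " ".join(f"--conf {key}={value}" for key, value in sorted(props.items()))

dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.initialExecutors": "2",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.dynamicAllocation.executorIdleTimeout": "60s",
    "spark.shuffle.service.enabled": "true",  # required for dynamic allocation
}
flags = conf_flags(dynamic_allocation)
```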

Automatically configure executors with maximum resource allocation

EMR provides an option to automatically configure the properties above in order to maximize the resource usage of the entire cluster. This configuration option can be valuable when you have only a single application being processed by your cluster at a time. Its usage should be avoided when you expect to run multiple applications simultaneously.

To enable this configuration option, please see the steps in the EMR documentation.

By setting this configuration option during cluster creation, EMR automatically updates the spark-defaults.conf file with the properties that control the compute and memory resources of an executor, as follows:

spark.executor.memory = (yarn.scheduler.maximum-allocation-mb – 1g) – spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead = (yarn.scheduler.maximum-allocation-mb – 1g) * 0.10

spark.executor.instances = [this is set to the initial number of core nodes plus the number of task nodes in the cluster]

spark.executor.cores = yarn.nodemanager.resource.cpu-vcores

spark.default.parallelism = spark.executor.instances * spark.executor.cores
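The following sketch reproduces this arithmetic for a node where yarn.scheduler.maximum-allocation-mb is 23040 MB (an assumed value close to the roughly 23 GB noted earlier) with 8 vCPUs and six core/task nodes; compute_spark_defaults is illustrative, not EMR code:

```python
# Reproduce the maximizeResourceAllocation arithmetic listed above.
def compute_spark_defaults(max_allocation_mb, vcores, core_plus_task_nodes):
    base_mb = max_allocation_mb - 1024            # reserve 1 GB
    overhead_mb = int(base_mb * 0.10)             # memoryOverhead = 10% of the rest
    executor_memory_mb = base_mb - overhead_mb
    instances = core_plus_task_nodes
    return {
        "spark.executor.memory": f"{executor_memory_mb}M",
        "spark.yarn.executor.memoryOverhead": f"{overhead_mb}M",
        "spark.executor.instances": instances,
        "spark.executor.cores": vcores,
        "spark.default.parallelism": instances * vcores,
    }

defaults = compute_spark_defaults(23040, 8, core_plus_task_nodes=6)
```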

A graphical view of the parallelism

The Spark history server UI is accessible from the EMR console. It provides useful information about your application’s performance and behavior. You can see the list of scheduled stages and tasks, retrieve information about the executors, obtain a summary of memory usage, and retrieve the configurations submitted to the SparkContext object. For the purposes of this post, I show how the flags set in the spark-submit script used in the example above translate to the graphical tool.

To access the Spark history server, enable your SOCKS proxy and choose Spark History Server under Connections.

For Completed applications, choose the only entry available and expand the event timeline as below. Spark added 5 executors as requested in the definition of the --num-executors flag.

Next, by navigating to the stage details, you can see the number of tasks running in parallel per executor. This value is the same as the value of the --executor-cores flag.

Summary

In this post, you learned how to use spark-submit flags to submit an application to a cluster. Specifically, you learned how to control where the driver runs, set the resources allocated to the driver and executors, and the number of executors. You also learned when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.

If you have questions or suggestions, please leave a comment below.

—————————-

Related:

Run an External Zeppelin Instance using S3 Backed Notebooks with Spark on Amazon EMR

Looking to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.

Submitting User Applications with spark-submit

Post Syndicated from Francisco Oliveira original https://blogs.aws.amazon.com/bigdata/post/Tx578UTQUV7LRP/Submitting-User-Applications-with-spark-submit

Francisco Oliveira is a consultant with AWS Professional Services

Customers starting their big data journey often ask for guidelines on how to submit user applications to Spark running on Amazon EMR. For example, customers ask for guidelines on how to size memory and compute resources available to their applications and the best resource allocation model for their use case.

In this post, I show how to set spark-submit flags to control the memory and compute resources available to your application submitted to Spark running on EMR. I discuss when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.

Spark execution model

At a high level, each application has a driver program that distributes work in the form of tasks among executors running on several nodes of the cluster.

The driver is the application code that defines the transformations and actions applied to the data set. At its core, the driver has instantiated an object of the SparkContext class. This object allows the driver to acquire a connection to the cluster, request resources, split the application actions into tasks, and schedule and launch tasks in the executors.

The executors not only perform tasks sent by the driver but also store data locally. As the executors are created and destroyed (see the “Enabling dynamic allocation of executors” section later), they register and deregister with the driver. The driver and the executors communicate directly.

To execute your application, the driver organizes the work to be accomplished in jobs. Each job is split into stages and each stage consists of a set of independent tasks that run in parallel. A task is the smallest unit of work in Spark and executes the same code, each on a different partition.

Spark programming model

An important abstraction in Spark is the resilient distributed dataset (RDD). This abstraction is key to perform in-memory computations. An RDD is a collection of read-only and immutable partitions of data that are distributed across the nodes of the cluster. Partitions in Spark allow the parallel execution of subsets of the data. Spark applications create RDDs and apply operations to RDDs. Although Spark partitions RDDs automatically, you can also set the number of partitions.

RDDs support two types of operations: transformation and actions. Transformations are operations that generate a new RDD, and actions are operations that write data to external storage or return a value to the driver after running a transformation on the dataset. Common transformations include operations that filter, sort and group by key. Common actions include operations that collect the results of tasks and ship them to the driver, save an RDD, or count the number of elements in a RDD.

spark-submit

A common way to launch applications on your cluster is by using the spark-submit script. This script offers several flags that allow you to control the resources used by your application.

Setting the spark-submit flags is one of the ways to dynamically supply configurations to the SparkContext object that is instantiated in the driver. spark-submit can also read configuration values set in the conf/spark-defaults.conf file which you can set using EMR configuration options when creating your cluster and, although not recommended, hardcoded in the application. An alternative to change conf/spark-defaults.conf is to use the –conf prop=value flag. I present both the spark-submit flag and the property name to use in the spark-defaults.conf file and –conf flag.

Spark applications running on EMR

Any application submitted to Spark running on EMR runs on YARN, and each Spark executor runs as a YARN container. When running on YARN, the driver can run in one YARN container in the cluster (cluster mode) or locally within the spark-submit process (client mode).

When running in cluster mode, the driver runs on ApplicationMaster, the component that submits YARN container requests to the YARN ResourceManager according to the resources needed by the application. A simplified and high-level diagram of the application submission process is shown below.

When running in client mode, the driver runs outside ApplicationMaster, in the spark-submit script process from the machine used to submit the application.

Setting the location of the driver

With spark-submit, the flag –deploy-mode can be used to select the location of the driver.

Submitting applications in client mode is advantageous when you are debugging and wish to quickly see the output of your application. For applications in production, the best practice is to run the application in cluster mode. This mode offers you a guarantee that the driver is always available during application execution. However, if you do use client mode and you submit applications from outside your EMR cluster (such as locally, on a laptop), keep in mind that the driver is running outside your EMR cluster and there will be higher latency for driver-executor communication.

Setting the driver resources

The size of the driver depends on the calculations the driver performs and on the amount of data it collects from the executors. When running the driver in cluster mode, spark-submit provides you with the option to control the number of cores (–driver-cores) and the memory (–driver-memory) used by the driver. In client mode, the default value for the driver memory is 1024 MB and one core.

Setting the number of cores and the number of executors

The number of executor cores (–executor-cores or spark.executor.cores) selected defines the number of tasks that each executor can execute in parallel. The best practice is to leave one core for the OS and about 4-5 cores per executor. The number of cores requested is constrained by the configuration property yarn.nodemanager.resource.cpu-vcores, which controls the number of cores available to all YARN containers running in one node and is set in the yarn-site.xml file.

The number of executors per node can be calculated using the following formula:

number of executors per node = number of cores on node – 1 for OS/number of task per executor

The total number of executors (–num-executors or spark.executor.instances) for a Spark job is:

total number of executors = number of executors per node * number of instances -1.

Setting the memory of each executor

The memory space of each executor container is subdivided on two major areas: the Spark executor memory and the memory overhead.

Note that the maximum memory that can be allocated to an executor container is dependent on the yarn.nodemanager.resource.memory-mb property available at yarn-site.xml. The executor memory (–executor-memory or spark.executor.memory) defines the amount of memory each executor process can use. The memory overhead (spark.yarn.executor.memoryOverHead) is off-heap memory and is automatically added to the executor memory. Its default value is executorMemory * 0.10.

Executor memory unifies sections of the heap for storage and execution purposes. These two subareas can now borrow space from one another if usage is exceeded. The relevant properties are spark.memory.fraction and spark.memory.storageFraction. For more information, see the Unified Memory Management in Spark 1.6 whitepaper.

The memory of each executor can be calculated using the following formula:

memory of each executor = max container size on node / number of executors per node, leaving room for the memory overhead within the container limit
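Continuing the sketch, the value passed to --executor-memory must leave room for spark.yarn.executor.memoryOverhead (10% by default) inside the YARN container limit, so the available memory is divided by 1.10:

```python
def executor_memory_gb(container_max_gb, executors_per_node, overhead_fraction=0.10):
    # Memory available to each executor container on a node
    per_executor = container_max_gb / executors_per_node
    # The overhead is added on top of --executor-memory, so divide it out
    # to stay under the YARN container limit
    return per_executor / (1 + overhead_fraction)

# 23 GB containers (the m3.2xlarge default) and one executor per node
print(int(executor_memory_gb(23, 1)))  # 20 -> request --executor-memory 20g
```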

A quick example

To show how you can set the flags I have covered so far, I submit the wordcount example application and then use the Spark history server for a graphical view of the execution.

First, I submit a modified word count sample application as an EMR step to my existing cluster. The code can be seen below:

from __future__ import print_function
from pyspark import SparkContext
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount <input> <output>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

The cluster has six m3.2xlarge instances plus one instance for the master, each with 8 vCPU and 30 GB of memory. The default value of yarn.nodemanager.resource.memory-mb for this instance type is 23 GB.

According to the formulas above, each node supports one executor ((8 - 1) / 5 cores), and the six core nodes could host six executors. I request five instead because, in cluster mode, one container is consumed by the driver. The spark-submit command would be as follows:

spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g --conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/

I submit the application as an EMR step with the following command:

aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE

Note that I am also setting the property spark.yarn.submit.waitAppCompletion with the step definitions. When this property is set to false, the client submits the application and exits, not waiting for the application to complete. This setting allows you to submit multiple applications to be executed simultaneously by the cluster and is only available in cluster mode.

I use the default values for --driver-memory and --driver-cores, as the sample application is writing directly to Amazon S3 and the driver is not receiving any data from the executors.

Enabling dynamic allocation of executors

Spark on YARN has the ability to dynamically scale up and down the number of executors. This feature can be valuable when you have multiple applications being processed simultaneously as idle executors are released and an application can request additional executors on demand.

To enable this feature, please see the steps in the EMR documentation.

Spark provides granular control to the dynamic allocation mechanism by providing the following properties:

Initial number of executors (spark.dynamicAllocation.initialExecutors)

Minimum number of executors to be used by the application (spark.dynamicAllocation.minExecutors)

Maximum executors that can be requested (spark.dynamicAllocation.maxExecutors)

When to remove an idle executor (spark.dynamicAllocation.executorIdleTimeout)

When to request new executors to process waiting tasks (spark.dynamicAllocation.schedulerBacklogTimeout and spark.dynamicAllocation.sustainedSchedulerBacklogTimeout)
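Put together, a spark-defaults.conf fragment enabling dynamic allocation might look like the following. The numeric values are illustrative only and should be tuned for your workload; note that dynamic allocation on YARN also requires the external shuffle service:

```
spark.dynamicAllocation.enabled                          true
spark.shuffle.service.enabled                            true
spark.dynamicAllocation.initialExecutors                 2
spark.dynamicAllocation.minExecutors                     1
spark.dynamicAllocation.maxExecutors                     20
spark.dynamicAllocation.executorIdleTimeout              60s
spark.dynamicAllocation.schedulerBacklogTimeout          1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 1s
```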

Automatically configure executors with maximum resource allocation

EMR provides an option to automatically configure the properties above in order to maximize the resource usage of the entire cluster. This configuration option can be valuable when you have only a single application being processed by your cluster at a time. Its usage should be avoided when you expect to run multiple applications simultaneously.

To enable this configuration option, please see the steps in the EMR documentation.

By setting this configuration option during cluster creation, EMR automatically updates the spark-defaults.conf file with the properties that control the compute and memory resources of an executor, as follows:

spark.executor.memory = (yarn.scheduler.maximum-allocation-mb - 1g) - spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead = (yarn.scheduler.maximum-allocation-mb - 1g) * 0.10

spark.executor.instances = [this is set to the initial number of core nodes plus the number of task nodes in the cluster]

spark.executor.cores = yarn.nodemanager.resource.cpu-vcores

spark.default.parallelism = spark.executor.instances * spark.executor.cores
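The derivation can be restated as a small Python function. It simply mirrors the formulas above; the input values below (a hypothetical 23552 MB maximum allocation, 8 vCPUs, six core nodes) are illustrative, so check your instance type's yarn-site.xml for the real numbers:

```python
def maximize_resource_allocation(max_allocation_mb, vcores_per_node, core_and_task_nodes):
    # Mirrors the spark-defaults.conf values EMR computes with
    # maximizeResourceAllocation; 1 GB is held back from the container limit
    usable_mb = max_allocation_mb - 1024
    overhead_mb = int(usable_mb * 0.10)
    return {
        "spark.yarn.executor.memoryOverhead": overhead_mb,
        "spark.executor.memory": usable_mb - overhead_mb,
        "spark.executor.instances": core_and_task_nodes,
        "spark.executor.cores": vcores_per_node,
        "spark.default.parallelism": core_and_task_nodes * vcores_per_node,
    }

conf = maximize_resource_allocation(23552, 8, 6)
print(conf["spark.default.parallelism"])  # 48
```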

A graphical view of the parallelism

The Spark history server UI is accessible from the EMR console. It provides useful information about your application’s performance and behavior. You can see the list of scheduled stages and tasks, retrieve information about the executors, obtain a summary of memory usage, and retrieve the configurations submitted to the SparkContext object. For the purposes of this post, I show how the flags set in the spark-submit script used in the example above translate to the graphical tool.

To access the Spark history server, enable your SOCKS proxy and choose Spark History Server under Connections.

For Completed applications, choose the only entry available and expand the event timeline, as shown below. Spark added five executors, as requested by the --num-executors flag.

Next, by navigating to the stage details, you can see the number of tasks running in parallel per executor. This value matches the value of the --executor-cores flag.

Summary

In this post, you learned how to use spark-submit flags to submit an application to a cluster. Specifically, you learned how to control where the driver runs, set the resources allocated to the driver and executors, and the number of executors. You also learned when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.

If you have questions or suggestions, please leave a comment below.

—————————-

Related:

Run an External Zeppelin Instance using S3 Backed Notebooks with Spark on Amazon EMR
