
Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 2

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

In Part 1 of this post series, you learned how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows.

This post guides you through deploying the AWS CloudFormation templates, configuring Genie, and running an example workflow authored in Apache Airflow.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Solution overview

This solution uses an AWS CloudFormation template to create the necessary resources.

Users access the Apache Airflow Web UI and the Genie Web UI via SSH tunnel to the bastion host.

The Apache Airflow deployment uses Amazon ElastiCache for Redis as a Celery backend, Amazon EFS as a mount point to store DAGs, and Amazon RDS PostgreSQL for database services.

Genie uses Apache Zookeeper for leader election, an Amazon S3 bucket to store configurations (binaries, application dependencies, cluster metadata), and Amazon RDS PostgreSQL for database services. Genie submits jobs to an Amazon EMR cluster.

The architecture in this post is for demo purposes. In a production environment, the Apache Airflow and the Genie instances should be part of an Auto Scaling Group. For more information, see Deployment on the Genie Reference Guide.

The following diagram illustrates the solution architecture.

Creating and storing admin passwords in AWS Systems Manager Parameter Store

This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With AWS Systems Manager Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS KMS to encrypt and decrypt the parameter values of secure string parameters.

Before deploying the AWS CloudFormation templates, execute the following AWS CLI commands. These commands create AWS Systems Manager Parameter Store parameters to store the passwords for the RDS master user, the Airflow DB administrator, and the Genie DB administrator.

aws ssm put-parameter --name "/GenieStack/RDS/Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/AirflowSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/GenieSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region
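
If you want to confirm the parameters, or see how a configuration script can read them back, the following minimal sketch uses boto3. The region is a placeholder, and the solution's own scripts may retrieve the values differently:

import boto3

# Minimal sketch: read back the SecureString parameters created above.
# The region is a placeholder; WithDecryption=True asks Parameter Store
# to decrypt the values with AWS KMS.
ssm = boto3.client("ssm", region_name="us-east-1")

for name in ["/GenieStack/RDS/Settings",
             "/GenieStack/RDS/AirflowSettings",
             "/GenieStack/RDS/GenieSettings"]:
    value = ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]
    print(name, "retrieved ({} characters)".format(len(value)))  # avoid printing the secret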

Creating an Amazon S3 Bucket for the solution and uploading the solution artifacts to S3

This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the AWS CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution from this link.

Unzip the artifacts required by the solution and upload the airflow and genie directories to the Amazon S3 bucket you just created. Keep a record of the Amazon S3 root path because you add it as a parameter to the AWS CloudFormation template later.
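
If you prefer to script the upload instead of using the console or the AWS CLI, the following minimal sketch walks the unzipped airflow and genie directories and uploads them with boto3. The bucket name and local directory layout are placeholders:

import os
import boto3

# Minimal sketch: upload the unzipped airflow/ and genie/ directories to the bucket.
# The bucket name is a placeholder for the bucket you created.
bucket = "geniestackbucket"
s3 = boto3.client("s3")

for top in ("airflow", "genie"):
    for root, _, files in os.walk(top):
        for file_name in files:
            local_path = os.path.join(root, file_name)
            # Keep the directory layout as the S3 key, for example genie/applications/...
            s3.upload_file(local_path, bucket, local_path.replace(os.sep, "/"))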

As an example, the following screenshot uses the root location geniestackbucket.

Use the value of the Amazon S3 Bucket you created for the AWS CloudFormation parameters GenieS3BucketLocation and AirflowBucketLocation.

Deploying the AWS CloudFormation stack

To launch the entire solution, choose Launch Stack.

The following list explains the parameters that the template requires, grouped by purpose. You can accept the default values for any parameters not listed here. For the full list of parameters, see the AWS CloudFormation template.

Location of the configuration artifacts

  • GenieS3BucketLocation – The S3 bucket with the Genie artifacts and Genie’s installation scripts. For example: geniestackbucket.
  • AirflowBucketLocation – The S3 bucket with the Airflow artifacts. For example: geniestackbucket.

Networking

  • SSHLocation – The IP address range allowed to SSH to the Genie, Apache Zookeeper, and Apache Airflow EC2 instances.

Security

  • BastionKeyName – An existing EC2 key pair to enable SSH access to the bastion host instance.
  • AirflowKeyName – An existing EC2 key pair to enable SSH access to the Apache Airflow instance.
  • ZKKeyName – An existing EC2 key pair to enable SSH access to the Apache Zookeeper instance.
  • GenieKeyName – An existing EC2 key pair to enable SSH access to the Genie instance.
  • EMRKeyName – An existing Amazon EC2 key pair to enable SSH access to the Amazon EMR cluster.

Logging

  • emrLogUri – The S3 location to store the Amazon EMR cluster logs. For example: s3://replace-with-your-bucket-name/emrlogs/

Post-deployment steps

To access the Apache Airflow and Genie web UIs, set up an SSH tunnel and configure a SOCKS proxy for your browser. Complete the following steps:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the public DNS of the bastion host instance. The following screenshot shows the instance this post uses.
  4. Set up an SSH tunnel to the master node using dynamic port forwarding.
    Instead of using the master public DNS name of your cluster and the username hadoop, which the walkthrough references, use the public DNS of the bastion host instance and replace the user hadoop with the user ec2-user.
  5. Configure the proxy settings to view websites hosted on the master node.
    You do not need to modify any of the steps in the walkthrough.

This process configures a SOCKS proxy management tool that allows you to automatically filter URLs based on text patterns and limit the proxy settings to domains that match the form of the Amazon EC2 instance’s public DNS name.

Accessing the Web UI for Apache Airflow and Genie

To access the Web UI for Apache Airflow and Genie, complete the following steps:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the URLs for the Apache Airflow and Genie Web UIs. The following screenshot shows the URLs that this post uses.
  4. Open two tabs in your web browser. You will use the tabs for the Apache Airflow UI and the Genie UI.
  5. For the FoxyProxy tool you configured previously, choose the FoxyProxy icon added to the top-right section of your browser and choose Use proxies based on their predefined patterns and priorities. The following screenshot shows the proxy options.
  6. Enter the URL for the Apache Airflow Web UI and the Genie Web UI on their respective tabs.

You are now ready to run a workflow in this solution.

Preparing application resources

The first step as a platform admin engineer is to prepare the binaries and configurations of the big data applications that the platform supports. In this post, the Amazon EMR clusters use release 5.26.0. Because Amazon EMR release 5.26.0 has Hadoop 2.8.5 and Spark 2.4.3 installed, those are the applications you want to support in the big data platform. If you decide to use a different EMR release, prepare binaries and configurations for the versions installed in that release. The following sections guide you through the steps to prepare the binaries, should you wish to use a different EMR release version.

To prepare a Genie application resource, create a YAML file with fields that are sent to Genie in a request to create an application resource.

This file defines metadata information about the application, such as the application name, type, version, tags, the location on S3 of the setup script, and the location of the application binaries. For more information, see Create an Application in the Genie REST API Guide.
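
For reference, the request that ultimately reaches Genie looks roughly like the following sketch. It assumes Genie’s v3 REST endpoint on port 8080 and reuses the fields from the Hadoop YAML file shown later in this section; the URL and bucket name are placeholders:

import requests

# Minimal sketch: create a Genie application resource via the REST API.
# Assumes Genie v3 on port 8080; URL, bucket name, and field values are
# placeholders mirroring the YAML file shown below.
genie_url = "http://replace-with-your-genie-server-url:8080"

application = {
    "id": "hadoop-2.8.5",
    "name": "hadoop",
    "user": "hadoop",
    "status": "ACTIVE",
    "version": "2.8.5",
    "type": "hadoop",
    "tags": ["type:hadoop", "version:2.8.5"],
    "setupFile": "s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh",
    "dependencies": ["s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz"],
}

response = requests.post(genie_url + "/api/v3/applications", json=application)
response.raise_for_status()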

Tag structure for application resources

This post uses the following tags for application resources:

  • type – The application type, such as Spark, Hadoop, Hive, Sqoop, or Presto.
  • version – The version of the application, such as 2.8.5 for Hadoop.

The next section shows how the tags are defined in the YAML file for an application resource. You can add an arbitrary number of tags to associate with Genie resources. Genie also maintains its own tags in addition to the ones the platform admins define; you can see them in the id and name fields of the files.

Preparing the Hadoop 2.8.5 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: hadoop-2.8.5
name: hadoop
user: hadoop
status: ACTIVE
description: Hadoop 2.8.5 Application
setupFile: s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh
configs: []
version: 2.8.5
type: hadoop
tags:
  - type:hadoop
  - version:2.8.5
dependencies:
  - s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml.

NOTE: The following steps are for reference only, in case you are completing this step manually rather than using the automation option provided.

The S3 objects referenced by the setupFile and dependencies labels are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download hadoop-2.8.5.tar.gz from https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/.
  2. Upload hadoop-2.8.5.tar.gz to s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/.

Preparing the Spark 2.4.3 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3
name: spark
user: hadoop
status: ACTIVE
description: Spark 2.4.3 Application
setupFile: s3://Your_Bucket_Name/genie/applications/spark-2.4.3/setup.sh
configs: []
version: 2.4.3
type: spark
tags:
  - type:spark
  - version:2.4.3
  - version:2.4
dependencies:
  - s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml.

NOTE: The following steps are for reference only, in case you are completing this step manually rather than using the automation option provided.

The objects in setupFile and dependencies are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download spark-2.4.3-bin-hadoop2.7.tgz from https://archive.apache.org/dist/spark/spark-2.4.3/ .
  2. Upload spark-2.4.3-bin-hadoop2.7.tgz to s3://Your_Bucket_Name/genie/applications/spark-2.4.3/ .

Because spark-2.4.3-bin-hadoop2.7.tgz uses Hadoop 2.7 and not Hadoop 2.8.5, you need to extract the EMRFS libraries for Hadoop 2.7 from an EMR cluster running Hadoop 2.7 (release 5.11.3). This is already available in your S3 bucket. For reference, the steps to extract the EMRFS libraries are as follows:

  1. Deploy an EMR cluster with release 5.11.3.
  2. Connect to the cluster over SSH and run the following command:
aws s3 cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.20.0.jar s3://Your_Bucket_Name/genie/applications/spark-2.4.3/hadoop-2.7/aws/emr/emrfs/lib/

Preparing a command resource

The next step as a platform admin engineer is to prepare the Genie commands that the platform supports.

In this post, the workflows use Apache Spark. This section shows the steps to prepare a command resource of type Apache Spark.

To prepare a Genie command resource, create a YAML file with fields that are sent to Genie in a request to create a command resource.

This file defines metadata information about the command, such as the command name, type, version, tags, the location on S3 of the setup script, and the parameters to use during command execution. For more information, see Create a Command in the Genie REST API Guide.

Tag structure for command resources

This post uses the following tag structure for command resources:

  • type – The command type, for example, spark-submit.
  • version – The version of the command, for example, 2.4.3 for Spark.

The next section shows how the tags are defined in the YAML file for a command resource. Genie also maintains its own tags in addition to the ones the platform admins define; you can see them in the id and name fields of the files.

Preparing the spark-submit command resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3_spark-submit
name: Spark Submit 
user: hadoop 
description: Spark Submit Command 
status: ACTIVE 
setupFile: s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/setup.sh
configs: [] 
executable: ${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client 
version: 2.4.3 
tags:
  - type:spark-submit
  - version:2.4.3
checkDelay: 5000

The file is also available at s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml.

The objects in setupFile are available in your S3 bucket.

Preparing cluster resources

This post also automates the step to prepare cluster resources; it follows a process similar to the one described previously, but applied to cluster resources.

During the startup of the Amazon EMR cluster, a custom script creates a YAML file with the metadata details about the cluster and uploads the file to S3. For more information, see Create a Cluster in the Genie REST API Guide.

The script also extracts all Amazon EMR libraries and uploads them to S3. The next section discusses the process of registering clusters with Genie.

The script is available at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

Tag structure for cluster resources

This post uses the following tag structure for cluster resources:

  • cluster.release – The Amazon EMR release name. For example, emr-5.26.0.
  • cluster.id – The Amazon EMR cluster ID. For example, j-xxxxxxxx.
  • cluster.name – The Amazon EMR cluster name.
  • cluster.role – The role associated with this cluster. For this post, the role is batch. Other possible roles would be ad hoc or Presto, for example.

You can add new tags for a cluster resource or change the values of existing tags by editing s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

You could also use other combinations of tags, such as a tag to identify the application lifecycle environment or required custom jars.

Genie also maintains its own tags in addition to the ones the platform admins define; you can see them in the id and name fields of the files. If multiple clusters share the same tag, Genie by default distributes jobs randomly across the clusters associated with that tag. For more information, see Cluster Load Balancing in the Genie Reference Guide.

Registering resources with Genie

Up to this point, all the configuration artifacts mentioned in the previous sections have already been prepared for you.

The following sections show how to register resources with Genie. In these sections, you connect to the bastion host via SSH to run configuration commands.

Registering application resources

To register the application resources you prepared in the previous section, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_application_resources.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the resource information, navigate to the Genie Web UI and choose the Applications tab. See the following screenshot. The screenshot shows two application resources, one for Apache Spark (version 2.4.3) and the other for Apache Hadoop (version 2.8.5).

Registering commands and associating them with applications

The next step is to register the Genie command resources with specific applications. For this post, because spark-submit depends on Apache Hadoop and Apache Spark, associate the spark-submit command with both applications.

The order you define for the applications in file genie_register_command_resources_and_associate_applications.py is important. Because Apache Spark depends on Apache Hadoop, the file first references Apache Hadoop and then Apache Spark. See the following code:

commands = [{'command_name' : 'spark-2.4.3_spark-submit', 'applications' : ['hadoop-2.8.5', 'spark-2.4.3']}]
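
Conceptually, the script translates that list into two Genie REST calls: one to create the command resource and one to associate it with its applications, in order. The following is a hedged sketch of that flow, assuming Genie’s v3 endpoints on port 8080; the provided script may differ in detail:

import requests

# Minimal sketch of what the registration script does conceptually:
# 1) create the spark-submit command resource, 2) associate it with its
# applications in dependency order (Hadoop first, then Spark).
genie_url = "http://replace-with-your-genie-server-url:8080"

command = {
    "id": "spark-2.4.3_spark-submit",
    "name": "Spark Submit",
    "user": "hadoop",
    "status": "ACTIVE",
    "version": "2.4.3",
    "tags": ["type:spark-submit", "version:2.4.3"],
    "executable": "${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client",
    "checkDelay": 5000,
}
requests.post(genie_url + "/api/v3/commands", json=command).raise_for_status()

# Order matters: Apache Spark depends on Apache Hadoop.
requests.post(genie_url + "/api/v3/commands/spark-2.4.3_spark-submit/applications",
              json=["hadoop-2.8.5", "spark-2.4.3"]).raise_for_status()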

To register the command resources and associate them with the application resources registered in the previous step, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_command_resources_and_associate_applications.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the command you registered plus the applications it is linked to, navigate to the Genie Web UI and choose the Commands tab.

The following screenshot shows the command details and the applications it is linked to.

Registering Amazon EMR clusters

As previously mentioned, the Amazon EMR cluster deployed in this solution registers the cluster when the cluster starts via an Amazon EMR step. You can access the script that Amazon EMR clusters use at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh. The script also automates deregistering the cluster from Genie when the cluster terminates.

In the Genie Web UI, choose the Clusters tab. This page shows you the current cluster resources. You can also find the location of the configuration files that were uploaded to the cluster’s S3 location during the registration step.

The following screenshot shows the cluster details and the location of configuration files (yarn-site.xml, core-site.xml, mapred-site.xml).

Linking commands to clusters

You have now registered all applications, commands, and clusters, and associated commands with the applications on which they depend. The final step is to link a command to a specific Amazon EMR cluster that is configured to run it.

Complete the following steps:

  1. SSH into the bastion host.
  2. Open /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py with your preferred text editor.
  3. Look for the following lines in the code:
    # Change cluster_name below
    clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' :
    ['spark-2.4.3_spark-submit']}]
  4. Replace j-xxxxxxxx in the file with your cluster name.
    To see the name of the cluster, navigate to the Genie Web UI and choose Clusters.
  5. To link the command to a specific Amazon EMR cluster, run the following command:
    python /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

The command is now linked to a cluster.

In the Genie Web UI, choose the Commands tab. This page shows you the current command resources. Select spark-2.4.3_spark-submit and see the clusters associated with the command.

The following screenshot shows the command details and the clusters it is linked to.

You have configured Genie with all resources; it can now receive job requests.

Running an Apache Airflow workflow

It is out of the scope of this post to provide a detailed description of the workflow code and dataset. This section provides details of how Apache Airflow submits jobs to Genie via a GenieOperator that this post provides.

The GenieOperator for Apache Airflow

The GenieOperator allows the data engineer to define the combination of tags to identify the commands and the clusters in which the tasks should run.

In the following code example, the cluster tag is 'emr.cluster.role:batch' and the command tags are 'type:spark-submit' and 'version:2.4.3'.

spark_transform_to_parquet_movies = GenieOperator(
    task_id='transform_to_parquet_movies',
    cluster_tags=['emr.cluster.role:batch'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/{}  s3://{}/demo/output/parquet/{}/".format(bucket, 'movies.csv', bucket, 'movies'),
    dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    description='Demo Spark Submit Job',
    job_name="Genie Demo Spark Submit Job",
    tags=['transform_to_parquet_movies'],
    xcom_vars=dict(),
    retries=3,
    dag=extraction_dag)

The property command_arguments defines the arguments to the spark-submit command, and dependencies defines the location of the code for the Apache Spark Application (PySpark).

You can find the code for the GenieOperator in the following location: s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py.

One of the arguments to the DAG is the Genie connection ID (genie_conn_id). This connection was created during the automated setup of the Apache Airflow Instance. To see this and other existing connections, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Connections.

The following screenshot shows the connection details.

The Airflow variable s3_location_genie_demo referenced in the DAG was set during the installation process. To see all configured Apache Airflow variables, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Variables.

The following screenshot shows the Variables page.
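
Inside the DAG code, the variable is read with Airflow’s Variable API. The following is a minimal sketch, assuming the variable holds the demo S3 bucket name used by the tasks:

from airflow.models import Variable

# Minimal sketch: read the variable set during installation.
# Assumption: s3_location_genie_demo holds the S3 bucket name used by the demo DAG.
bucket = Variable.get("s3_location_genie_demo")
print("Demo artifacts are expected under s3://{}/airflow/".format(bucket))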

Triggering the workflow

You can now trigger the execution of the movie_lens_transfomer_to_parquet DAG. Complete the following steps:

  1. In the Apache Airflow Web UI, choose the DAGs tab.
  2. Next to your DAG, change Off to On.

The following screenshot shows the DAGs page.

For this example DAG, this post uses a small subset of the MovieLens dataset. This is a popular open dataset that you can use to explore data science algorithms. Each dataset file is a comma-separated values (CSV) file with a single header row. All files are available in your solution S3 bucket under s3://Your_Bucket_Name/airflow/demo/input/csv.

movie_lens_transfomer_to_parquet is a simple workflow that triggers a Spark job that converts input files from CSV to Parquet.

The following screenshot shows a graphical representation of the DAG.

In this example DAG, after transform_to_parquet_movies concludes, you can potentially execute four tasks in parallel. Because the DAG concurrency is set to 3, as seen in the following code example, only three tasks can run at the same time.

# Initialize the DAG
# Concurrency --> Number of tasks allowed to run concurrently
extraction_dag = DAG(dag_name,
          default_args=dag_default_args,
          start_date=start_date,
          schedule_interval=schedule_interval,
          concurrency=3,
          max_active_runs=1
          )

Visiting the Genie job UI

The GenieOperator for Apache Airflow submitted the jobs to Genie. To see job details, in the Genie Web UI, choose the Jobs tab. You can see details such as the jobs submitted, their arguments, the cluster each job ran on, and the job status.

The following screenshot shows the Jobs page.

You can now experiment with this architecture by provisioning a new Amazon EMR cluster, registering it with a new value (for example, production) for the Genie tag emr.cluster.role, linking the cluster to a command resource, and updating the tag combination in the GenieOperator used by some of the tasks in the DAG, as shown in the following sketch.
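
The following is a hedged sketch of that change, reusing the demo task shown earlier and assuming you registered a cluster with the hypothetical tag value emr.cluster.role:production:

# Hedged sketch: the demo task retargeted at a cluster registered with the
# hypothetical tag value emr.cluster.role:production; all other parameters
# follow the demo task shown earlier.
spark_transform_to_parquet_movies_prod = GenieOperator(
    task_id='transform_to_parquet_movies_prod',
    cluster_tags=['emr.cluster.role:production'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/movies.csv s3://{}/demo/output/parquet/movies/".format(bucket, bucket),
    dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    job_name="Genie Demo Spark Submit Job (production)",
    dag=extraction_dag)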

Cleaning up

To avoid incurring future charges, delete the resources and the S3 bucket created for this post.

Conclusion

This post showed how to deploy an AWS CloudFormation template that sets up a demo environment for Genie, Apache Airflow, and Amazon EMR. It also demonstrated how to configure Genie and use the GenieOperator for Apache Airflow.

About the Authors

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.


Jelez Raditchkov is a practice manager with AWS.


Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

This post introduces an architecture that helps centralized platform teams maintain a big data platform to service thousands of concurrent ETL workflows, and simplifies the operational tasks required to accomplish that.

Architecture components

At a high level, the architecture uses two open source technologies with Amazon EMR to provide a big data platform for ETL workflow authoring, orchestration, and execution. Genie provides a centralized REST API for concurrent big data job submission, dynamic job routing, central configuration management, and abstraction of the Amazon EMR clusters. Apache Airflow provides a platform for job orchestration that allows you to programmatically author, schedule, and monitor complex data pipelines. Amazon EMR provides a managed cluster platform that can run and scale Apache Hadoop, Apache Spark, and other big data frameworks.

The following diagram illustrates the architecture.

Apache Airflow

Apache Airflow is an open source tool for authoring and orchestrating big data workflows.

With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that are executed independently. The DAG keeps track of the relationships and dependencies between tasks.
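
As a minimal, generic illustration (not the workflow used in this series), a DAG is plain Python code that declares tasks and their dependencies:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal, generic sketch of a DAG: two tasks where transform runs after extract.
dag = DAG(dag_id="example_dag",
          start_date=datetime(2019, 1, 1),
          schedule_interval="0 6 * * *")  # a cron expression: run daily at 06:00

extract = DummyOperator(task_id="extract", dag=dag)
transform = DummyOperator(task_id="transform", dag=dag)

# The DAG tracks this dependency: extract must finish before transform starts.
extract >> transform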

An operator is a template that defines a single task in the workflow. Airflow provides operators for common tasks, and you can also define custom operators. This post discusses the custom operator (GenieOperator) used to submit tasks to Genie.

A task is a parameterized instance of an operator. After an operator is instantiated, it’s referred to as a task. A task instance represents a specific run of a task. A task instance has an associated DAG, task, and point in time.

You can run DAGs and tasks on demand or schedule them to run at a specific time defined as a cron expression in the DAG.

For additional details on Apache Airflow, see Concepts in the Apache Airflow documentation.

Genie

Genie is an open source tool by Netflix that provides configuration-management capabilities and dynamic routing of jobs by abstracting access to the underlying Amazon EMR clusters.

Genie provides a REST API to submit jobs from big data applications such as Apache Hadoop MapReduce or Apache Spark. Genie manages the metadata of the underlying clusters and the commands and applications that run on them.

Genie abstracts access to the processing clusters by associating one or more tags with the clusters. You can also associate tags with the metadata details for the applications and commands that the big data platform supports. As Genie receives job submissions for specific tags, it uses the combination of cluster and command tags to route each job to the correct EMR cluster dynamically.

Genie’s data model

Genie provides a data model to capture the metadata associated with resources in your big data environment.

An application resource is a reusable set of binaries, configuration files, and setup files used to install and configure applications supported by the big data platform on the Genie node that submits the jobs to the clusters. When Genie receives a job, the Genie node downloads all dependencies, configuration files, and setup files associated with the applications and stores them in a job working directory. Applications are linked to commands because they represent the binaries and configurations needed before a command runs.

A command resource represents the parameters to use when submitting work to a cluster from the command line, and identifies which applications need to be available on the PATH to run the command. Command resources glue metadata components together. For example, a command resource representing a Hive command would include a hive-site.xml and be associated with a set of application resources that provide the Hive and Hadoop binaries needed to run the command. Moreover, a command resource is linked to the clusters it can run on.

A cluster resource identifies the details of an execution cluster, including connection details, cluster status, tags, and additional properties. A cluster resource can register with Genie during startup and deregister during termination automatically. Clusters are linked to one or more commands that can run on them. After a command is linked to a cluster, Genie can start submitting jobs to the cluster.

Lastly, there are three job resource types: job request, job, and job execution. A job request resource represents the request submission with details to run a job. Based on the parameters submitted in the request, a job resource is created. The job resource captures details such as the command, cluster, and applications associated with the job. Additionally, information on status, start time, and end time is also available on the job resource. A job execution resource provides administrative details so you can understand where the job ran.

For more information, see Data Model on the Genie Reference Guide.

Amazon EMR and Amazon S3

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.

Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on S3, see Amazon S3 as the Data Lake Storage Platform.

Architecture deep dive

Two main actors interact with this architecture: platform admin engineers and data engineers.

Platform admin engineers have administrator access to all components. They can add or remove clusters, and configure the applications and the commands that the platform supports.

Data engineers focus on writing big data applications with their preferred frameworks (Apache Spark, Apache Hadoop MR, Apache Sqoop, Apache Hive, Apache Pig, and Presto) and on authoring Python scripts to represent DAGs.

At a high level, the team of platform admin engineers prepares the supported big data applications and their dependencies and registers them with Genie. The team of platform admin engineers launches Amazon EMR clusters that register with Genie during startup.

The team of platform admin engineers associates each Genie metadata resource (applications, commands, and clusters) with Genie tags. For example, you can associate a cluster resource with a tag named environment and the value can be “Production Environment”, “Test Environment”, or “Development Environment”.

Data engineers author workflows as Airflow DAGs and use a custom Airflow Operator—GenieOperator—to submit tasks to Genie. They can use a combination of tags to identify the type of tasks they are running and where the tasks should run. For example, you might need to run Apache Spark 2.4.3 tasks in the environment identified by the “Production Environment” tag. To do this, set the cluster and command tags in the Airflow GenieOperator as shown in the following code:

(cluster_tags=['emr.cluster.environment:production'],command_tags=['type:spark-submit','ver:2.4.3'])

The following diagram illustrates this architecture.

The workflow, as it corresponds to the numbers in this diagram, is as follows:

  1. A platform admin engineer prepares the binaries and dependencies of the supported applications (Spark-2.4.5, Spark-2.1.0, Hive-2.3.5, etc.). The platform admin engineer also prepares commands (spark-submit, hive). The platform admin engineer registers applications and commands with Genie. Moreover, the platform admin engineer associates commands with applications and links commands to a set of clusters after step 2 (below) is concluded.
  2. Amazon EMR cluster(s) register with Genie during startup.
  3. A data engineer authors Airflow DAGs and uses the Genie tag to reference the environment, application, command or any combination of the above. In the workflow code, the data engineer uses the GenieOperator. The GenieOperator submits jobs to Genie.
  4. A schedule triggers workflow execution or a data engineer manually triggers the workflow execution. The jobs that compose the workflow are submitted to Genie for execution with a set of Genie tags that specify where the job should be run.
  5. The Genie node, working as the client gateway, will set up a working directory with all binaries and dependencies. Genie dynamically routes the jobs to the cluster(s) associated with the provided Genie tags. The Amazon EMR clusters run the jobs.

For details on the authorization and authentication mechanisms supported by Apache Airflow and Genie see Security in the Apache Airflow documentation and Security in the Genie documentation.  This architecture pattern does not expose SSH access to the Amazon EMR clusters. For details on providing different levels of access to data in Amazon S3 through EMR File System (EMRFS), see Configure IAM Roles for EMRFS Requests to Amazon S3.

Use cases enabled by this architecture

The following use cases demonstrate the capabilities this architecture provides.

Managing upgrades and deployments with no downtime and adopting the latest open source release

In a large organization, teams that use the data platform use heterogeneous frameworks and different versions. You can use this architecture to support upgrades with no downtime and offer the latest version of open source frameworks in a short amount of time.

Genie and Amazon EMR are the key components to enable this use case. As the Amazon EMR service team strives to add the latest version of the open source frameworks running on Amazon EMR in a short release cycle, you can keep up with your internal teams’ needs of the latest features of their preferred open source framework.

When a new version of the open source framework is available, you need to test it, add the new supported version and its dependencies to Genie, and move tags in the old cluster to the new one. The new cluster takes new job submissions, and the old cluster concludes jobs it is still running.

Moreover, because Genie centralizes the location of application binaries and its dependencies, upgrading binaries and dependencies in Genie also upgrades any upstream client automatically. Using Genie removes the need for upgrading all upstream clients.

Managing a centralized configuration, job and cluster status, and logging

In a universe of thousands of jobs and multiple clusters, you need to identify where a specific job is running and access logging details quickly. This architecture gives you visibility into jobs running on the data platform, logging of jobs, clusters, and their configurations.

Having programmatic access to the big data platform

This architecture enables a single point of job submissions by using Genie’s REST API. Access to the underlying cluster is abstracted through a set of APIs that enable administration tasks plus submitting jobs to the clusters. A REST API call submits jobs into Genie asynchronously. If accepted, a job-id is returned that you can use to get job status and outputs programmatically via API or web UI. A Genie node sets up the working directory and runs the job on a separate process.
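
The following is a hedged sketch of that flow against Genie’s v3 REST API; the URL, user, tags, and arguments are placeholders and not the ones used in this series:

import requests

# Minimal sketch: submit a job to Genie and check its status.
# Assumes Genie v3 endpoints; URL, user, tags, and arguments are placeholders.
genie_url = "http://replace-with-your-genie-server-url:8080"

job_request = {
    "name": "example-spark-job",
    "user": "data-engineer",
    "version": "1.0",
    "clusterCriterias": [{"tags": ["emr.cluster.environment:production"]}],
    "commandCriteria": ["type:spark-submit", "ver:2.4.3"],
    "commandArgs": "example.py s3://my-bucket/input/ s3://my-bucket/output/",
}

response = requests.post(genie_url + "/api/v3/jobs", json=job_request)
response.raise_for_status()
# Genie accepts the job asynchronously and returns its URL in the Location header.
job_id = response.headers["Location"].rsplit("/", 1)[-1]

status = requests.get(genie_url + "/api/v3/jobs/{}/status".format(job_id)).json()
print(job_id, status)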

You can also integrate this architecture with continuous integration and continuous delivery (CI/CD) pipelines for big data application and Apache Airflow DAGs.

Enabling scalable client gateways and concurrent job submissions

The Genie node acts as a client gateway (edge node) and can scale horizontally to make sure the client gateway resources used to submit jobs to the data platform meet demand. Moreover, Genie allows the submission of concurrent jobs.

When to use this architecture

This architecture is recommended for organizations that use multiple large, multi-tenant processing clusters instead of transient clusters. It is out of the scope of this post to address when organizations should consider always-on clusters versus transient clusters (you can use an EMR Airflow Operator to spin up Amazon EMR clusters that register with Genie, run a job, and tear them down). You should use Reserved Instances with this architecture. For more information, see Using Reserved Instances.

This architecture is especially recommended for organizations that choose to have a central platform team to administer and maintain a big data platform that supports many internal teams that require thousands of jobs to run concurrently.

This architecture might not make sense for organizations that are not as large or don’t expect to grow to that scale. The benefits of cluster abstraction and centralized configuration management are ideal in bringing structured access to a potentially chaotic environment of thousands of concurrent workflows and hundreds of teams.

This architecture is also recommended for organizations that support a high percentage of multi-hour or overlapping workflows and heterogeneous frameworks (Apache Spark, Apache Hive, Apache Pig, Apache Hadoop MapReduce, Apache Sqoop, or Presto).

If your organization relies solely on Apache Spark and is aligned with the recommendations discussed previously, this architecture might still apply. For organizations that don’t have the scale to justify the need for centralized REST API for job submission, cluster abstraction, dynamic job routing, or centralized configuration management, Apache Livy plus Amazon EMR might be the appropriate option. Genie has its own scalable infrastructure that acts as the edge client. This means that Genie does not compete with Amazon EMR master instance resources, whereas Apache Livy does.

If the majority of your organization’s workflows are a few short-lived jobs, opting for a serverless processing layer, serverless ad hoc querying layer, or using dedicated transient Amazon EMR clusters per workflow might be more appropriate. If the majority of your organization’s workflows are composed of thousands of short-lived jobs, the architecture still applies because it removes the need to spin up and down clusters.

This architecture is recommended for organizations that require full control of the processing platform to optimize component performance. Moreover, this architecture is recommended for organizations that need to enforce centralized governance on their workflows via CI/CD pipelines.

It is out of the scope of this post to evaluate different orchestration options or the benefits of adopting Airflow as the orchestration layer. When considering adopting an architecture, also consider the existing skillset and time to adopt tooling. The open source nature of Genie may allow you to integrate other orchestration tools. Evaluating that route might be an option if you wish to adopt this architecture with another orchestration tool.

Conclusion

This post presented how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows. The post described the architecture components, the use cases the architecture supports, and when to use it. The second part of this post deploys a demo environment and walks you through the steps to configure Genie and use the GenieOperator for Apache Airflow.

About the Author

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.


Jelez Raditchkov is a practice manager with AWS.