All posts by Fei Lang

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by Amazon CloudWatch Events.

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We will start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We will then use this environment to run an EMR notebook example which does data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.
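The S3 bucket prerequisites above can also be set up with a short script. The following is a minimal sketch, assuming boto3 is installed and the AWS CLI credentials are configured; the helper name ensure_mwaa_bucket is illustrative and not part of the original post.

```python
def ensure_mwaa_bucket(s3, bucket_name, region):
    """Create a bucket that satisfies the Amazon MWAA prerequisites listed above.

    `s3` is expected to be a boto3 S3 client, for example
    boto3.client("s3", region_name=region).
    """
    if not bucket_name.startswith("airflow-"):
        raise ValueError("Amazon MWAA bucket names must start with 'airflow-'")

    create_args = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be passed as a constraint.
    if region != "us-east-1":
        create_args["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3.create_bucket(**create_args)

    # Bucket versioning must be enabled.
    s3.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={"Status": "Enabled"},
    )

    # A zero-byte object whose key ends in "/" appears as a folder in the console.
    s3.put_object(Bucket=bucket_name, Key="dags/")


# Example call (replace the placeholder with your own globally unique name):
# import boto3
# ensure_mwaa_bucket(boto3.client("s3", region_name="us-west-2"),
#                    "airflow-<your-suffix>", "us-west-2")
```

Passing the client in as an argument keeps the helper easy to try against a stub before pointing it at a real account.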

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.

We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using a AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.
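The template itself is not reproduced in this post, so the following is only a rough sketch of how the environment settings described above might map onto the parameters of the Amazon MWAA CreateEnvironment API (as exposed by the boto3 mwaa client). The function name and the ARNs and IDs passed to it are hypothetical; the template may configure the environment differently.

```python
def mwaa_environment_args(name, bucket_arn, execution_role_arn,
                          private_subnet_ids, security_group_ids):
    """Build keyword arguments for a boto3 mwaa client's create_environment call."""
    if len(private_subnet_ids) != 2:
        raise ValueError("Amazon MWAA expects two private subnets in different AZs")
    return {
        "Name": name,
        "SourceBucketArn": bucket_arn,
        "DagS3Path": "dags",                      # the dags folder in the bucket
        "ExecutionRoleArn": execution_role_arn,
        "NetworkConfiguration": {
            "SubnetIds": list(private_subnet_ids),
            "SecurityGroupIds": list(security_group_ids),
        },
        "EnvironmentClass": "mw1.small",          # environment class used in this post
        "MaxWorkers": 1,                          # maximum worker count used in this post
        "WebserverAccessMode": "PUBLIC_ONLY",     # Airflow UI reachable over the internet
        "LoggingConfiguration": {
            # Only task logs are sent, at log level INFO.
            "TaskLogs": {"Enabled": True, "LogLevel": "INFO"},
        },
    }


# Example call (all values are placeholders):
# import boto3
# boto3.client("mwaa").create_environment(**mwaa_environment_args(...))
```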

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region where you launch the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters.

Parameter | Description | Default Value
Stack name | Enter a meaningful name for the stack. We use MWAAEmrNBDemo for this example. Replace it with your own value. | None
AirflowBucketName | Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment. The name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post. You must replace it with your own value for this field. | None
EnvironmentName | An MWAA environment name that is prefixed to resource names. All the resources created by this template are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post. Replace it with your own value for this field. | mwaa-
PrivateSubnet1CIDR | The IP range (CIDR notation) for the private subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.20.0/24
PrivateSubnet2CIDR | The IP range (CIDR notation) for the private subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.21.0/24
PublicSubnet1CIDR | The IP range (CIDR notation) for the public subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.10.0/24
PublicSubnet2CIDR | The IP range (CIDR notation) for the public subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.11.0/24
VpcCIDR | The IP range (CIDR notation) for the VPC being created. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.0.0/16

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.
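If you change the CIDR values, it can help to confirm that every subnet still falls inside the VPC range and that no two subnets overlap. The following sketch uses only the Python standard library; the helper name check_subnets is illustrative, and the values are the defaults from the preceding table.

```python
import ipaddress


def check_subnets(vpc_cidr, subnet_cidrs):
    """Return a list of problems; an empty list means the layout is consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    subnets = {name: ipaddress.ip_network(cidr) for name, cidr in subnet_cidrs.items()}
    problems = []
    # Every subnet must sit inside the VPC range.
    for name, net in subnets.items():
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({net}) is outside the VPC range {vpc}")
    # No two subnets may share addresses.
    names = list(subnets)
    for i, first in enumerate(names):
        for second in names[i + 1:]:
            if subnets[first].overlaps(subnets[second]):
                problems.append(f"{first} overlaps {second}")
    return problems


DEFAULTS = {
    "PublicSubnet1CIDR": "10.192.10.0/24",
    "PublicSubnet2CIDR": "10.192.11.0/24",
    "PrivateSubnet1CIDR": "10.192.20.0/24",
    "PrivateSubnet2CIDR": "10.192.21.0/24",
}
print(check_subnets("10.192.0.0/16", DEFAULTS))  # prints []
```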

  2. Enter the parameter values from the preceding table.
  3. Review the details in the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, you can find the resources it created on the Resources tab. Now, we’re ready to run our example.
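If you prefer to script the launch rather than use the console, the same parameters can be passed to the CloudFormation CreateStack API. This is a hedged sketch: the helper name stack_request is made up for illustration, and the template URL is left as a placeholder because this post only links to it through the Launch Stack button.

```python
def stack_request(stack_name, template_url, parameters):
    """Build keyword arguments for a boto3 CloudFormation client's create_stack call."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        "Parameters": [
            {"ParameterKey": key, "ParameterValue": value}
            for key, value in parameters.items()
        ],
        # Acknowledges that the template creates IAM resources with custom names.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


request = stack_request(
    "MWAAEmrNBDemo",
    "<template URL behind the Launch Stack button>",  # placeholder, not a real URL
    {
        "AirflowBucketName": "airflow-emr-demo-us-west-2",
        "EnvironmentName": "mwaa-emr-blog-demo",
    },
)

# Example call (requires boto3 and credentials):
# import boto3
# cfn = boto3.client("cloudformation", region_name="us-west-2")
# cfn.create_stack(**request)
# cfn.get_waiter("stack_create_complete").wait(StackName=request["StackName"])
```

Parameters that are omitted, such as the CIDR ranges, fall back to the template defaults.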

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

The following diagram illustrates the workflow. As a user, you first create the DAG file that describes how to run the analytics jobs and upload it to the dags folder in the S3 bucket you specified. You can then trigger the DAG in the Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.

Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

find_best_sellers.ipynb is a Python notebook that analyzes the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purposes only, we rank sellers simply by the sum of review star ratings from verified purchases.
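To make the ranking rule concrete, here is a small plain-Python sketch of the same idea. It is not the notebook's code (the notebook runs the analysis with Hive); the function name is hypothetical, the field names are assumed to follow the dataset's published column names, and the sample rows are made up for illustration.

```python
from collections import defaultdict


def top_sellers(reviews, categories, from_date, to_date, limit=20):
    """Rank products per category by the sum of star ratings from verified purchases."""
    totals = defaultdict(int)
    for review in reviews:
        if (review["product_category"] in categories
                and review["verified_purchase"] == "Y"
                # ISO dates (YYYY-MM-DD) compare correctly as strings.
                and from_date <= review["review_date"] <= to_date):
            totals[(review["product_category"], review["product_title"])] += review["star_rating"]

    ranked = defaultdict(list)
    # Highest total first; ties are broken alphabetically by title.
    for (category, title), stars in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0][1])):
        if len(ranked[category]) < limit:
            ranked[category].append((title, stars))
    return dict(ranked)


# Made-up sample rows, for illustration only.
SAMPLE = [
    {"product_category": "Books", "product_title": "Book A", "star_rating": 5, "verified_purchase": "Y", "review_date": "2015-08-26"},
    {"product_category": "Books", "product_title": "Book A", "star_rating": 4, "verified_purchase": "Y", "review_date": "2015-08-27"},
    {"product_category": "Books", "product_title": "Book B", "star_rating": 5, "verified_purchase": "N", "review_date": "2015-08-26"},
    {"product_category": "Books", "product_title": "Book B", "star_rating": 3, "verified_purchase": "Y", "review_date": "2015-08-30"},
    {"product_category": "Books", "product_title": "Book C", "star_rating": 5, "verified_purchase": "Y", "review_date": "2015-09-01"},
]
print(top_sellers(SAMPLE, {"Books"}, "2015-08-25", "2015-08-31"))
# prints {'Books': [('Book A', 9), ('Book B', 3)]}
```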

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

In the last line of the first cell, OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/" is the default value for the output location parameter. Replace it with your own value. You can also supply a different value for this parameter in the Airflow Variables later.

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary and two m5.xlarge core nodes on release version 6.2.0 with Spark, Hive, Livy and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.
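Step 4 in the list above boils down to polling the DescribeNotebookExecution API until the run leaves its in-progress states. Outside Airflow, that can be sketched as follows. The function name, the RUNNING_STATES constant, and the 30-second interval are illustrative choices; `emr` is assumed to be a boto3 EMR client.

```python
import time

# States in which the execution is still in progress.
RUNNING_STATES = {"START_PENDING", "STARTING", "RUNNING", "FINISHING",
                  "STOP_PENDING", "STOPPING"}


def wait_for_notebook_execution(emr, execution_id, poll_seconds=30, sleep=time.sleep):
    """Poll until the notebook execution leaves RUNNING_STATES; return the last status."""
    while True:
        response = emr.describe_notebook_execution(NotebookExecutionId=execution_id)
        status = response["NotebookExecution"]["Status"]
        if status not in RUNNING_STATES:
            return status  # for example FINISHED, FAILING, FAILED, or STOPPED
        sleep(poll_seconds)


# Example call (requires boto3 and credentials; the ID is a placeholder):
# import boto3
# emr = boto3.client("emr", region_name="us-west-2")
# print(wait_for_notebook_execution(emr, "<notebook execution ID>"))
```

The `sleep` argument exists only so the wait can be replaced with a stub when experimenting.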

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
import boto3, time
from builtins import range
from pprint import pprint
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils.decorators import apply_defaults
from airflow.utils.dates import days_ago

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

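The very last line of the DAG code defines how the tasks are linked in the orchestration workflow. It uses the overloaded right shift >> operator to create a dependency, meaning that the task on the left runs first and the task on the right runs only after it completes successfully. The operator only sets the order; values such as the cluster ID are passed between tasks through XCom, which is what the xcom_pull calls in the DAG read.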
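To see how chaining with >> can work, consider this toy class. It is not Airflow's implementation, only a minimal sketch of the same idea: __rshift__ records the dependency and returns the right-hand task so that the chain can continue.

```python
class Task:
    """Toy stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` records that b runs after a, then returns b so chains keep going.
        self.downstream.append(other)
        return other


def run_order(first):
    """Follow a simple linear chain from `first` and list task IDs in order."""
    order, task = [], first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


create = Task("create_cluster_task")
check = Task("check_cluster_task")
start = Task("start_execution_task")
create >> check >> start
print(run_order(create))
# prints ['create_cluster_task', 'check_cluster_task', 'start_execution_task']
```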

Instead of hard-coding the variables in the DAG code, we choose to supply these variables by importing a JSON file in the Airflow UI before actually running the DAG. This way, we can also update the variables without having to update the DAG code, which requires updating the DAG file in Amazon S3. We walk you through how to do so in the later steps. The VARIABLES section of the DAG is repeated here:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

Before you use this JSON file, replace the placeholder values (the subnet ID, the notebook ID, and the S3 paths) with your actual values.
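The start_execution function in the DAG turns four of these variables into the NotebookParams JSON string by concatenating escaped strings. As an alternative sketch (not the DAG's code), the same JSON can be produced with json.dumps, which handles quoting and escaping; the helper name notebook_params is illustrative.

```python
import json


def notebook_params(variables):
    """Build the NotebookParams JSON string from Airflow-style variables."""
    return json.dumps({
        # CATEGORIES_CSV is a comma-separated string; the notebook expects a list.
        "CATEGORIES": variables["CATEGORIES_CSV"].split(","),
        "OUTPUT_LOCATION": variables["OUTPUT_LOCATION"],
        "FROM_DATE": variables["FROM_DATE"],
        "TO_DATE": variables["TO_DATE"],
    })


example = {
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/",  # placeholder from variables.json
}
print(notebook_params(example))
```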

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.
  2. Choose Open Airflow UI.
  3. Log in as an authenticated user.

Next, we import the JSON file for the variables into the Airflow UI. As mentioned earlier, this lets us supply the variable values when we run the DAG instead of hard-coding them in the DAG definition.

  4. On the Admin menu, choose Variables.
  5. Choose Browse.
  6. Select your JSON file (variables.json in this example).
  7. Choose Import Variables.

For more information about importing variables, see Variables.

  8. To upload the DAG file to the dags folder in the S3 bucket specified for the Airflow environment, run the following command in the directory that contains test_dag.py. Replace <your_airflow_bucket_name> with the name of the S3 bucket that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.
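The upload can also be done from Python instead of the AWS CLI. A minimal sketch, assuming `s3` is a boto3 S3 client; the helper name upload_dag is illustrative.

```python
import os


def upload_dag(s3, local_path, bucket_name, prefix="dags"):
    """Upload a DAG file to the dags folder of the Airflow bucket; return the S3 key."""
    key = f"{prefix}/{os.path.basename(local_path)}"
    s3.upload_file(local_path, bucket_name, key)
    return key


# Example call (requires boto3 and credentials):
# import boto3
# upload_dag(boto3.client("s3"), "test_dag.py", "<your_airflow_bucket_name>")
```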

  9. Trigger the DAG by turning it to On.

  10. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.

  11. Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activating the DAG.

You now get an email when any of the tasks fails. You can also configure email notifications for retries.

  12. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.

  13. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.

On the Gantt tab, we can see the time distribution of all the tasks of our workflow.

As specified in our DAG definition, the EMR cluster is terminated when the workflow is complete.

Because we use the cron expression 0 * * * * as the schedule interval for our workflow, the DAG runs every hour while its status is On. Switch the status to Off if you don’t want it to run again.
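The cron expression 0 * * * * matches minute 0 of every hour. As a quick illustration of what that means (plain Python, not Airflow's scheduler), the following sketch lists the next few matching times after a given moment; the function name is made up for this example.

```python
from datetime import datetime, timedelta


def next_hourly_runs(now, count=3):
    """Next `count` times that match the cron expression 0 * * * *."""
    # Round down to the current hour, then step forward one hour at a time.
    first = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


for run in next_hourly_runs(datetime(2020, 3, 30, 10, 17)):
    print(run.isoformat())
# prints:
# 2020-03-30T11:00:00
# 2020-03-30T12:00:00
# 2020-03-30T13:00:00
```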

  14. On the Amazon S3 console, view the result of our notebook job in the S3 folder.

For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best seller in Books for the week of 8-25-2015 to 8-31-2015.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.
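The cleanup can be scripted as well. The following is a sketch under stated assumptions: `bucket` is a boto3 S3 Bucket resource (boto3.resource("s3").Bucket(name)), `cloudformation` is a boto3 CloudFormation client, and the function name and prefixes are illustrative. Because the bucket is versioned, the sketch removes every object version under each prefix.

```python
def clean_up(cloudformation, bucket, stack_name, prefixes):
    """Delete the example's S3 files, then delete the CloudFormation stack."""
    for prefix in prefixes:
        # On a versioned bucket, every version of each object has to be removed.
        bucket.object_versions.filter(Prefix=prefix).delete()
    cloudformation.delete_stack(StackName=stack_name)


# Example call (requires boto3 and credentials; names are placeholders):
# import boto3
# clean_up(
#     boto3.client("cloudformation", region_name="us-west-2"),
#     boto3.resource("s3").Bucket("<your_airflow_bucket_name>"),
#     "MWAAEmrNBDemo",
#     ["dags/", "query_output/"],
# )
```

Double-check the prefixes before running anything like this, because deleted object versions cannot be recovered.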

Conclusion

This post showed how to use the Amazon EMR Notebooks API with an orchestration service such as Amazon MWAA to build ETL pipelines. It demonstrated how to set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Amazon EMR Studio (Preview): A new notebook-first IDE experience with Amazon EMR

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/amazon-emr-studio-preview-a-new-notebook-first-ide-experience-with-amazon-emr/

We’re happy to announce Amazon EMR Studio (Preview), an integrated development environment (IDE) that makes it easy for data scientists and data engineers to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. EMR Studio uses AWS Single Sign-On (AWS SSO), and allows you to log in directly with your corporate credentials without signing in to the AWS Management Console.

With EMR Studio, you can run notebook code on Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), and debug your applications. For more information about Amazon EMR on Amazon EKS, see What is Amazon EMR on EKS.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing with the performance-optimized Apache Spark runtime that Amazon EMR provides. You can also install custom kernels and libraries, collaborate with peers using code repositories such as GitHub and Bitbucket, or run parameterized notebooks as part of scheduled workflows using orchestration services like Apache Airflow or Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Administrators can set up EMR clusters that can be used by EMR Studio users, or create predefined AWS CloudFormation templates for Amazon EMR and allow you to simply choose a template for creating your own cluster.

In this post, we discuss the benefits that EMR Studio offers and introduce some of its capabilities. To learn more about creating and using EMR Studios, see Use Amazon EMR Studio.

Benefits of using EMR Studio

EMR Studio offers the following benefits:

  • Set up a unified experience to develop and diagnose EMR Spark applications – Administrators can set up EMR Studio to allow you to log in using your corporate credentials without having to sign in to the AWS console. You get a single unified environment to interactively explore, process, and visualize data using notebooks, build and schedule pipelines, and debug applications without having to log in to EMR clusters.
  • Use fully managed Jupyter notebooks – With EMR Studio, you can develop analytics and data science applications in R, Python, Scala, and PySpark with fully managed Jupyter notebooks. You can take advantage of distributed processing using the performance-optimized Amazon EMR runtime for Apache Spark with Jupyter kernels and applications running on EMR clusters. You can attach notebooks to an existing cluster that uses Amazon EC2 instances, or to an EMR on EKS virtual cluster. You can also start your own clusters using templates pre-configured by administrators.
  • Collaborate with others using code repositories – From the EMR Studio notebooks environment, you can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers.
  • Run custom Python libraries and kernels – From EMR Studio, you can install custom Python libraries or Jupyter kernels required for your applications directly to the EMR clusters.
  • Automate workflows using pipelines – EMR Studio makes it easy to move from prototyping to production. You can create EMR Studio notebooks that can be programmatically invoked with parameters, and use APIs to run the parameterized notebooks. You can also use orchestration tools such as Apache Airflow or Amazon MWAA to run notebooks in automated workflows.
  • Simplified debugging – With EMR Studio, you can debug jobs and access logs without logging in to the cluster. EMR Studio provides native application interfaces such as Spark UI and YARN Timeline. When a notebook is run in EMR Studio, the application logs are uploaded to Amazon Simple Storage Service (Amazon S3). As a result, you can access logs and diagnose applications even after your EMR cluster is terminated. You can quickly locate the job to debug by filtering based on the cluster or time when the application was run.

In the following section, we demonstrate some of the capabilities of Amazon EMR Studio using a sample notebook. For our sample notebook, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE from the following GitHub repo.

Notebook-first IDE experience with AWS SSO integration

EMR Studio makes it simple to interact with applications on an EMR cluster. After an administrator sets up EMR Studio and provides the access URL (which looks like https://es-*************************.emrstudio.us-east-1.amazonaws.com), you can log in to EMR Studio with your corporate credentials.

After you log in to EMR Studio, you get started by creating a Workspace. A Workspace is a collection of one or more notebooks for a project. The Workspaces and the notebooks that you create in EMR Studio are automatically saved in an Amazon S3 location.

Now, we create a Workspace by completing the following steps:

  1. On the EMR Studio Dashboard page, choose Create Workspace.
  2. On the Create a Workspace page, enter a Workspace name and a Description.

Naming the Workspace helps identify your project. Your workspace is automatically saved, and you can find it later on the Workspaces page. For this post, we name our Workspace EMR-Studio-WS-Demo1.

  3. On the Subnet drop-down menu, choose a subnet for your Workspace.

Each subnet belongs to the same Amazon Virtual Private Cloud (Amazon VPC) as your EMR Studio. Your administrator may have set up one or more subnets to use for your EMR clusters. You should choose a subnet that matches the subnet where you use EMR clusters. If you’re not sure about which subnet to use, contact your administrator.

  4. For S3 location, choose the Amazon S3 location where EMR Studio backs up all notebook files in the Workspace.

This location is where your Workspace and all the notebooks in the Workspace are automatically saved.

  5. In the Advanced configuration section, you can attach an EMR cluster to your Workspace.

For this post, we skip this step. EMR Studio allows you to create Workspaces and notebooks without attaching to an EMR cluster. You can attach an EMR cluster later when you’re ready to run your notebooks.

  6. Choose Create Workspace.

Fully managed environment for managing and running Jupyter-based notebooks

EMR Studio provides a fully managed environment to help organize and manage Workspaces. Workspaces are the primary building blocks of EMR Studio, and they preserve the state of your notebooks. You can create different Workspaces for each project. From within a Workspace, you can create notebooks, link your Workspace to a code repository, and attach your Workspace to an EMR cluster to run notebooks. Your Workspaces, along with the notebooks and settings they contain, are automatically saved in the Amazon S3 location that you specify.

If you created the workspace EMR-Studio-WS-Demo1 by following the preceding steps, it appears on the Workspaces page with the name EMR-Studio-WS-Demo1 along with status Ready, creation time, and last modified timestamp.

The following table describes each possible Workspace status.

| Status | Meaning |
| --- | --- |
| Starting | The Workspace is being prepared, but is not yet ready to use. |
| Ready | You can open the Workspace to use the notebook editor. When a Workspace has a Ready status, you can open or delete it. |
| Attaching | The Workspace is being attached to a cluster. |
| Attached | The Workspace is attached to an EMR cluster. If a Workspace is not attached to an EMR cluster, you need to attach it to an EMR cluster before you can run any notebook code in the Workspace. |
| Idle | The Workspace is stopped and currently idle. When you launch an idle Workspace, the Workspace status changes from Idle to Starting to Ready. |
| Stopping | The Workspace is being stopped. |
| Deleting | When you delete a Workspace, it’s marked for deletion. EMR Studio automatically deletes Workspaces marked for deletion. After a Workspace is deleted, it no longer shows in the list of Workspaces. |

You can choose the Workspace that you created (EMR-Studio-WS-Demo1) to open it. This opens a new web browser tab with the JupyterLab interface. The icon-denoted tabs on the left sidebar allow you to access tool panels such as the file browser or JupyterLab command palette. To learn more about the EMR Studio Workspace interface, see Understand the Workspace User Interface.

EMR Studio automatically creates an empty notebook with the same name as the Workspace. For the Workspace that we created for this post, it automatically creates EMR-Studio-WS-Demo1.ipynb. In the following screenshot, no cluster or kernel is specified in the top right corner, because we didn’t attach a cluster while creating the Workspace. You can write code in your new notebook, but before you run your code, you need to attach the Workspace to an EMR cluster and specify a kernel. To attach your Workspace to a cluster, choose the EMR clusters icon on the left panel.

Linking Git-based code repositories with your Workspace

You can collaborate with your peers by sharing notebooks as code via code repositories. EMR Studio supports the following Git-based services:

This capability provides the following benefits:

  • Version control – Record code changes in a version control system so you can review the history of your changes and selectively revert them.
  • Collaboration – Share code with team members working in different Workspaces through remote Git-based repositories. Workspaces can clone or merge code from remote repositories and push changes back to those repositories.
  • Code reuse – Many Jupyter notebooks that demonstrate data analysis or machine learning techniques are available in publicly hosted repositories, such as GitHub. You can associate your Workspace with a GitHub repository to reuse the Jupyter notebooks contained in a repository.

To link Git repositories to your Workspace, you can link an existing repository or create a new one. When you link an existing repository, you choose from a list of Git repositories associated with the AWS account in which your EMR Studio was created.

We add a new repository by completing the following steps:

  1. Choose the Git icon.
  2. For Repository name, enter a name (for example, emr-notebook).
  3. For Git repository URL, enter the URL for the Git repo (for this post, we use the sample notebook at https://github.com/emrnotebooks/notebook_execution).
  4. For Git credentials, select your credentials. Because we’re using a public repo, we select Use a public repository without credentials.
  5. Choose Add repository.

After it’s added, we can see the repo on the Git repositories drop-down menu.

  6. Choose the repo to link to the Workspace.

You can link up to three Git repositories with an EMR Studio Workspace. For more information, see Link Git-Based Repositories to an EMR Studio Workspace.

  7. Choose the File browser icon to locate the Git repo we just linked.

Attaching and detaching Workspaces to and from EMR clusters

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can attach your Workspace to an EMR cluster and get distributed data processing using Spark or custom kernels. You can use primary node capacity to run non-distributed applications.

In addition to using Amazon EMR clusters running on Amazon EC2, you can attach a Workspace to an Amazon EMR on EKS virtual cluster to run notebook code. For more information about how to use an Amazon EMR on EKS cluster in EMR Studio, see Use an Amazon EMR on EKS Cluster to Run Notebook Code.

Before you can run your notebooks, you must attach your Workspace to an EMR cluster. For more information about clusters, see Create and Use Clusters with EMR Studio.

To run the Git repo notebooks that we linked in the previous step, complete the following steps:

  1. Choose the EMR clusters icon.
  2. Attach the Workspace to an existing EMR cluster running on Amazon EC2 instances.
  3. Open the notebook demo_pyspark.ipynb from the Git repo emr-notebook that we linked to the Workspace.

In the upper right corner of the Workspace UI, we can see the ID of the EMR cluster being attached to our Workspace, as well as the kernel selected to run the notebook.

  4. Record the value of the cluster ID (for example, <j-*************>).

We use this value later to locate the EMR cluster for application debugging purposes.

You can also detach the Workspace from the cluster in the Workspace UI and re-attach it to another cluster. For more information, see Detach a Cluster from Your Workspace.

Being able to easily attach and detach to and from any EMR cluster allows you to move any workload from prototyping into production. For example, you can start your prototype development by attaching your workspace to a development EMR cluster and working with test datasets. When you’re ready to run your notebook with larger production datasets, you can detach your workspace from the development EMR cluster and attach it to a larger production EMR cluster.

Installing and loading custom libraries and kernels

You can install notebook-scoped libraries with a PySpark kernel in EMR Studio. The libraries installed are isolated to your notebook session and don’t interfere with libraries installed via EMR bootstrap actions, or libraries installed by other EMR Studio notebook sessions that may be running on the same EMR cluster. After you install libraries for your Workspace, they’re available for other notebooks in the Workspace in the same session.

Our sample notebook demo_pyspark.ipynb is a Python script. It uses real-time COVID-19 US daily case reports as input data. The following parameters are defined in the first cell:

  • DATE – The given date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

The parameters can be any of the Python data types.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with the most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

In our notebook, we install notebook-scoped libraries by running the following code from within a notebook cell:

sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("requests==2.24.0")
sc.install_pypi_package("numpy==1.19.1")
sc.install_pypi_package("kiwisolver==1.2.0")
sc.install_pypi_package("matplotlib==3.3.0")

We use these libraries in the subsequent cells for the further data analysis and visualization steps in the notebook.
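
As a rough sketch, the same pinned installs can be driven from a single mapping so that the versions live in one place. The helper name install_pinned and the PINNED_PACKAGES mapping are hypothetical; the only call made on the Spark context is install_pypi_package, which is the method used in the cell above.

```python
# Pinned versions copied from the notebook cell shown above.
PINNED_PACKAGES = {
    "pandas": "0.25.1",
    "requests": "2.24.0",
    "numpy": "1.19.1",
    "kiwisolver": "1.2.0",
    "matplotlib": "3.3.0",
}


def install_pinned(spark_context, packages):
    """Install each package at its pinned version and return the specs used.

    `spark_context` is expected to be the `sc` object that the PySpark kernel
    provides; only its install_pypi_package method is called.
    """
    specs = [f"{name}=={version}" for name, version in packages.items()]
    for spec in specs:
        spark_context.install_pypi_package(spec)
    return specs


# In a notebook cell attached to a cluster, the call would look like:
#     install_pinned(sc, PINNED_PACKAGES)
```

Keeping the versions in a mapping makes it easier to review or bump them without editing several install lines.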

The following set of parameters is used to run the notebook:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running all the notebook cells generates two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on October 15, 2020.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on October 15, 2020.

EMR Studio also allows you to install Jupyter notebook kernels and Python libraries on a cluster primary node, which makes your custom environment available to any EMR Studio Workspace attached to the cluster. To install the sas_kernel kernel on a cluster primary node, run the following code within a notebook cell:

!/emr/notebook-env/bin/pip install sas_kernel

The following screenshot shows your output.

For more information about how to install kernels and use libraries, see Installing and Using Kernels and Libraries.

Diagnosing applications and jobs with EMR Studio

In EMR Studio, you can quickly debug jobs and access logs for both active and stopped clusters without logging in to the cluster or setting up a web proxy through an SSH connection. You can use native application interfaces such as Spark UI and YARN Timeline Service directly from EMR Studio. EMR Studio also allows you to quickly locate the cluster or job to debug by using filters such as cluster state, creation time, and cluster ID. For more information, see Diagnose Applications and Jobs with EMR Studio.

Now, we show you how to open a native application interface to debug the notebook job that already finished.

  1. On the EMR Studio page, choose Clusters.

A list appears with all the EMR clusters launched under the same AWS account. You can filter the list by cluster state, cluster ID, or creation time range by entering values in the provided fields.

  2. Choose the cluster ID of the EMR cluster that we attached to the Workspace EMR-Studio-WS-Demo1 for running notebook demo_pyspark.ipynb.
  3. For Spark job debugging, on the Launch application UIs menu, choose Spark History Server.

The following screenshot shows you the Spark job debugging UI.

We can traverse the details for our notebook application by checking actual logs from the Spark History Server, as in the following screenshot.

  4. For Yarn application debugging, on the Launch application UIs menu, choose Yarn Timeline Server.

The following screenshot shows the Yarn debugging UI.

Orchestrating analytics notebook jobs to build ETL production pipelines

EMR Studio makes it easy for you to move any analytics workload from prototyping to production. With EMR Studio, you can run parameterized notebooks as part of scheduled workflows using orchestration services like AWS Step Functions and Apache Airflow or Amazon MWAA.

In this section, we show a simple example of how to orchestrate running notebook workflows using Apache Airflow.

We have a fully tested notebook under an EMR Studio Workspace, and want to schedule a workflow that runs the notebook on an on-demand EMR cluster every 10 minutes.

Record the value of the Workspace ID (for example, e-*****************************) and the notebook file path relative to the home directory within the Workspace (for example, demo.ipynb or my_folder/demo.ipynb).

The workflow that we create takes care of the following tasks:

  1. Create an EMR cluster.
  2. Wait until the cluster is ready.
  3. Start running a notebook defined by the Workspace ID, notebook file path, and the cluster created.
  4. Wait until the notebook is complete.

The following screenshot is the tree view of this example DAG. The DAG definition is available on the GitHub repo. Make sure you replace any placeholder values with the actual ones before using.

When we open the Gantt chart of one of the successful runs, we can see the timeline of our workflow. The time spent creating the cluster and creating a notebook execution is negligible compared to the time spent waiting for the cluster to be ready and waiting for the notebook to finish, which meets the expectation of our SLA.

This example is just a starting point. Try it out and extend it with more sophisticated workflows that suit your needs.

Summary

In this post, we highlighted some of the capabilities of EMR Studio, such as the ability to log in via AWS SSO, access fully managed Jupyter notebooks, link Git-based code repositories, change clusters, load custom Python libraries and kernels, diagnose clusters and jobs using native application UIs, and orchestrate notebook jobs using Apache Airflow or Amazon MWAA.

There is no additional charge for using EMR Studio in public preview, and you only pay for the use of the EMR cluster or other AWS services such as AWS Service Catalog. For more information, see the EMR Studio FAQs.

EMR Studio is available on Amazon EMR release version 6.2 and later, in the US East (N. Virginia), US West (Oregon), and EU (Ireland) Regions for public preview. For the latest Region availability for the public preview, see Considerations.

If you have questions or suggestions, feel free to leave a comment.


About the Authors

Fei Lang is a Senior Big Data Architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Shuang Li is a Senior Product Manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

Ray Liu is a Software Development Engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Kendra Ellis is a Programmer Writer at AWS.

Orchestrating analytics jobs by running Amazon EMR Notebooks programmatically

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Amazon EMR Notebooks is a managed environment based on Jupyter Notebook that allows data scientists, analysts, and developers to prepare and visualize data, collaborate with peers, build applications, and perform interactive analysis using EMR clusters.

EMR notebook APIs are available on Amazon EMR release version 5.18.0 or later and can be used to run EMR notebooks via a script or command line. The ability to start, stop, list, and describe EMR notebook runs without the Amazon EMR console enables you to programmatically control running an EMR notebook. Using a parameterized notebook cell allows you to pass different parameter values to a notebook without having to create a copy of the notebook for each new set of parameter values. With this feature, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions or Apache Airflow to build pipelines. If you want to use EMR notebooks in a non-interactive manner, this enables you to run ETL workloads, especially in production.

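In this post, we show how to orchestrate analytics jobs by running EMR Notebooks programmatically with the following two use cases:

  • Scheduling an EMR notebook to run via crontab and the AWS CLI
  • Chaining EMR notebooks with Step Functions triggered by CloudWatch Events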

For our data source, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE in the following GitHub repo.

Prerequisites

Before getting started, you must have the following prerequisites:

Record the notebook ID (for example, <e-*************************>); you use this later in our examples. Organize the notebook files in the Jupyter UI as follows:

  • /demo_pyspark.ipynb
  • /experiment/trailing_N_day.ipynb

See Creating a Notebook for more information on how to create an EMR notebook.

Use case 1: Scheduling an EMR notebook to run via crontab and the AWS CLI

We use demo_pyspark.ipynb as the input notebook file, as mentioned in the prerequisites. In this use case, we use the AWS CLI to call the EMR Notebooks Execution API to run a notebook using some parameters that we pass in. We then download the notebook output and visualize it using the local Jupyter server.

First, we use the AWS CLI to run an example notebook using the EMR Notebooks Execution API.

demo_pyspark.ipynb is a Python script. The following parameters are defined in the first cell:

  • DATE – The date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with the most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

The parameters in the first cell can be passed to the EMR Notebooks StartNotebookExecution API, which you can call via the AWS CLI or SDK. The following code is an example of the EMR notebook first cell, containing parameters with corresponding values in JSON format. It means the notebook uses the date 10-13-2020. For Graph a, we visualize the top five US states with confirmed COVID-19 cases on October 13, 2020. For Graph b, we visualize the fatality rates of COVID-19 patients in Alabama, California, and Arizona on October 13, 2020. See the following code:

{"DATE": "10-13-2020",
 "TOP_K": 5,
"US_STATES": ["Alabama", "California", "Arizona"]}

For this example, the parameters can be any of the Python data types.
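
To illustrate the idea of defaults in the first cell being overridden at run time, here is a minimal sketch that merges a JSON parameter string over a set of defaults. It only models the effect; it is not how the service or Papermill is implemented, and the function name effective_parameters and the rejection of unknown names are choices made for this sketch.

```python
import json

# Defaults as they appear in the example first cell above.
DEFAULTS = {
    "DATE": "10-13-2020",
    "TOP_K": 5,
    "US_STATES": ["Alabama", "California", "Arizona"],
}


def effective_parameters(defaults, notebook_params_json):
    """Return the defaults with any values from the JSON string applied on top."""
    overrides = json.loads(notebook_params_json) if notebook_params_json else {}
    # Rejecting unknown names is a guard chosen for this sketch only.
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown parameter(s): {sorted(unknown)}")
    return {**defaults, **overrides}


params = effective_parameters(
    DEFAULTS, '{"TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}'
)
print(params["DATE"], params["TOP_K"], params["US_STATES"])
```

Here DATE keeps its default because the override string does not mention it, while TOP_K and US_STATES take the new values.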

Run the notebook using the following new set of parameters:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running an EMR notebook with the AWS CLI

Run the following command (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID as mentioned in the prerequisites):

% aws emr --region us-west-2 start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole

The start-notebook-execution command returns an output similar to the following JSON document:

{
 "NotebookExecutionId": "ex-*****************************"
}

Record the value of NotebookExecutionId; you use it in the next step.
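
If you prefer an SDK over the CLI, the same request can be sketched with boto3, which this post does not otherwise use. The helper names build_start_request and start_execution are hypothetical; the keyword names mirror the CLI flags above, and the notebook parameters are passed as a JSON-formatted string.

```python
import json


def build_start_request(editor_id, cluster_id, relative_path, params,
                        execution_name="demo",
                        service_role="EMR_Notebooks_DefaultRole"):
    """Build keyword arguments that mirror the CLI flags shown above."""
    return {
        "EditorId": editor_id,
        "RelativePath": relative_path,
        "NotebookExecutionName": execution_name,
        # The parameters travel as a JSON-formatted string, as on the CLI.
        "NotebookParams": json.dumps(params),
        "ExecutionEngine": {"Id": cluster_id},
        "ServiceRole": service_role,
    }


def start_execution(request, region="us-west-2"):
    """Submit the request with boto3 (imported lazily; needs AWS credentials)."""
    import boto3

    emr = boto3.client("emr", region_name=region)
    return emr.start_notebook_execution(**request)["NotebookExecutionId"]
```

Building the request separately from sending it makes the arguments easy to inspect or log before anything runs on the cluster.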

Running the describe-notebook-execution command

Run the following command (replace <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws emr --region us-west-2 describe-notebook-execution \
--notebook-execution-id <ex-*****************************>

The describe-notebook-execution command returns an output similar to the following JSON document:

{
  "NotebookExecution": {
    "NotebookExecutionId": "ex-*****************************",
    "EditorId": "e-*************************",
    "ExecutionEngine": {
      "Id": "<j-*************>",
      "Type": "EMR",
      "MasterInstanceSecurityGroupId": "sg-********"
    },
    "NotebookExecutionName": "demo",
    "NotebookParams": "{\"DATE\":\"10-15-2020\", \"TOP_K\": 6, \"US_STATES\": [\"Wisconsin\", \"Texas\", \"Nevada\"]}",
    "Status": "FINISHED",
    "StartTime": "2020-10-18T19:46:01.125000-07:00",
    "EndTime": "2020-10-18T19:47:24.014000-07:00",
    "Arn": "arn:aws:elasticmapreduce:us-west-2:123456789012:notebook-execution/ex-*****************************",
    "OutputNotebookURI": "s3://<notebook_bucket_location>/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb",
    "LastStateChangeReason": "Execution is finished for cluster j-*************.",
    "NotebookInstanceSecurityGroupId": "sg-********",
    "Tags": []
  }
}
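
As a small sketch of how a script might consume this JSON, the following reads the status, the elapsed run time, and the output location from the CLI output text. The function name summarize_execution is hypothetical, and the sample below contains only the fields the function reads.

```python
import json
from datetime import datetime


def summarize_execution(describe_json):
    """Return (status, elapsed_seconds, output_uri) from the CLI's JSON text."""
    execution = json.loads(describe_json)["NotebookExecution"]
    elapsed = None
    # EndTime is only meaningful once the execution has stopped running.
    if "StartTime" in execution and "EndTime" in execution:
        start = datetime.fromisoformat(execution["StartTime"])
        end = datetime.fromisoformat(execution["EndTime"])
        elapsed = (end - start).total_seconds()
    return execution["Status"], elapsed, execution.get("OutputNotebookURI")


sample = json.dumps({
    "NotebookExecution": {
        "Status": "FINISHED",
        "StartTime": "2020-10-18T19:46:01.125000-07:00",
        "EndTime": "2020-10-18T19:47:24.014000-07:00",
    }
})
status, elapsed, output_uri = summarize_execution(sample)
print(status, elapsed)
```

For the timestamps in the example output above, the elapsed time works out to roughly 83 seconds.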

You can pass different parameter values to the same notebook without having to create a copy of the notebook for each new set of parameter values or log in to the Jupyter Notebooks UI via the Amazon EMR console.

Downloading the output file and visualizing the output with a local Jupyter server

EMR notebooks use Papermill to run the notebook. When it runs, a new notebook file is created with input parameters so as not to overwrite the existing file. The notebook is then started, and the output notebook can be found in s3://<Notebook bucket location>/<editor id>/executions/<Execution id>/<input file name>.
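
The output location pattern can be sketched as a pair of small helpers, one that assembles the URI and one that splits an S3 URI into bucket and key. Both function names are hypothetical, and the sketch assumes the last path component is the input file name, as in the pattern above.

```python
def output_notebook_uri(bucket_location, editor_id, execution_id, input_file):
    """Build the output URI following the pattern described above."""
    base = bucket_location.rstrip("/")
    if not base.startswith("s3://"):
        base = "s3://" + base
    return f"{base}/{editor_id}/executions/{execution_id}/{input_file}"


def split_s3_uri(uri):
    """Split s3://bucket/key into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key
```

In practice, the OutputNotebookURI field returned by describe-notebook-execution already contains the full location, so building it by hand is mainly useful for checking or listing outputs.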

We run the following s3 cp command to download the EMR notebook output file to a local directory (replace <notebook_bucket_location> with the S3 location specified for the notebook during creation, <e-*************************> with the EMR Notebook ID, and <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws s3 cp s3://<notebook_bucket_location>/<e-*************************>/executions/<ex-*****************************>/demo_pyspark.ipynb .

In the same directory where we downloaded the EMR notebook output file, run the following command to start a local Jupyter server:

% jupyter lab

The URL http://localhost:8888/lab automatically opens in your web browser, as shown in the following screenshot.

Choose demo_pyspark.ipynb to view the output file. In the output, it plots two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on a given date.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on a given date.

Scheduling to run a notebook daily using crontab

We have completed running the EMR notebook using the AWS CLI. Now, we demonstrate how to schedule running a notebook daily using crontab. We use the same notebook input file with the same parameters as the previous example. On a daily basis, it generates Graph a with the top six US states with confirmed COVID-19 cases, and Graph b with the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada.

We start by creating a bash script named run_notebook_daily.sh. The script starts an EMR notebook, waits for the notebook to either finish running or fail, and copies the output file to the local directory ~/daily_reports/.

The following code is the content of run_notebook_daily.sh (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID):

# Generate a report for the day before yesterday.
# date -v is BSD/macOS syntax; on GNU/Linux use: date -d '2 days ago' +'%m-%d-%Y'
day_before_yesterday=$(date -v-2d +'%m-%d-%Y')

# Start an execution
execution_id=$(aws emr start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"'"$day_before_yesterday"'", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole | jq -r '.NotebookExecutionId')

echo "Started an execution for the date $day_before_yesterday. Execution id: $execution_id"

# Poll until the execution finishes or fails
while true; do
    execution_status=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.Status')
    echo "Execution Status: $execution_status"

    if [ "$execution_status" = "FINISHED" ] || [ "$execution_status" = "FAILED" ]; then
        # Copy the output file to the local directory ~/daily_reports/
        output_file=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.OutputNotebookURI')
        mkdir -p "$HOME/daily_reports"
        aws s3 cp "$output_file" "$HOME/daily_reports/"
        break
    fi
    sleep 15
done
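
The date arithmetic and shell quoting in the script can be easy to get wrong, so here is a sketch of the same parameter string built in Python instead. The function name daily_notebook_params and its arguments are hypothetical; the output is the JSON text that would be passed to --notebook-params.

```python
import json
from datetime import date, timedelta


def daily_notebook_params(today=None, days_back=2):
    """Build the --notebook-params JSON for a report `days_back` days ago."""
    today = today or date.today()
    report_day = today - timedelta(days=days_back)
    return json.dumps({
        "DATE": report_day.strftime("%m-%d-%Y"),
        "TOP_K": 6,
        "US_STATES": ["Wisconsin", "Texas", "Nevada"],
    })


print(daily_notebook_params(date(2020, 10, 17)))
# {"DATE": "10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}
```

Because the date is computed with timedelta, month and year boundaries are handled without any platform-specific date flags.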

Next, we add this script to a crontab to run our EMR notebook job daily at 9:00 AM:

% crontab -e
0 9 * * * bash /folder/path/run_notebook_daily.sh >/tmp/stdout.log 2>/tmp/stderr.log

This is a simple example of how to schedule running an EMR notebook with a crontab.

Use case 2: Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

We use demo_pyspark.ipynb and trailing_N_day.ipynb as the input notebook files for this use case. We also provide a CloudFormation template as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use.

The following diagram illustrates the resources that the CloudFormation template creates.

The template first creates a step function to run a chain of EMR notebooks, which takes care of the following tasks:

  • Runs notebook demo_pyspark.ipynb with given parameters and waits until it’s complete. It plots a graph of the top k US states with the most COVID-19 cases yesterday.
  • Runs notebook input trailing_N_day.ipynb using the output from the first task. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input, and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

The template also creates a CloudWatch event that periodically triggers the step function according to the given schedule expression.
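
Schedule expressions of the rate form follow a simple pattern, rate(value unit), where the unit is singular when the value is 1 and plural otherwise. The helper below is a hypothetical sketch for producing such strings; it is not part of the CloudFormation template.

```python
def rate_expression(value, unit):
    """Return a rate() schedule expression such as 'rate(1 day)'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError("unit must be 'minute', 'hour', or 'day'")
    if not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    # A value of 1 takes the singular unit; larger values take the plural.
    suffix = "" if value == 1 else "s"
    return f"rate({value} {unit}{suffix})"


print(rate_expression(1, "day"))
print(rate_expression(12, "hour"))
```

The first call produces the daily schedule, and the second shows the plural form for a twice-daily run.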

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It may prompt you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

| Parameter | Description | Default Value |
| --- | --- | --- |
| Stack name | Enter a meaningful name for the stack, for example, emrRunnableNotebookDemo. | None |
| ClusterId | The unique ID of the EMR cluster that runs the notebook (j-*************). | None |
| NotebookARelativePath | The path and file name of the notebook input file A (demo_pyspark.ipynb), relative to the path specified for the EMR notebook. For more information, see Notebook execution CLI command samples. | demo_pyspark.ipynb |
| NotebookBRelativePath | The path and file name of the notebook input file B (trailing_N_day.ipynb), relative to the path specified for the EMR notebook. | experiment/trailing_N_day.ipynb |
| NotebookId | The unique ID of the EMR notebook to use for running the notebook (e-*****************************). | None |
| ScheduleExpression | How the notebook is scheduled to run. For more information, see Schedule Expressions for Rules. | rate(1 day) |
| StorageLocation | The Amazon S3 path where the EMR notebook is stored (s3://aws-emr-resources-************-us-west-2/notebooks/e-*************************). | None |
| TopK | The value of one of the parameters used to run notebook A. In this example, it checks the top k US states with confirmed COVID-19 cases and plots a graph for it. | 20 |

  2. Enter the parameter values from the preceding table.
  3. Review the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation only takes a few minutes. When the stack is complete, on the Resources tab, you can find the resources created as shown in the following screenshot.

Checking the notebook output files

When a step function is complete, you can find the execution IDs in the step function output.

We run the following command to view the output files (replace <notebook_bucket_location> with the Amazon S3 location specified for the notebook during creation and <e-*************************> with the EMR notebook ID):

% aws s3 ls --recursive s3://<notebook_bucket_location>/<e-*************************>/executions/

The aws s3 ls --recursive command returns an output similar to the following:

2020-10-16 16:39:02     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:44:14     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 17:00:37      18600 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:49:08     267781 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 16:59:01     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:54:06     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
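
When several executions have accumulated, it can help to pick out the most recent output for each notebook. The following sketch parses text in the layout shown above (date, time, size, key); the function name latest_outputs is hypothetical, and it relies on the timestamps sorting correctly as plain strings.

```python
def latest_outputs(listing_text):
    """Map each output file name to the key of its most recent copy.

    Expects lines in the `aws s3 ls --recursive` layout: date, time, size, key.
    """
    latest = {}
    for line in listing_text.splitlines():
        parts = line.split(None, 3)
        if len(parts) != 4:
            continue  # skip blank or unexpected lines
        day, clock, _size, key = parts
        name = key.rsplit("/", 1)[-1]
        stamp = f"{day} {clock}"  # ISO-like, so string order is time order
        if name not in latest or stamp > latest[name][0]:
            latest[name] = (stamp, key)
    return {name: key for name, (_stamp, key) in latest.items()}
```

The returned keys can then be passed to aws s3 cp to download only the newest output of each notebook.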

Downloading and visualizing the results

Follow the same steps in the first use case to download and visualize the results.

The following screenshot shows the graph plotted in the output file of notebook input file A (demo_pyspark.ipynb). It shows the top 20 US states with confirmed COVID-19 cases yesterday.

The output of input file B (trailing_N_day.ipynb) plots the graph as shown in the following screenshot. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

This example step function is the orchestration for running two notebook input files: the second notebook uses the result from the first. It also monitors the first notebook until it is complete, and populates the Amazon S3 file location in the outputs. You can achieve more sophisticated orchestration by adding more states in the step function.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack, the EMR cluster, and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how you can schedule running an EMR notebook using crontab and the AWS CLI, and how to chain EMR notebooks with Step Functions triggered by CloudWatch events. The EMR Notebooks Execution API enables the parameterization for EMR notebooks. With this feature, you can also use orchestration services such as Apache Airflow to build ETL pipelines.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Palaniappan Nagarajan is a Software Development Engineer at Amazon EMR working mainly on EMR Notebooks. In his spare time, he likes to hike, try out different cuisines, and scan the night sky with his telescope.

Shuang Li is a senior product manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.