Orchestrating analytics jobs by running Amazon EMR Notebooks programmatically

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Amazon EMR Notebooks is a managed environment based on Jupyter Notebook that allows data scientists, analysts, and developers to prepare and visualize data, collaborate with peers, build applications, and perform interactive analysis using EMR clusters.

EMR notebook APIs are available on Amazon EMR release version 5.18.0 or later and can be used to run EMR notebooks via a script or command line. The ability to start, stop, list, and describe EMR notebook runs without the Amazon EMR console enables you to programmatically control running an EMR notebook. Using a parameterized notebook cell allows you to pass different parameter values to a notebook without having to create a copy of the notebook for each new set of parameter values. With this feature, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions or Apache Airflow to build pipelines. If you want to use EMR notebooks in a non-interactive manner, this enables you to run ETL workloads, especially in production.

In this post, we show how to orchestrate analytics jobs by running EMR Notebooks programmatically with the following two use cases:

For our data source, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE in the following GitHub repo.

Prerequisites

Before getting started, you must have the following prerequisites:

Record the notebook ID (for example, <e-*************************>); you use this later for our examples later. Organize the notebook files in the Jupyter UI as follows:

  • /demo_pyspark.ipynb
  • /experiment/trailing_N_day.ipynb

See Creating a Notebook for more information on how to create an EMR notebook.

Use case 1: Scheduling an EMR notebook to run via crontab and the AWS CLI

We use demo_pyspark.ipynb as the input notebook file, as mentioned in the prerequisites. In this use case, we use the AWS CLI to call the EMR Notebooks Execution API to run a notebook using some parameters that we pass in. We then download the notebook output and visualize it using the local Jupyter server.

First, we use the AWS CLI to run an example notebook using the EMR Notebooks Execution API.

demo_pyspark.ipynb is a Python script. The following parameters are defined in the first cell:

  • DATE – The date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this plot Graph b.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

The parameters in the first cell can be passed to the EMR Notebooks StartNotebookExecution API, which you can call via the AWS CLI or SDK. The following code is an example of the EMR notebook first cell, containing parameters with corresponding values in JSON format. It means the notebook uses the date 10-13-2020. For Graph a, we visualize the top five US states with confirmed COVID-19 cases on October 13, 2020. For Graph b, we visualize the fatality rates of COVID-19 patients in Alabama, California, and Arizona on October 13, 2020. See the following code:

{"DATE": "10-13-2020",
 "TOP_K": 5,
"US_STATES": ["Alabama", "California", "Arizona"]}

For this example, the parameters can be any of the Python Data Types.

Run the notebook using the following new set of parameters:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running an EMR notebook with the AWS CLI

Run the following command (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID as mentioned in the prerequisites):

% aws emr --region us-west-2 start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole

The start-notebook-execution command returns an output similar to the following JSON document:

{
 "NotebookExecutionId": "ex-*****************************"
}

Record the value of NotebookExecutionId; you use in the next step.

Running the describe-notebook-execution command

Run the following command (replace <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws emr --region us-west-2 describe-notebook-execution \
--notebook-execution-id <ex-*****************************>

The describe-notebook-execution command returns an output similar to the following JSON document:

{
  "NotebookExecution": {
    "NotebookExecutionId": "ex-*****************************",
    "EditorId": "e-*************************",
    "ExecutionEngine": {
      "Id": "<j-*************>",
      "Type": "EMR",
      "MasterInstanceSecurityGroupId": "sg-********"
    },
    "NotebookExecutionName": "demo",
    "NotebookParams": "{\"DATE\":\"10-15-2020\", \"TOP_K\": 6, \"US_STATES\": [\"Wisconsin\", \"Texas\", \"Nevada\"]}",
    "Status": "FINISHED",
    "StartTime": "2020-10-18T19:46:01.125000-07:00",
    "EndTime": "2020-10-18T19:47:24.014000-07:00",
    "Arn": "arn:aws:elasticmapreduce:us-west-2:123456789012:notebook-execution/ex-*****************************",
    "OutputNotebookURI": "s3://<notebook_bucket_location>/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb",
    "LastStateChangeReason": "Execution is finished for cluster j-*************.",
    "NotebookInstanceSecurityGroupId": "sg-********",
    "Tags": []
  }
}

You can pass different parameter values to the same notebook without having to create a copy of the notebook for each new set of parameter values or log in to the Jupyter Notebooks UI via the Amazon EMR console.

Downloading the output file and visualizing the output with a local Jupyter server

EMR notebooks use Papermill to run the notebook. When it runs, a new notebook file is created with input parameters so as not to overwrite the existing file. The notebook is then started, and the output notebook can be found in s3://<Notebook bucket location>/<editor id>/executions/<Execution id>/<input file name>.

We run the following s3 cp command to download the EMR notebook output file to a local directory (replace <notebook_bucket_location> with the S3 location specified for the notebook during creation, <e-*************************> with the EMR Notebook ID, and <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws s3 cp s3://<notebook_bucket_location>/<e-*************************>/executions/<ex-*****************************>/demo_pyspark.ipynb

In the same directory where we downloaded the EMR notebook output file, run the following command to start a local Jupyter server:

% jupyter lab

The URL http://localhost:8888/lab automatically opens in your web browser, as shown in the following screenshot.

Choose demo_pyspark.ipynb to view the output file. In the output, it plots two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on a given date.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on a given date.

Scheduling to run a notebook daily using crontab

We have completed running the EMR notebook using the AWS CLI. Now, we demonstrate how to schedule running a notebook daily using crontab. We use the same notebook input file with the same parameters as the previous example. On a daily basis, it generates Graph a with the top six US states with confirmed COVID-19 cases, and Graph b with the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada.

We start by creating a bash script named run_notebook_daily.sh. The script starts an EMR notebook, waits for the notebook to either finish running or fail, and copies the output file to the local directory ~/daily_reports/.

The following code is the content of run_notebook_daily.sh (replace <e-*************************> with the ID of EMR Notebook and <j-*************> with the EMR cluster ID):

# Generate a report for day before yesterday
day_before_yesterday=`date -v-2d +'%m-%d-%Y'`

# Start an execution
execution_id=`aws emr start-notebook-execution \
--editor-id <e-*****************************> \
--notebook-params '{"DATE":"'"$day_before_yesterday"'", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*********">}' \
--service-role EMR_Notebooks_DefaultRole | jq -r .'NotebookExecutionId'`

echo "Started an execution for the date $day_before_yesterday. Execution id: $execution_id"

# Poll for execution to finish
while
    execution_status=`aws emr describe-notebook-execution --notebook-execution-id $execution_id | jq -r .'NotebookExecution.Status'`
    echo "Execution Status: $execution_status"
    
    if [ $execution_status == "FINISHED" ] || [ $execution_status == "FAILED" ]; then
        # Copy the output file to local directory
        output_file=`aws emr describe-notebook-execution --notebook-execution-id $execution_id | jq -r .'NotebookExecution.OutputNotebookURI'`
        mkdir -p daily_reports
        aws s3 cp "$output_file" daily_reports/
       break
    fi
    sleep 15s
do true; done

Next, we add this script to a crontab to run our EMR notebook job daily at 9:00 AM:

% crontab
0 9 * * * bash /folder/path/run_notebook_daily.sh >/tmp/stdout.log 2>/tmp/stderr.log

This is a simple example of how to schedule running an EMR notebook with a crontab.

Use case 2: Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

We use demo_pyspark.ipynb and trailing_N_day.ipynb as the input notebook files for this use case. We also provide a CloudFormation template as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use.

The following diagram illustrates the resources that the CloudFormation template creates.

The template first creates a step function to run a chain of EMR notebooks, which takes care of the following tasks:

  • Runs notebook demo_pyspark.ipynb with given parameters and waits until it’s complete. It plots a graph of the top k US states with most COVID-19 cases yesterday.
  • Runs notebook input trailing_N_day.ipynb using the output from the first task. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input, and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

The template also creates a CloudWatch event that periodically triggers the step function according to the given schedule expression.

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It may prompt you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

Parameter Description Default Value
Stack name Enter a meaningful name for the stack, for example, emrRunnableNotebookDemo. None
ClusterId The unique ID of the EMR cluster that runs the notebook (j-*************). None
NotebookARelativePath The path and file name of the notebook input file A (demo_pyspark.ipynb), relative to the path specified for the EMR notebook. For more information, see Notebook execution CLI command samples. demo_pyspark.ipynb
NotebookBRelativePath The path and file name of the notebook input file B (trailing_N_day.ipynb), relative to the path specified for the EMR notebook. experiment/trailing_N_day.ipynb
NotebookId The unique ID of the EMR notebook to use for running the notebook (e-*****************************). None
ScheduleExpression How the notebook is scheduled to run. For more information, see Schedule Expressions for Rules. rate(1 day)
StorageLocation The Amazon S3 path where the EMR notebook is stored (s3://aws-emr-resources-************-us-west-2/notebooks/e-*************************). None
TopK The value of one of the parameters used to run notebook A. In this example, it checks the top k US states with confirmed COVID-19 cases and plots a graph for it. 20

 

  1. Enter the parameter values from the preceding table.
  2. Review the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Stack creation only takes a few minutes. When the stack is complete, on the Resources tab, you can find the resources created as shown in the following screenshot.

Checking the notebook output files

When a step function is complete, you can find the execution IDs in the step function output.

We run the following command to view the output files (replace <notebook_bucket_location> with the Amazon S3 location specified for the notebook during creation and <e-*************************> with the EMR notebook ID):

% aws s3 ls --recursive s3://<notebook_bucket_location>/<e-*************************>/executions/

The aws s3 ls --recursive command returns an output similar to the following:

2020-10-16 16:39:02     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:44:14     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 17:00:37      18600 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:49:08     267781 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 16:59:01     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:54:06     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb

Downloading and visualizing the results

Follow the same steps in the first use case to download and visualize the results.

The following screenshot is the graph plotted in the notebook input file A (demo_pyspark.ipynb ) output file. It shows the top 20 US states with confirmed COVID-19 cases yesterday.

The output of input file B (trailing_N_day.ipynb) plots the graph as shown in the following screenshot. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

This example step function is the orchestration for running two notebook input files: the second notebook uses the result from the first. It also monitors the first notebook until it is complete, and populates the Amazon S3 file location in the outputs. You can achieve more sophisticated orchestration by adding more states in the step function.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack, the EMR cluster, and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how you can schedule running an EMR notebook using crontab and the AWS CLI, and how to chain EMR notebooks with Step Functions triggered by CloudWatch events. The EMR Notebooks Execution API enables the parameterization for EMR notebooks. With this feature, you can also use orchestration services such as Apache Airflow to build ETL pipelines.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

 

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

 

 

 

Palaniappan Nagarajan is a Software Development Engineer at Amazon EMR working mainly on EMR Notebooks. In his spare time, he likes to hike, try out different cuisines, and scan the night sky with his telescope.

 

 

Shuang Li is a senior product manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.