Improve observability across Amazon MWAA tasks


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. A data pipeline is a set of tasks and processes used to automate the movement and transformation of data between different systems. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) to build data pipelines. The Amazon provider package for Apache Airflow comes with integrations for over 31 AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon EMR, AWS Glue, Amazon SageMaker, and more.

The most common use case for Airflow is ETL (extract, transform, and load). Nearly all Airflow users implement ETL pipelines ranging from simple to complex. Operationalizing machine learning (ML) is another growing use case, where data has to be transformed and normalized before it can be loaded into an ML model. In both use cases, the data pipeline is preparing the data for consumption by ingesting data from different sources and transforming it through a series of steps.

Observability across the different processes within the data pipeline is a key component to monitor the success or failure of the pipeline. Although scheduling the runs of tasks within the data pipeline is controlled by Airflow, the run of the task itself (transforming, normalizing, and aggregating data) is done by different services based on the use case. Having an end-to-end view of the data flow is a challenge due to multiple touch points in the data pipeline.

In this post, we provide an overview of logging enhancements when working with Amazon MWAA, which is one of the pillars of observability. We then discuss a solution to further enhance end-to-end observability by modifying the task definitions that make up the data pipeline. For this post, we focus on task definitions for two services: AWS Glue and Amazon EMR; however, the same method can be applied across different services.

Challenge

Many customers’ data pipelines start simple, orchestrating a few tasks, and over time grow to be more complex, consisting of a large number of tasks and dependencies between them. As the complexity increases, it becomes harder to operate and debug when failures occur, which creates a need for a single pane of glass to provide end-to-end data pipeline orchestration and health management. For data pipeline orchestration, the Apache Airflow UI is a user-friendly tool that provides detailed views into your data pipeline. When it comes to pipeline health management, each service that your tasks are interacting with could be storing or publishing logs to different locations, such as an S3 bucket or Amazon CloudWatch Logs. As the number of integration touch points increases, stitching together the distributed logs generated by different services in various locations can be challenging.

One solution provided by Amazon MWAA to consolidate the Airflow and task logs within the directed acyclic graph (DAG) is to forward the logs to CloudWatch log groups. A separate log group is created for each enabled Airflow logging option (for example, DAGProcessing, Scheduler, Task, WebServer, and Worker). These logs can be queried across log groups using CloudWatch Logs Insights.

A common approach in distributed tracing is to use a correlation ID to stitch and query distributed logs. A correlation ID is a unique identifier that is passed through a request flow for tracking a sequence of activities throughout the lifetime of the workflow. When each service in the workflow needs to log information, it can include this correlation ID, thereby ensuring you can track a full request from start to finish.

The Airflow engine passes a few variables by default that are accessible to all templates. run_id is one such variable, which is a unique identifier for a DAG run. The run_id can be used as the correlation ID to query against different log groups within CloudWatch to capture all the logs for a particular DAG run.
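For example, run_id can be referenced through Jinja templating in any templated operator field, or read from the task context in a Python callable. The following minimal sketch assumes Apache Airflow 2.x; the task is illustrative and not part of the workshop DAG:

# Illustrative only: reading run_id from the task context
from airflow.operators.python import PythonOperator

def log_run_id(**context):
    # run_id looks like manual__2022-07-12T00:22:36.111190+00:00
    print("Correlation ID: " + context["run_id"])

print_run_id = PythonOperator(
    task_id="print_run_id",
    python_callable=log_run_id,
)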

However, be aware that services that your tasks are interacting with will use a separate log group and won’t log the run_id as part of their output. This will prevent you from getting an end-to-end view across the DAG run.

For example, if your data pipeline consists of an AWS Glue task running a Spark job as part of the pipeline, then the Airflow task logs will be available in one CloudWatch log group and the AWS Glue job logs will be in a different CloudWatch log group. However, the Spark job that is run as part of the AWS Glue job doesn’t have access to the correlation ID and can’t be tied back to a particular DAG run. So even if you use the correlation ID to query the different CloudWatch log groups, you won’t get any information about the run of the Spark job.

Solution overview

As you now know, run_id is a variable that is a unique identifier for a DAG run. The run_id is present as part of the Airflow task logs. To use the run_id effectively and increase observability across the DAG run, we use run_id as the correlation ID and pass it to the different tasks within the DAG. The correlation ID is then consumed by the scripts used within the tasks.

The following diagram illustrates the solution architecture.

Architecture Diagram

The data pipeline that we focus on consists of the following components:

  • An S3 bucket that contains the source data
  • An AWS Glue crawler that creates the table metadata in the Data Catalog from the source data
  • An AWS Glue job that transforms the raw data into a processed data format while performing file format conversions
  • An EMR job that generates reporting datasets

For details on the architecture and complete steps on how to run the DAG, refer to the Amazon MWAA for Analytics Workshop.

In the next sections, we explore the following topics:

  • The DAG file, in order to understand how to define and then pass the correlation ID in the AWS Glue and EMR tasks
  • The code needed in the Python scripts to output information based on the correlation ID

Refer to the GitHub repo for the detailed DAG definition and Spark scripts. To run the scripts, refer to the Amazon MWAA analytics workshop.

DAG definitions

In this section, we look at snippets of the additions needed to the DAG file. We also discuss how to pass the correlation ID to the AWS Glue and EMR jobs. Refer to the GitHub repo for the complete DAG code.

The DAG file begins by defining the variables:

# Variables

correlation_id = "{{ run_id }}"
dag_name = "data_pipeline"
S3_BUCKET_NAME = "airflow_data_pipeline_bucket"

Next, let’s look at how to pass the correlation ID to the AWS Glue job using the AWS Glue operator. Operators are the building blocks of Airflow DAGs. They contain the logic of how data is processed in the data pipeline. Each task in a DAG is defined by instantiating an operator.

Airflow provides operators for different tasks. For this post, we use the AWS Glue operator.

The AWS Glue task definition contains the following:

  • The Python Spark job script (raw_to_transform.py) to run the job
  • The DAG name, task ID, and correlation ID, which are passed as arguments
  • The AWS Glue service role assigned, which has permissions to run the crawler and the jobs

See the following code:

# Glue Task definition

glue_task = AwsGlueJobOperator(
    task_id='glue_task',
    job_name='raw_to_transform',
    iam_role_name='AWSGlueServiceRoleDefault',
    script_args={'--dag_name': dag_name,
                 '--task_id': 'glue_task',
                 '--correlation_id': correlation_id},
)

Next, we pass the correlation ID to the EMR job using the EMR operator. This includes the following steps:

  1. Define the configuration of an EMR cluster.
  2. Create the EMR cluster.
  3. Define the steps to be run by the EMR job.
  4. Run the EMR job:
    1. We use the Python Spark job script aggregations.py.
    2. We pass the DAG name, task ID, and correlation ID as arguments to the steps for the EMR task.

Let’s start with defining the configuration for the EMR cluster. The correlation_id is passed in the name of the cluster to easily identify the cluster corresponding to a DAG run. The logs generated by EMR jobs are published to an S3 bucket; the correlation_id is part of the LogUri as well. See the following code:

# Define the EMR cluster configuration

emr_task_id = 'create_emr_cluster'
JOB_FLOW_OVERRIDES = {
    "Name": dag_name + "." + emr_task_id + "-" + correlation_id,
    "ReleaseLabel": "emr-5.29.0",
    "LogUri": "s3://{}/logs/emr/{}/{}/{}".format(S3_BUCKET_NAME, dag_name, emr_task_id, correlation_id),
    "Instances": {
        "InstanceGroups": [{
            "Name": "Master nodes",
            "Market": "ON_DEMAND",
            "InstanceRole": "MASTER",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 1
        }, {
            "Name": "Slave nodes",
            "Market": "ON_DEMAND",
            "InstanceRole": "CORE",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 2
        }],
        "TerminationProtected": False,
        "KeepJobFlowAliveWhenNoSteps": True
    }
}

Now let’s define the task to create the EMR cluster based on the configuration:

# Create the EMR cluster

cluster_creator = EmrCreateJobFlowOperator(
    task_id= emr_task_id,
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

Next, let’s define the steps to run as part of the EMR job. The S3 input and output locations for the data processed by the EMR job are passed as arguments, along with dag_name, task_id, and correlation_id. The task_id used can be a name of your choice; here we use add_steps:

# EMR steps to be executed by EMR cluster

SPARK_TEST_STEPS = [{
    'Name': 'Run Spark',
    'ActionOnFailure': 'CANCEL_AND_WAIT',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit',
                 '/home/hadoop/aggregations.py',
                 's3://{}/data/transformed/green'.format(S3_BUCKET_NAME),
                 's3://{}/data/aggregated/green'.format(S3_BUCKET_NAME),
                 dag_name,
                 'add_steps',
                 correlation_id]
    }
}]

Next, let’s add a task to run the steps on the EMR cluster. The job_flow_id is the ID of the JobFlow, which is passed down from the EMR create task described earlier using Airflow XComs. See the following code:

# Run the EMR job

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",      
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
)
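Although not shown in the snippets above, the tasks are then chained together in the DAG. The following is a minimal sketch of the ordering implied by this pipeline; the actual dependencies (including the crawler task) are defined in the GitHub repo:

# Sketch of the task ordering implied by this pipeline
glue_task >> cluster_creator >> step_adder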

This completes the steps needed to pass the correlation ID within the DAG task definition.

In the next section, we use this ID within the script run to log details.

Job script definitions

In this section, we review the changes required to log information based on the correlation_id. Let’s start with the AWS Glue job script (for the complete code, refer to the following file in GitHub):

# Script changes to file 'raw_to_transform'

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','dag_name','task_id','correlation_id'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
correlation_id = args['dag_name'] + "." + args['task_id'] + " " + args['correlation_id']
logger.info("Correlation ID from GLUE job: " + correlation_id)

Next, we focus on the EMR job script (for the complete code, refer to the file in GitHub):

# Script changes to file 'nyc_aggregations'

from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("""
        Usage: nyc_aggregations.py <s3_input_path> <s3_output_path> <dag_name> <task_id> <correlation_id>
        """, file=sys.stderr)
        sys.exit(-1)
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    dag_task_name = sys.argv[3] + "." + sys.argv[4]
    correlation_id = dag_task_name + " " + sys.argv[5]
    spark = SparkSession\
        .builder\
        .appName(correlation_id)\
        .getOrCreate()
    sc = spark.sparkContext
    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger(dag_task_name)
    logger.info("Spark session started: " + correlation_id)

This completes the steps for passing the correlation ID to the script run.

After we complete the DAG definitions and script additions, we can run the DAG. Logs for a particular DAG run can be queried using the correlation ID. The correlation ID for a DAG run can be found via the Airflow UI. An example of a correlation ID is manual__2022-07-12T00:22:36.111190+00:00. With this unique string, we can run queries on the relevant CloudWatch log groups using CloudWatch Logs Insights. The result of the query includes the logging provided by the AWS Glue and EMR scripts, along with other logs associated with the correlation ID.

Example query for DAG-level logs: manual__2022-07-12T00:22:36.111190+00:00

We can also obtain task-level logs by using the format <dag_name.task_id correlation_id>:

Example query: data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00
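You can run these queries on the CloudWatch console or programmatically. The following boto3 sketch starts a CloudWatch Logs Insights query for the task-level correlation ID; the log group names are placeholders (replace them with the log groups used by your environment and jobs):

# Sketch: query logs for one DAG run using CloudWatch Logs Insights (boto3)
import time
import boto3

logs = boto3.client("logs")

correlation_id = "data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00"

query = logs.start_query(
    # Placeholder log group names: replace with your environment's Task log group
    # and the log groups used by your AWS Glue and EMR jobs
    logGroupNames=["airflow-my-mwaa-env-Task", "/aws-glue/jobs/output"],
    startTime=int(time.time()) - 24 * 3600,  # last 24 hours
    endTime=int(time.time()),
    queryString="fields @timestamp, @log, @message "
                "| filter @message like '{}' "
                "| sort @timestamp asc".format(correlation_id),
)

# Logs Insights queries run asynchronously; poll until the query finishes
while True:
    results = logs.get_query_results(queryId=query["queryId"])
    if results["status"] not in ("Scheduled", "Running"):
        break
    time.sleep(1)

for row in results["results"]:
    print(row)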

Clean up

If you created the setup to run and test the scripts using the Amazon MWAA analytics workshop, perform the cleanup steps to avoid incurring charges.

Conclusion

In this post, we showed how to send Amazon MWAA logs to CloudWatch log groups. We then discussed how to tie together logs from different tasks within a DAG using a unique correlation ID. The correlation ID can be output along with as much or as little information as your job needs to provide more detail across your entire DAG run. You can then use CloudWatch Logs Insights to query the logs.

With this solution, you can use Amazon MWAA as a single pane of glass for data pipeline orchestration and CloudWatch logs for data pipeline health management. The unique identifier improves the end-to-end observability for a DAG run and helps reduce the time needed for troubleshooting.

To learn more and get hands-on experience, start with the Amazon MWAA analytics workshop and then use the scripts in the GitHub repo to gain more observability of your DAG run.


About the Author

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.