Tag Archives: Amazon Managed Workflows for Apache Airflow (Amazon MWAA)

Serverless ICYMI Q1 2023

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/serverless-icymi-q1-2023/

Welcome to the 21st edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

ICYMI2023Q1

In case you missed our last ICYMI, check out what happened last quarter here.

Artificial intelligence (AI) technologies, ChatGPT, and DALL-E are creating significant interest in the industry at the moment. Find out how to integrate serverless services with ChatGPT and DALL-E to generate unique bedtime stories for children.

Example notification of a story hosted with Next.js and App Runner

Example notification of a story hosted with Next.js and App Runner

Serverless Land is a website maintained by the Serverless Developer Advocate team to help you build serverless applications and includes workshops, code examples, blogs, and videos. There is now enhanced search functionality so you can search across resources, patterns, and video content.

SLand-search

ServerlessLand search

AWS Lambda

AWS Lambda has improved how concurrency works with Amazon SQS. You can now control the maximum number of concurrent Lambda functions invoked.

The launch blog post explains the scaling behavior of Lambda using this architectural pattern, challenges this feature helps address, and a demo of maximum concurrency in action.

Maximum concurrency is set to 10 for the SQS queue.

Maximum concurrency is set to 10 for the SQS queue.

AWS Lambda Powertools is an open-source library to help you discover and incorporate serverless best practices more easily. Lambda Powertools for .NET is now generally available and currently focused on three observability features: distributed tracing (Tracer), structured logging (Logger), and asynchronous business and application metrics (Metrics). Powertools is also available for Python, Java, and Typescript/Node.js programming languages.

To learn more:

Lambda announced a new feature, runtime management controls, which provide more visibility and control over when Lambda applies runtime updates to your functions. The runtime controls are optional capabilities for advanced customers that require more control over their runtime changes. You can now specify a runtime management configuration for each function with three settings, Automatic (default), Function update, or manual.

There are three new Amazon CloudWatch metrics for asynchronous Lambda function invocations: AsyncEventsReceived, AsyncEventAge, and AsyncEventsDropped. You can track the asynchronous invocation requests sent to Lambda functions to monitor any delays in processing and take corrective actions if required. The launch blog post explains the new metrics and how to use them to troubleshoot issues.

Lambda now supports Amazon DocumentDB change streams as an event source. You can use Lambda functions to process new documents, track updates to existing documents, or log deleted documents. You can use any programming language that is supported by Lambda to write your functions.

There is a helpful blog post suggesting best practices for developing portable Lambda functions that allow you to port your code to containers if you later choose to.

AWS Step Functions

AWS Step Functions has expanded its AWS SDK integrations with support for 35 additional AWS services including Amazon EMR Serverless, AWS Clean Rooms, AWS IoT FleetWise, AWS IoT RoboRunner and 31 other AWS services. In addition, Step Functions also added support for 1000+ new API actions from new and existing AWS services such as Amazon DynamoDB and Amazon Athena. For the full list of added services, visit AWS SDK service integrations.

Amazon EventBridge

Amazon EventBridge has launched the AWS Controllers for Kubernetes (ACK) for EventBridge and Pipes . This allows you to manage EventBridge resources, such as event buses, rules, and pipes, using the Kubernetes API and resource model (custom resource definitions).

EventBridge event buses now also support enhanced integration with Service Quotas. Your quota increase requests for limits such as PutEvents transactions-per-second, number of rules, and invocations per second among others will be processed within one business day or faster, enabling you to respond quickly to changes in usage.

AWS SAM

The AWS Serverless Application Model (SAM) Command Line Interface (CLI) has added the sam list command. You can now show resources defined in your application, including the endpoints, methods, and stack outputs required to test your deployed application.

AWS SAM has a preview of sam build support for building and packaging serverless applications developed in Rust. You can use cargo-lambda in the AWS SAM CLI build workflow and AWS SAM Accelerate to iterate on your code changes rapidly in the cloud.

You can now use AWS SAM connectors as a source resource parameter. Previously, you could only define AWS SAM connectors as a AWS::Serverless::Connector resource. Now you can add the resource attribute on a connector’s source resource, which makes templates more readable and easier to update over time.

AWS SAM connectors now also support multiple destinations to simplify your permissions. You can now use a single connector between a single source resource and multiple destination resources.

In October 2022, AWS released OpenID Connect (OIDC) support for AWS SAM Pipelines. This improves your security posture by creating integrations that use short-lived credentials from your CI/CD provider. There is a new blog post on how to implement it.

Find out how best to build serverless Java applications with the AWS SAM CLI.

AWS App Runner

AWS App Runner now supports retrieving secrets and configuration data stored in AWS Secrets Manager and AWS Systems Manager (SSM) Parameter Store in an App Runner service as runtime environment variables.

AppRunner also now supports incoming requests based on HTTP 1.0 protocol, and has added service level concurrency, CPU and Memory utilization metrics.

Amazon S3

Amazon S3 now automatically applies default encryption to all new objects added to S3, at no additional cost and with no impact on performance.

You can now use an S3 Object Lambda Access Point alias as an origin for your Amazon CloudFront distribution to tailor or customize data to end users. For example, you can resize an image depending on the device that an end user is visiting from.

S3 has introduced Mountpoint for S3, a high performance open source file client that translates local file system API calls to S3 object API calls like GET and LIST.

S3 Multi-Region Access Points now support datasets that are replicated across multiple AWS accounts. They provide a single global endpoint for your multi-region applications, and dynamically route S3 requests based on policies that you define. This helps you to more easily implement multi-Region resilience, latency-based routing, and active-passive failover, even when data is stored in multiple accounts.

Amazon Kinesis

Amazon Kinesis Data Firehose now supports streaming data delivery to Elastic. This is an easier way to ingest streaming data to Elastic and consume the Elastic Stack (ELK Stack) solutions for enterprise search, observability, and security without having to manage applications or write code.

Amazon DynamoDB

Amazon DynamoDB now supports table deletion protection to protect your tables from accidental deletion when performing regular table management operations. You can set the deletion protection property for each table, which is set to disabled by default.

Amazon SNS

Amazon SNS now supports AWS X-Ray active tracing to visualize, analyze, and debug application performance. You can now view traces that flow through Amazon SNS topics to destination services, such as Amazon Simple Queue Service, Lambda, and Kinesis Data Firehose, in addition to traversing the application topology in Amazon CloudWatch ServiceLens.

SNS also now supports setting content-type request headers for HTTPS notifications so applications can receive their notifications in a more predictable format. Topic subscribers can create a DeliveryPolicy that specifies the content-type value that SNS assigns to their HTTPS notifications, such as application/json, application/xml, or text/plain.

EDA Visuals collection added to Serverless Land

The Serverless Developer Advocate team has extended Serverless Land and introduced EDA visuals. These are small bite sized visuals to help you understand concept and patterns about event-driven architectures. Find out about batch processing vs. event streaming, commands vs. events, message queues vs. event brokers, and point-to-point messaging. Discover bounded contexts, migrations, idempotency, claims, enrichment and more!

EDA-visuals

EDA Visuals

To learn more:

Serverless Repos Collection on Serverless Land

There is also a new section on Serverless Land containing helpful code repositories. You can search for code repos to use for examples, learning or building serverless applications. You can also filter by use-case, runtime, and level.

Serverless Repos Collection

Serverless Repos Collection

Serverless Blog Posts

January

Jan 12 – Introducing maximum concurrency of AWS Lambda functions when using Amazon SQS as an event source

Jan 20 – Processing geospatial IoT data with AWS IoT Core and the Amazon Location Service

Jan 23 – AWS Lambda: Resilience under-the-hood

Jan 24 – Introducing AWS Lambda runtime management controls

Jan 24 – Best practices for working with the Apache Velocity Template Language in Amazon API Gateway

February

Feb 6 – Previewing environments using containerized AWS Lambda functions

Feb 7 – Building ad-hoc consumers for event-driven architectures

Feb 9 – Implementing architectural patterns with Amazon EventBridge Pipes

Feb 9 – Securing CI/CD pipelines with AWS SAM Pipelines and OIDC

Feb 9 – Introducing new asynchronous invocation metrics for AWS Lambda

Feb 14 – Migrating to token-based authentication for iOS applications with Amazon SNS

Feb 15 – Implementing reactive progress tracking for AWS Step Functions

Feb 23 – Developing portable AWS Lambda functions

Feb 23 – Uploading large objects to Amazon S3 using multipart upload and transfer acceleration

Feb 28 – Introducing AWS Lambda Powertools for .NET

March

Mar 9 – Server-side rendering micro-frontends – UI composer and service discovery

Mar 9 – Building serverless Java applications with the AWS SAM CLI

Mar 10 – Managing sessions of anonymous users in WebSocket API-based applications

Mar 14 –
Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Videos

Serverless Office Hours – Tues 10AM PT

Weekly office hours live stream. In each session we talk about a specific topic or technology related to serverless and open it up to helping you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

January

Jan 10 – Building .NET 7 high performance Lambda functions

Jan 17 – Amazon Managed Workflows for Apache Airflow at Scale

Jan 24 – Using Terraform with AWS SAM

Jan 31 – Preparing your serverless architectures for the big day

February

Feb 07- Visually design and build serverless applications

Feb 14 – Multi-tenant serverless SaaS

Feb 21 – Refactoring to Serverless

Feb 28 – EDA visually explained

March

Mar 07 – Lambda cookbook with Python

Mar 14 – Succeeding with serverless

Mar 21 – Lambda Powertools .NET

Mar 28 – Server-side rendering micro-frontends

FooBar Serverless YouTube channel

Marcia Villalba frequently publishes new videos on her popular serverless YouTube channel. You can view all of Marcia’s videos at https://www.youtube.com/c/FooBar_codes.

January

Jan 12 – Serverless Badge – A new certification to validate your Serverless Knowledge

Jan 19 – Step functions Distributed map – Run 10k parallel serverless executions!

Jan 26 – Step Functions Intrinsic Functions – Do simple data processing directly from the state machines!

February

Feb 02 – Unlock the Power of EventBridge Pipes: Integrate Across Platforms with Ease!

Feb 09 – Amazon EventBridge Pipes: Enrichment and filter of events Demo with AWS SAM

Feb 16 – AWS App Runner – Deploy your apps from GitHub to Cloud in Record Time

Feb 23 – AWS App Runner – Demo hosting a Node.js app in the cloud directly from GitHub (AWS CDK)

March

Mar 02 – What is Amazon DynamoDB? What are the most important concepts? What are the indexes?

Mar 09 – Choreography vs Orchestration: Which is Best for Your Distributed Application?

Mar 16 – DynamoDB Single Table Design: Simplify Your Code and Boost Performance with Table Design Strategies

Mar 23 – 8 Reasons You Should Choose DynamoDB for Your Next Project and How to Get Started

Sessions with SAM & Friends

SAMFiends

AWS SAM & Friends

Eric Johnson is exploring how developers are building serverless applications. We spend time talking about AWS SAM as well as others like AWS CDK, Terraform, Wing, and AMPT.

Feb 16 – What’s new with AWS SAM

Feb 23 – AWS SAM with AWS CDK

Mar 02 – AWS SAM and Terraform

Mar 10 – Live from ServerlessDays ANZ

Mar 16 – All about AMPT

Mar 23 – All about Wing

Mar 30 – SAM Accelerate deep dive

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Improve observability across Amazon MWAA tasks

Post Syndicated from Payal Singh original https://aws.amazon.com/blogs/big-data/improve-observability-across-amazon-mwaa-tasks/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. A data pipeline is a set of tasks and processes used to automate the movement and transformation of data between different systems.­ The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines. The Amazon provider package for Apache Airflow comes with integrations for over 31 AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon EMR, AWS Glue, Amazon SageMaker, and more.

The most common use case for Airflow is ETL (extract, transform, and load). Nearly all Airflow users implement ETL pipelines ranging from simple to complex. Operationalizing machine learning (ML) is another growing use case, where data has to be transformed and normalized before it can be loaded into an ML model. In both use cases, the data pipeline is preparing the data for consumption by ingesting data from different sources and transforming it through a series of steps.

Observability across the different processes within the data pipeline is a key component to monitor the success or failure of the pipeline. Although scheduling the runs of tasks within the data pipeline is controlled by Airflow, the run of the task itself (transforming, normalizing, and aggregating data) is done by different services based on the use case. Having an end-to-end view of the data flow is a challenge due to multiple touch points in the data pipeline.

In this post, we provide an overview of logging enhancements when working with Amazon MWAA, which is one of the pillars of observability. We then discuss a solution to further enhance end-to-end observability by modifying the task definitions that make up the data pipeline. For this post, we focus on task definitions for two services: AWS Glue and Amazon EMR­, however the same method can be applied across different services.

Challenge

Many customers’ data pipelines start simple, orchestrating a few tasks, and over time grow to be more complex, consisting of a large number of tasks and dependencies between them. As the complexity increases, it becomes increasingly hard to operate and debug in case of failure, which creates a need for a single pane of glass to provide end-to-end data pipeline orchestration and health management. For data pipeline orchestration, the Apache Airflow UI is a user-friendly tool that provides detailed views into your data pipeline. When it comes to pipeline health management, each service that your tasks are interacting with could be storing or publishing logs to different locations, such as an S3 bucket or Amazon CloudWatch logs. As the number of integration touch points increases, stitching the distributed logs generated by different services in various locations can be challenging.

One solution provided by Amazon MWAA to consolidate the Airflow and task logs within the directed acyclic graph (DAG) is to forward the logs to CloudWatch log groups. A separate log group is created for each enabled Airflow logging option (For example, DAGProcessing, Scheduler, Task, WebServer, and Worker). These logs can be queried across log groups using CloudWatch Logs Insights.

A common approach in distributed tracing is to use a correlation ID to stitch and query distributed logs. A correlation ID is a unique identifier that is passed through a request flow for tracking a sequence of activities throughout the lifetime of the workflow. When each service in the workflow needs to log information, it can include this correlation ID, thereby ensuring you can track a full request from start to finish.

The Airflow engine passes a few variables by default that are accessible to all templates. run_id is one such variable, which is a unique identifier for a DAG run. The run_id can be used as the correlation ID to query against different log groups within CloudWatch to capture all the logs for a particular DAG run.

However, be aware that services that your tasks are interacting with will use a separate log group and won’t log the run_id as part of their output. This will prevent you from getting an end-to-end view across the DAG run.

For example, if your data pipeline consists of an AWS Glue task running a Spark job as part of the pipeline, then the Airflow task logs will be available in one CloudWatch log group and the AWS Glue job logs will be in a different CloudWatch log group. However, the Spark job that is run as part of the AWS Glue job doesn’t have access to the correlation ID and can’t be tied back to a particular DAG run. So even if you use the correlation ID to query the different CloudWatch log groups, you won’t get any information about the run of the Spark job.

Solution overview

As you now know, run_id is a variable that is a unique identifier for a DAG run. The run_id is present as part of the Airflow task logs. To use the run_id effectively and increase the observability across the DAG run, we use run_id as the correlation ID and pass it to different tasks with the DAG. The correlation ID is then be consumed by the scripts used within the tasks.

The following diagram illustrates the solution architecture.

Architecture Diagram

The data pipeline that we focus on consists of the following components:

  • An S3 bucket that contains the source data
  • An AWS Glue crawler that creates the table metadata in the Data Catalog from the source data
  • An AWS Glue job that transforms the raw data into a processed data format while performing file format conversions
  • An EMR job that generates reporting datasets

For details on the architecture and complete steps on how to run the DAG refer, to Amazon MWAA for Analytics Workshop.

In the next sections, we explore the following topics:

  • The DAG file, in order to understand how to define and then pass the correlation ID in the AWS Glue and EMR tasks
  • The code needed in the Python scripts to output information based on the correlation ID

Refer to the GitHub repo for the detailed DAG definition and Spark scripts. To run the scripts, refer to the Amazon MWAA analytics workshop.

DAG definitions

In this section, we look at snippets of the additions needed to the DAG file. We also discuss how to pass the correlation ID to the AWS Glue and EMR jobs. Refer to the GitHub repo for the complete DAG code.

The DAG file begins by defining the variables:

# Variables

correlation_id = “{{ run_id }}” 
dag_name = “data_pipeline” 
S3_BUCKET_NAME = “airflow_data_pipeline_bucket”

Next, let’s look at how to pass the correlation ID to the AWS Glue job using the AWS Glue operator. Operators are the building blocks of Airflow DAGs. They contain the logic of how data is processed in the data pipeline. Each task in a DAG is defined by instantiating an operator.

Airflow provides operators for different tasks. For this post, we use the AWS Glue operator.

The AWS Glue task definition contains the following:

  • The Python Spark job script (raw_to_tranform.py) to run the job
  • The DAG name, task ID, and correlation ID, which are passed as arguments
  • The AWS Glue service role assigned, which has permissions to run the crawler and the jobs

See the following code:

# Glue Task definition

glue_task = AwsGlueJobOperator(
    task_id=’glue_task’,
    job_name=’raw_to_transform’,
    iam_role_name=’AWSGlueServiceRoleDefault’,
    script_args={‘--dag_name’: dag_name,
                 ‘--task_id’: ‘glue_task’,
                 ‘--correlation_id’: correlation_id},
)

Next, we pass the correlation ID to the EMR job using the EMR operator. This includes the following steps:

  1. Define the configuration of an EMR cluster.
  2. Create the EMR cluster.
  3. Define the steps to be run by the EMR job.
  4. Run the EMR job:
    1. We use the Python Spark job script aggregations.py.
    2. We pass the DAG name, task ID, and correlation ID as arguments to the steps for the EMR task.

Let’s start with defining the configuration for the EMR cluster. The correlation_id is passed in the name of the cluster to easily identify the cluster corresponding to a DAG run. The logs generated by EMR jobs are published to a S3 bucket; the correlation_id is part of the LogUri as well. See the following code:

# Define the EMR cluster configuration

emr_task_id=’create_emr_cluster’
JOB_FLOW_OVERRIDES = {
    "Name": dag_name + "." + emr_task_id + "-" + correlation_id,
    "ReleaseLabel": "emr-5.29.0",
    "LogUri": "s3://{}/logs/emr/{}/{}/{}".format(S3_BUCKET_NAME, dag_name, emr_task_id, correlation_id),
    "Instances": {
      "InstanceGroups": [{
         "Name": "Master nodes",
         "Market": "ON_DEMAND",
         "InstanceRole": "MASTER",
         "InstanceType": "m5.xlarge",
         "InstanceCount": 1
       },{
         "Name": "Slave nodes",
         "Market": "ON_DEMAND",
         "InstanceRole": "CORE",
         "InstanceType": "m5.xlarge",
         "InstanceCount": 2
       }],
       "TerminationProtected": False,
       "KeepJobFlowAliveWhenNoSteps": True
}}

Now let’s define the task to create the EMR cluster based on the configuration:

# Create the EMR cluster

cluster_creator = EmrCreateJobFlowOperator(
    task_id= emr_task_id,
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id=’aws_default’,
    emr_conn_id=’emr_default’,
    dag=dag
)

Next, let’s define the steps needed to run as part of the EMR job. The input and output data processed by the EMR job is stored in an S3 bucket passed as arguments. Dag_name, task_id, and correlation_id are also passed in as arguments. The task_id used can be the name of your choice; here we use add_steps:

# EMR steps to be executed by EMR cluster

SPARK_TEST_STEPS = [{
    'Name': 'Run Spark',
    'ActionOnFailure': 'CANCEL_AND_WAIT',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit',
        '/home/hadoop/aggregations.py',
            's3://{}/data/transformed/green'.format(S3_BUCKET_NAME),
            's3://{}/data/aggregated/green'.format(S3_BUCKET_NAME),
             dag_name,
             'add_steps',
             correlation_id]
}]

Next, let’s add a task to run the steps on the EMR cluster. The job_flow_id is the ID of the JobFlow, which is passed down from the EMR create task described earlier using Airflow XComs. See the following code:

#Run the EMR job

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",      
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
)

This completes the steps needed to pass the correlation ID within the DAG task definition.

In the next section, we use this ID within the script run to log details.

Job script definitions

In this section, we review the changes required to log information based on the correlation_id. Let’s start with the AWS Glue job script (for the complete code, refer to the following file in GitHub):

# Script changes to file ‘raw_to_transform’

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','dag_name','task_id','correlation_id'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
correlation_id = args['dag_name'] + "." + args['task_id'] + " " + args['correlation_id']
logger.info("Correlation ID from GLUE job: " + correlation_id)

Next, we focus on the EMR job script (for the complete code, refer to the file in GitHub):

# Script changes to file ‘nyc_aggregations’

from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("""
        Usage: nyc_aggregations.py <s3_input_path> <s3_output_path> <dag_name> <task_id> <correlation_id>
        """, file=sys.stderr)
        sys.exit(-1)
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    dag_task_name = sys.argv[3] + "." + sys.argv[4]
    correlation_id = dag_task_name + " " + sys.argv[5]
    spark = SparkSession\
        .builder\
        .appName(correlation_id)\
        .getOrCreate()
    sc = spark.sparkContext
    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger(dag_task_name)
    logger.info("Spark session started: " + correlation_id)

This completes the steps for passing the correlation ID to the script run.

After we complete the DAG definitions and script additions, we can run the DAG. Logs for a particular DAG run can be queried using the correlation ID. The correlation ID for a DAG run can be found via the Airflow UI. An example of a correlation ID is manual__2022-07-12T00:22:36.111190+00:00. With this unique string, we can run queries on the relevant CloudWatch log groups using CloudWatch Logs Insights. The result of the query includes the logging provided by the AWS Glue and EMR scripts, along with other logs associated with the correlation ID.

Example query for DAG level logs : manual__2022-07-12T00:22:36.111190+00:00

We can also obtain task-level logs by using the format <dag_name.task_id correlation_id>:

Example query : data_pipeline.glue_task manual__2022-07-12T00:22:36.111190+00:00

Clean up

If you created the setup to run and test the scripts using the Amazon MWAA analytics workshop, perform the cleanup steps to avoid incurring charges.

Conclusion

In this post, we showed how to send Amazon MWAA logs to CloudWatch log groups. We then discussed how to tie in logs from different tasks within a DAG using the unique correlation ID. The correlation ID can be outputted with as much or as little information needed by your job to provide more details across your entire DAG run. You can then use CloudWatch Logs Insights to query the logs.

With this solution, you can use Amazon MWAA as a single pane of glass for data pipeline orchestration and CloudWatch logs for data pipeline health management. The unique identifier improves the end-to-end observability for a DAG run and helps reduce the time needed for troubleshooting.

To learn more and get hands-on experience, start with the Amazon MWAA analytics workshop and then use the scripts in the GitHub repo to gain more observability of your DAG run.


About the Author

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partner and customers modernize and migrate their applications to AWS.

Automate data lineage on Amazon MWAA with OpenLineage

Post Syndicated from Stephen Said original https://aws.amazon.com/blogs/big-data/automate-data-lineage-on-amazon-mwaa-with-openlineage/

In modern data architectures, datasets are combined across an organization using a variety of purpose-built services to unlock insights. As a result, data governance becomes a key component for data consumers and producers to know that their data-driven decisions are based on trusted and accurate datasets. One aspect of data governance is data lineage, which captures the flow of data as it goes through various systems and allows consumers to understand how a dataset was derived.

In order to capture data lineage consistently across various analytical services, you need to use a common lineage model and a robust job orchestration that is able to tie together diverse data flows. One possible solution is the open-source OpenLineage project. It provides a technology-agnostic metadata model for capturing data lineage and integrates with widely used tools. For job orchestration, it integrates with Apache Airflow, which you can run on AWS conveniently through the managed service Amazon Managed Workflows for Apache Airflow (Amazon MWAA). OpenLineage provides a plugin for Apache Airflow that extracts data lineage from Directed Acyclic Graphs (DAGs).

In this post, we show how to get started with data lineage on AWS using OpenLineage. We provide a step-by-step configuration guide for the openlineage-airflow plugin on Amazon MWAA. Additionally, we share an AWS Cloud Development Kit (AWS CDK) project that deploys a pre-configured demo environment for evaluating and experiencing OpenLineage first-hand.

OpenLineage on Apache Airflow

In the following example, Airflow turns OLTP data into a star schema on Amazon Redshift Serverless.

After staging and preparing source data from Amazon Simple Storage Service (Amazon S3), fact and dimension tables are eventually created. For this, Airflow orchestrates the execution of SQL statements that create and populate tables on Redshift Serverless.

Overview on DAGs in Amazon MWAA

The openlineage-airflow plugin collects metadata about creation of datasets and dependencies between them. This allows us to move from a jobs-centric approach of Airflow to a datasets-centric approach, improving the observability of workflows.

The following screenshot shows parts of the captured lineage for the previous example. It’s displayed in Marquez, an open-source metadata service for collection and visualization of data lineage with support for the OpenLineage standard. In Marquez, you can analyze the upstream datasets and transformations that eventually create the user dimension table on the right.

Data lineage graph in marquez

The example in this post is based on SQL and Amazon Redshift. OpenLineage also supports other transformation engines and data stores such as Apache Spark and dbt.

Solution overview

The following diagram shows the AWS setup required to capture data lineage using OpenLineage.

Solution overview

The workflow includes the following components:

  1. The openlineage-airflow plugin is configured on Airflow as a lineage backend. Metadata about the DAG runs is passed by Airflow core to the plugin, which converts it into OpenLineage format and sends it to an external metadata store. In our demo setup, we use Marquez as the metadata store.
  2. The openlineage-airflow plugin receives its configuration from environment variables. To populate these variables on Amazon MWAA, a custom Airflow plugin is used. First, the plugin reads source values from AWS Secrets Manager. Then, it creates environment variables.
  3. Secrets Manager is configured as a secrets backend. Typically, this type of configuration is stored in Airflow’s native metadata database. However, this approach has limitations. For instance, in case of multiple Airflow environments, you need to track and store credentials across multiple environments, and updating credentials requires you to update all the environments. With a secrets backend, you can centralize configuration.
  4. For demo purposes, we collect data lineage from a data pipeline, which creates a star schema in Redshift Serverless.

In the following sections, we walk you through the steps for end-to-end configuration.

Install the openlineage-airflow plugin

Specify the following dependency in the requirements.txt file of the Amazon MWAA environment. Note that the latest Airflow version currently available on Amazon MWAA is 2.4.3; for this post, use the compatible version 0.19.2 of the plugin:

openlineage-airflow==0.19.2

For more details on installing Python dependencies on Amazon MWAA, refer to Installing Python dependencies.

For Airflow < 2.3, configure the plugin’s lineage backend through the following configuration overrides on the Amazon MWAA environment and load it immediately at Airflow start by disabling lazy load of plugins:

AirflowConfigurationOptions:
    core.lazy_load_plugins: False
    lineage.backend: openlineage.lineage_backend.OpenLineageBackend

For more information on configuration overrides, refer to Configuration options overview.

Configure the Secrets Manager backend with Amazon MWAA

Using Secrets Manager as a secrets backend for Amazon MWAA is straightforward. First, provide the execution role of Amazon MWAA with read permission to Secrets Manager. You can use the following policy template as a starting point:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetResourcePolicy",
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret",
        "secretsmanager:ListSecretVersionIds"
      ],
      "Resource": "arn:aws:secretsmanager:AWS_REGION:<YOUR_ACCOUNT_ID>:secret:"
    },
    {
      "Effect": "Allow",
      "Action": "secretsmanager:ListSecrets",
      "Resource": ""
    }
  ]
}

Second, configure Secrets Manager as a backend in Amazon MWAA through the following configuration overrides:

AirflowConfigurationOptions:
secrets.backend: airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}'

For more information configuring a secrets backend in Amazon MWAA, refer to Configuring an Apache Airflow connection using a Secrets Manager secret and Move your Apache Airflow connections and variables to AWS Secrets Manager.

Deploy a custom envvar plugin to Amazon MWAA

Apache Airflow has a built-in plugin manager through which it can be extended with custom functionality. In our case, this functionality is to populate OpenLineage-specific environment variables based on values in Secrets Manager. Natively, Amazon MWAA allows environment variables with the prefix AIRFLOW__, but the openlineage-airflow plugin expects the prefix OPENLINEAGE__.

The following Python code is used in the plugin. We assume the file is called envvar_plugin.py:

from airflow.plugins_manager import AirflowPlugin
from airflow.models import Variable
import os

os.environ["OPENLINEAGE_URL"] = Variable.get('OPENLINEAGE_URL', default_var='')

class EnvVarPlugin(AirflowPlugin):
  name = "env_var_plugin"

Amazon MWAA has a mechanism to install a plugin through a zip archive. You zip your code, upload the archive to an S3 bucket, and pass the URL to the file to Amazon MWAA:

zip plugins.zip envvar_plugin.py

Upload plugins.zip to an S3 bucket and configure the URL in Amazon MWAA. The following screenshot shows the configuration via the Amazon MWAA console.

Configuration of a custom plugin in Amazon MWAA

For more information on installing custom plugins on Amazon MWAA, refer to Creating a custom plugin that generates runtime environment variables.

Configure connectivity between the openlineage-airflow plugin and Marquez

As a last step, store the URL to Marquez in Secrets Manager. For this, create a secret called airflow/variables/OPENLINEAGE_URL with value <protocol>://<hostname/ip>:<port> (for example, https://marquez.mysite.com:5000).

Configuration of OPENLINEAGE_URL as secret

In case you need to spin up Marquez on AWS, you have multiple options to host, including running it on Amazon Elastic Kubernetes Service (Amazon EKS) or Amazon Elastic Compute Cloud (Amazon EC2). Refer to Running Marquez on AWS or check out our infrastructure template in the next section to deploy Marquez on AWS.

Deploy with an AWS CDK-based solution template

Assuming you want to set up a demo infrastructure for all of the above in one step, you can use the following template based on the AWS CDK.

The template has the following prerequisites:

Complete the following steps to deploy the template:

  1. Clone GitHub repository and install Python dependencies. Bootstrap the AWS CDK if required.
    git clone https://github.com/aws-samples/aws-mwaa-openlineage 
    	cd aws-mwaa-openlineage
    	python3 -m venv .env && source .env/bin/activate
    	python3 -m pip install -r requirements.txt
    	cdk bootstrap

  2. Update the value for the variable EXTERNAL_IP in constants.py to your outbound IP for connecting to the internet:
    # Set variable to outbound IP for connecting to the internet.
    EXTERNAL_IP = "255.255.255.255"

    This configures security groups so that you can access Marquez but block other clients. constants.py is found in the root folder of the cloned repository.

  3. Deploy the VPC_S3 stack to provision a new VPC dedicated for this solution as well as the security groups that are used by the different components:
    cdk deploy vpc-s3

    It creates a new S3 bucket and uploads the source raw data based on the TICKIT sample database. This serves as the landing area from the OLTP database. We then need to parse the metadata of these files through an AWS Glue crawler, which facilitates the native integration between Amazon Redshift and the S3 data lake.

  4. Deploy the lineage stack to create an EC2 instance that hosts Marquez:
    cdk deploy marquez

    Access the Marquez web UI through https://{ec2.public_dns_name}:3000/. This URL is also available as part of the AWS CDK outputs for the lineage stack.

  5. Deploy the Amazon Redshift stack to create a Redshift Serverless endpoint:
    cdk deploy redshift

  6. Deploy the Amazon MWAA stack to create an Amazon MWAA environment:
    cdk deploy mwaa

    You can access the Amazon MWAA UI through the URL provided in the AWS CDK output.

Test a sample data pipeline

On Amazon MWAA, you can see an example data pipeline deployed that consists of two DAGs. It builds a star schema on top of the TICKIT sample database. One DAG is responsible for loading data from the S3 data lake into an Amazon Redshift staging layer; the second DAG loads data from the staging layer to the dimensional model.

Datamodel of star schema

Open the Amazon MWAA UI through the URL obtained in the deployment steps and launch the following DAGs: rs_source_to_staging and rs_staging_to_dm. As part of the run, the lineage metadata is sent to Marquez.

After the DAG has been run, open the Marquez URL obtained in the deployment steps. In Marquez, you can find the lineage metadata for the computed star schema and related data assets on Amazon Redshift.

Clean up

Delete the AWS CDK stacks to avoid ongoing charges for the resources that you created. Run the following command in the aws-mwaa-openlineage project directory so that all resources are undeployed:

cdk destroy --all

Summary

In this post, we showed you how to automate data lineage with OpenLineage on Amazon MWAA. As part of this, we covered how to install and configure the openlineage-airflow plugin on Amazon MWAA. Additionally, we provided a ready-to-use infrastructure template for a complete demo environment.

We encourage you to explore what else can be achieved with OpenLineage. A job orchestrator like Apache Airflow is only one piece of a data platform and not all possible data lineage can be captured on it. We recommend exploring OpenLineage’s integration with other platforms like Apache Spark or dbt. For more information, refer to Integrations.

Additionally, we recommend you visit the AWS Big Data Blog for other useful blog posts on Amazon MWAA and data governance on AWS.


About the Authors

Stephen Said is a Senior Solutions Architect and works with Digital Native Businesses. His areas of interest are data analytics, data platforms and cloud-native software engineering.

Vishwanatha Nayak is a Senior Solutions Architect at AWS. He works with large enterprise customers helping them design and build secure, cost-effective, and reliable modern data platforms using the AWS cloud. He is passionate about technology and likes sharing knowledge through blog posts and twitch sessions.

Paul Villena is an Analytics Solutions Architect with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interests are infrastructure-as-code, serverless technologies and coding in Python.

Introducing container, database, and queue utilization metrics for the Amazon MWAA environment

Post Syndicated from David Boyne original https://aws.amazon.com/blogs/compute/introducing-container-database-and-queue-utilization-metrics-for-the-amazon-mwaa-environment/

This post is written by Uma Ramadoss (Senior Specialist Solutions Architect), and Jeetendra Vaidya (Senior Solutions Architect).

Today, AWS is announcing the availability of container, database, and queue utilization metrics for Amazon Managed Workflows for Apache Airflow (Amazon MWAA). This is a new collection of metrics published by Amazon MWAA in addition to existing Apache Airflow metrics in Amazon CloudWatch. With these new metrics, you can better understand the performance of your Amazon MWAA environment, troubleshoot issues related to capacity, delays, and get insights on right-sizing your Amazon MWAA environment.

Previously, customers were limited to Apache Airflow metrics such as DAG processing parse times, pool running slots, and scheduler heartbeat to measure the performance of the Amazon MWAA environment. While these metrics are often effective in diagnosing Airflow behavior, they lack the ability to provide complete visibility into the utilization of the various Apache Airflow components in the Amazon MWAA environment. This could limit the ability for some customers to monitor the performance and health of the environment effectively.

Overview

Amazon MWAA is a managed service for Apache Airflow. There are a variety of deployment techniques with Apache Airflow. The Amazon MWAA deployment architecture of Apache Airflow is carefully chosen to allow customers to run workflows in production at scale.

Amazon MWAA has distributed architecture with multiple schedulers, auto-scaled workers, and load balanced web server. They are deployed in their own Amazon Elastic Container Service (ECS) cluster using AWS Fargate compute engine. Amazon Simple Queue Service (SQS) queue is used to decouple Airflow workers and schedulers as part of Celery Executor architecture. Amazon Aurora PostgreSQL-Compatible Edition is used as the Apache Airflow metadata database. From today, you can get complete visibility into the scheduler, worker, web server, database, and queue metrics.

In this post, you can learn about the new metrics published for Amazon MWAA environment, build a sample application with a pre-built workflow, and explore the metrics using CloudWatch dashboard.

Container, database, and queue utilization metrics

  1. In the CloudWatch console, in Metrics, select All metrics.
  2. From the metrics console that appears on the right, expand AWS namespaces and select MWAA tile.
  3. MWAA metrics

    MWAA metrics

  4. You can see a tile of dimensions, each corresponding to the container (cluster), database, and queue metrics.
  5. MWAA metrics drilldown

    MWAA metrics drilldown

Cluster metrics

The base MWAA environment comes up with three Amazon ECS clusters – scheduler, one worker (BaseWorker), and a web server. Workers can be configured with minimum and maximum numbers. When you configure more than one minimum worker, Amazon MWAA creates another ECS cluster (AdditionalWorker) to host the workers from 2 up to n where n is the max workers configured in your environment.

When you select Cluster from the console, you can see the list of metrics for all the clusters. To learn more about the metrics, visit the Amazon ECS product documentation.

MWAA metrics list

MWAA metrics list

CPU usage is the most important factor for schedulers due to DAG file processing. When you have many DAGs, CPU usage can be higher. You can improve the performance by setting min_file_process_interval higher. Similarly, you can apply other techniques described in the Apache Airflow Scheduler page to fine tune the performance.

Higher CPU or memory utilization in the worker can be due to moving large files or doing computation on the worker itself. This can be resolved by offloading the compute to purpose-built services such as Amazon ECS, Amazon EMR, and AWS Glue.

Database metrics

Amazon Aurora DB clusters used by Amazon MWAA come up with a primary DB instance and a read replica to support the read operations. Amazon MWAA publishes database metrics for both READER and WRITER instances. When you select Database tile, you can view the list of metrics available for the database cluster.

Database metrics

Database metrics

Amazon MWAA uses connection pooling technique so the database connections from scheduler, workers, and web servers are taken from the connection pool. If you have many DAGs scheduled to start at the same time, it can overload the scheduler and increase the number of database connections at a high frequency. This can be minimized by staggering the DAG schedule.

SQS metrics

An SQS queue helps decouple scheduler and worker so they can independently scale. When workers read the messages, they are considered in-flight and not available for other workers. Messages become available for other workers to read if they are not deleted before the 12 hours visibility timeout. Amazon MWAA publishes in-flight message count (RunningTasks), messages available for reading count (QueuedTasks) and the approximate age of the earliest non-deleted message (ApproximateAgeOfOldestTask).

Database metrics

Database metrics

Getting started with container, database and queue utilization metrics for Amazon MWAA

The following sample project explores some key metrics using an Amazon CloudWatch dashboard to help you find the number of workers running in your environment at any given moment.

The sample project deploys the following resources:

  • Amazon Virtual Private Cloud (Amazon VPC).
  • Amazon MWAA environment of size small with 2 minimum workers and 10 maximum workers.
  • A sample DAG that fetches NOAA Global Historical Climatology Network Daily (GHCN-D) data, uses AWS Glue Crawler to create tables and AWS Glue Job to produce an output dataset in Apache Parquet format that contains the details of precipitation readings for the US between year 2010 and 2022.
  • Amazon MWAA execution role.
  • Two Amazon S3 buckets – one for Amazon MWAA DAGs, one for AWS Glue job scripts and weather data.
  • AWSGlueServiceRole to be used by AWS Glue Crawler and AWS Glue job.

Prerequisites

There are a few tools required to deploy the sample application. Ensure that you have each of the following in your working environment:

Setting up the Amazon MWAA environment and associated resources

  1. From your local machine, clone the project from the GitHub repository.
  2. git clone https://github.com/aws-samples/amazon-mwaa-examples

  3. Navigate to mwaa_utilization_cw_metric directory.
  4. cd usecases/mwaa_utilization_cw_metric

  5. Run the makefile.
  6. make deploy

  7. Makefile runs the terraform template from the infra/terraform directory. While the template is being applied, you are prompted if you want to perform these actions.
  8. MWAA utilization terminal

    MWAA utilization terminal

This provisions the resources and copies the necessary files and variables for the DAG to run. This process can take approximately 30 minutes to complete.

Generating metric data and exploring the metrics

  1. Login into your AWS account through the AWS Management Console.
  2. In the Amazon MWAA environment console, you can see your environment with the Airflow UI link in the right of the console.
  3. MMQA environment console

    MMQA environment console

  4. Select the link Open Airflow UI. This loads the Apache Airflow UI.
  5. Apache Airflow UI

    Apache Airflow UI

  6. From the Apache Airflow UI, enable the DAG using Pause/Unpause DAG toggle button and run the DAG using the Trigger DAG link.
  7. You can see the Treeview of the DAG run with the tasks running.
  8. Navigate to the Amazon CloudWatch dashboard in another browser tab. You can see a dashboard by the name, MWAA_Metric_Environment_env_health_metric_dashboard.
  9. Access the dashboard to view different key metrics across cluster, database, and queue.
  10. MWAA dashboard

    MWAA dashboard

  11. After the DAG run is complete, you can look into the dashboard for worker count metrics. Worker count started with 2 and increased to 4.

When you trigger the DAG, the DAG runs 13 tasks in parallel to fetch weather data from 2010-2022. With two small size workers, the environment can run 10 parallel tasks. The rest of the tasks wait for either the running tasks to complete or automatic scaling to start. As the tasks take more than a few minutes to finish, MWAA automatic scaling adds additional workers to handle the workload. Worker count graph now plots higher with AdditionalWorker count increased to 3 from 1.

Cleanup

To delete the sample application infrastructure, use the following command from the usecases/mwaa_utilization_cw_metric directory.

make undeploy

Conclusion

This post introduces the new Amazon MWAA container, database, and queue utilization metrics. The example shows the key metrics and how you can use the metrics to solve a common question of finding the Amazon MWAA worker counts. These metrics are available to you from today for all versions supported by Amazon MWAA at no additional cost.

Start using this feature in your account to monitor the health and performance of your Amazon MWAA environment, troubleshoot issues related to capacity and delays, and to get insights into right-sizing the environment

Build your own CloudWatch dashboard using the metrics data JSON and Airflow metrics. To deploy more solutions in Amazon MWAA, explore the Amazon MWAA samples GitHub repo.

For more serverless learning resources, visit Serverless Land.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

How ZS created a multi-tenant self-service data orchestration platform using Amazon MWAA

Post Syndicated from Manish Mehra original https://aws.amazon.com/blogs/big-data/how-zs-created-a-multi-tenant-self-service-data-orchestration-platform-using-amazon-mwaa/

This is post is co-authored by Manish Mehra, Anirudh Vohra, Sidrah Sayyad, and Abhishek I S (from ZS), and Parnab Basak (from AWS). The team at ZS collaborated closely with AWS to build a modern, cloud-native data orchestration platform.

ZS is a management consulting and technology firm focused on transforming global healthcare and beyond. We leverage our leading-edge analytics, plus the power of data, science, and products, to help our clients make more intelligent decisions, deliver innovative solutions, and improve outcomes for all. Founded in 1983, ZS has more than 12,000 employees in 35 offices worldwide.

ZAIDYNTM by ZS is an intelligent, cloud-native platform that helps life sciences organizations shape the future. Its analytics, algorithms, and workflows empower people, transform processes, and unlock real value. Designed to learn and grow with our clients, the platform is modular, future-ready, and fueled by global connectivity. And as more people engage, share, and build, our platform gets smarter—helping organizations fuel discovery, connect with customers, deliver treatments, and improve lives. ZAIDYN is helping companies of all sizes gain fluency in the full spectrum of life sciences so they can move faster, together through its Data & Analytics, Customer Engagement, Field Performance and Clinical Development offerings.

ZAIDYN Data & Analytics apps provide business users with self-service tools to innovate and scale insights delivery across the enterprise. ZAIDYN Data Hub (a part of the Data & Analytics product category) provides self-service options for guided workflows, data connectors, quality checks, and more. The elastic data processing offered by AWS helps prioritize processing speeds.

Data Hub customers wanted a one-stop solution for managing their data pipelines. A solution that does not require end users to gain additional knowledge about the nitty-gritties of the tool, one which is easy for users to get onboarded on, thereby increasing the demand for data orchestration capabilities within the application. A few of the sophisticated asks like start and stop of workflows, maintaining history of past runs, and providing real-time status updates for individual tasks of the workflow became increasingly important for end clients. We needed a mature orchestration tool, which led us to Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Amazon MWAA is a managed orchestration service for Apache Airflow that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale.

In this post, we share how ZS created a multi-tenant self-service data orchestration platform using Amazon MWAA.

Why we chose Amazon MWAA

Choosing the right orchestration tool was critical for us because we had to ensure that the service was operationally efficient and cost-effective, provided high availability, had extensive features to support our business cases, and yet was easy to adapt for our end-users (data engineers). We evaluated and experimented among Amazon MWAA, Azkaban on Amazon EMR, and AWS Step Functions before project initiation.

The following benefits of Amazon MWAA convinced us to adopt it:

  • AWS managed service – With Amazon MWAA, we don’t have to manage the underlying infrastructure for scalability and availability to maintain quality of service. The built-in autoscaling mechanism of Amazon MWAA automatically increases the number of Apache Airflow workers in response to running and queued tasks, and disposes of extra workers when there are no more tasks queued or running. The default environment is already built for high availability with multiple Airflow schedulers and workers, and the metadata database distributed across multiple Availability Zones. We also evaluated hosting open-source Airflow on our ZS infrastructure. However, due to infrastructure maintenance overhead and the high investment needed to make and maintain it at production grade, we decided to drop that option.
  • Security – With Amazon MWAA, our data is secure by default because workloads run in our own isolated and secure cloud environment using Amazon Virtual Private Cloud (Amazon VPC), and data is automatically encrypted using AWS Key Management Service (AWS KMS). We can control role-based authentication and authorization for Apache Airflow’s user interface via AWS Identity and Access Management (IAM), providing users single sign-on (SSO) access for scheduling and viewing workflow runs.
  • Compatibility and active community support – Amazon MWAA hosts the same open-source Apache Airflow version without any forks. The open-source community for Apache Airflow is very active with multiple commits, files changes, issue resolutions, and community advice.
  • Language and connector support – The flow definitions for Apache Airflow are based on Python, which is easy for our engineers to adapt. An extensive list of features and connectors is available out of the box in Amazon MWAA, including connectors for Hive, Amazon EMR, Livy, and Kubernetes. We needed to run all our Data Hub jobs (ingestion, applying custom rules and quality checks, or exporting data to third-party systems) on Amazon EMR. The necessary Amazon EMR operators are already available as a part of the Amazon-provided package for Airflow (apache-airflow-providers-amazon), which we could supplement rather than construct one from the ground up.
  • Cost – Cost was the most important aspect for us when adopting Amazon MWAA. Amazon MWAA is useful for those who are running thousands of tasks in the prod environment, which is why we decided to the make the Amazon MWAA environment multi-tenant such that the cost can be shared among clients. With our large Amazon MWAA environment, we only pay for what we use, with no minimum fees or upfront commitments. We estimated paying less than $1,000 per month, combined for our environment usage and additional worker instance pricing, yet achieve the scale of being able to run 200 concurrent tasks running 3 hours per day over 10 concurrent workers. This meant reduced operational costs and engineering overhead while meeting the on-demand monitoring needs of end-to-end data pipeline orchestration.

Solution overview

The following diagram illustrates the solution architecture.

We have a common control tier account where we host our software as a service application (Data Hub) on Amazon Elastic Compute Cloud (Amazon EC2) instances. Each client has their own version of this application deployed on this shared infrastructure. Amazon MWAA is also hosted in the same common control tier account. The control tier account has connectivity with tenant-specific AWS accounts. This is to maintain strong physical isolation of client data by segregating the AWS accounts for each client. Each client-specific account hosts EMR clusters where data processing takes place. When a processing job is complete, data may reside on Amazon EMR (an HDFS cluster) or on Amazon Simple Storage Service (Amazon S3), an EMRFS cluster, depending on configuration. The DAG files generated by our Data Hub application contain metadata of the processes, and don’t contain any sensitive client information. When a job is submitted from Data Hub, the API request contains tenant-specific information needed to pull up the corresponding AWS connection details, which are stored as Airflow connection objects. These connection details are consumed by our custom implementation of Airflow EMR step operators (add and watch) to perform operations on the tenant EMR clusters.

Because the data orchestration capability is an application offering, the client teams create their processes on the Data Hub UI and don’t have access to the underlying Amazon MWAA environment.

The following screenshot shows how an end-user can configure Data Hub process on the application UI.

How Data Hub processes map to Amazon MWAA DAGs

Data Hub processes map to Amazon MWAA DAGs as follows:

  • Each process in Data Hub corresponds to a DAG in Amazon MWAA, and each component is a task (denoted by Sn​) that is submitted as a step on the client EMR clusters.
  • The application generates the DAG file dynamically and updates it on the S3 bucket linked to the Amazon MWAA environment.
  • Parsing dedicated structures representing a given process and submitting or tracking the Amazon EMR steps is abstracted from the end-user. Dynamic DAG generation is responsible for using the latest version of the underlying components and helps in managing the DAG schedule.
  • Some Airflow tasks are created as a part of the DAG, which take care of interacting with the application APIs to ensure that the required metadata is captured in a separate Amazon Relational Database Service (Amazon RDS) database instance.

A user can trigger a given process to run from the Data Hub UI or can schedule it to run at a specified time. Because a single Amazon MWAA environment is responsible for the data orchestration needs of multiple clients, our DAG decode logic ensures that the correct EMR cluster ID and Airflow connection ID are picked up at runtime. The configs responsible for storing these details are placed and updated on the S3 buckets via an automated deployment pipeline. A dedicated connection ID is created per client in Airflow, which is then utilized in our custom implementation of EmrAddStepsOperator. The connection ID captures the Region and role ARN to be assumed to interact with the EMR cluster in the client account. These cross-account roles have access to limited resources in each client account, following the principle of least privilege.

Generating a DAG from a process defined on Data Hub UI

Our front-end application is built using Angular (version 11) and uses a third-party library that facilitates drag-and-drop of components from the left pane on a canvas. Components are stitched together with connections defining dependencies to form a process. This process is translated by our custom engine to generate a dynamic Airflow DAG. A sample DAG generated from the preceding example process defined on the UI looks like the following figure.

We wrap the DAG by PEntry and PExit Python operators, and for each of the components on the Data Hub UI, we create two tasks: Cn and Wn.

The relevant terms for this solution are as follows:

  • PEntry​ – The Python operator used to insert an entry in the RDS database that the process run has started via API call.​
  • Cn– The ZS custom implementation of EMRAddStepsOperator used to submit a job (Data Hub component) on a running EMR cluster.​ This is followed by an API call to insert an entry in the database that the component job has started.​
  • Wn– The custom implementation of Airflow Watcher (EmrStepSensor), which checks the status of the step from our metadata database.​
  • PExit​ – The Python operator used to update an entry in the RDS database (more of a finally block) via API call.​

Lessons learned during the implementation

When implementing this solution, we learned the following:

  • We faced challenges in being able to consistently predict when a DAG will be parsed and made available in the Airflow UI in Amazon MWAA after the DAG file is synced to the linked S3 bucket. Depending on how complex the DAG is, it could happen within seconds or several minutes. Due to the lack of availability of an API or AWS Command Line Interface (AWS CLI) command to ascertain this, we put in some blanket restrictions (delay) on user operations from our UI to overcome this limitation.
  • Within Airflow, data pipelines are represented by DAGs, and these DAGs change over time as business needs evolve. A key challenge faced by Airflow users is looking at how a DAG was run in the past, and when it was replaced by a newer version of the DAG. This is because within Airflow (as of this writing), only the current (latest) version of the DAG is represented within the user interface, without any reference to prior versions of the DAG. To overcome this limitation, we implemented a backend way of generating a DAG from the available metadata, and use it to version control over runs.
  • Airflow CLI commands when invoked in DAGs always return an HTTP 200 response. You can’t solely rely on the HTTP response code to ascertain the status of commands. We applied additional parsing logic (particularly to analyze the errors on failure) to determine the true status of commands.
  • Airflow doesn’t have a command to gracefully stop a DAG that is currently running. You can stop a DAG (unmark as running) and clear the task’s state or even delete it in the UI. The actual running tasks in the executor won’t stop, but might be stopped if the executor realizes that it’s not in the database anymore.

Conclusion

Amazon MWAA sets up Apache Airflow for you using the same Apache Airflow user interface and open-source code. With Amazon MWAA, you can use Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. Amazon MWAA automatically scales its workflow run capacity to meet your needs, and is integrated with AWS security services to help provide you with fast and secure access to your data. In this post, we discussed how you can build a bridge tenancy isolation model with a central Amazon MWAA orchestrating task against independent infrastructure stacks in dedicated accounts deployed for each of your tenants. Through a custom UI, you can enable self-service workflow runs via Airflow dynamic DAGs using the power and flexibility of Python. This enables you to achieve economies of scale and operational efficiency while meeting your regulatory, security, and cost considerations.


About the Authors

Manish Mehra is a Software Architect, working with the SD group in ZS. He has more than 11 years of experience working in banking, gaming, and life science domains. He is currently looking into the architecture of the Data & Analytics product category of the ZAIDYN Platform. He has expertise in full-stack application development and building robust, scalable, enterprise-grade big data applications.

Anirudh Vohra is a Director of Cloud Architecture, working within the Cloud Center of Excellence space at ZS. He is passionate about being a developer advocate for internal engineering teams, also designing and building cloud platforms and abstractions to empower developers and troubleshoot complex systems.

Abhishek I S is Associate Cloud Architect at ZS Associates working within the Cloud Centre of Excellence space. He has diverse experience ranging from application development to cloud engineering. Currently, he is primarily focusing on architecture design and automation for the cloud-native solutions of various ZS products.

Sidrah Sayyad is an Associate Software Architect at ZS working within the Software Development (SD) group. She has 9 years of experience, which includes working on identity management, infrastructure management, and ETL applications. She is passionate about coding and helps architect and build applications to achieve business outcomes.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab was closely involved with the engagement with ZS, providing architectural guidance as well as helping the team overcome technical challenges during the implementation.

How GE Proficy Manufacturing Data Cloud replatformed to improve TCO, data SLA, and performance

Post Syndicated from Jyothin Madari original https://aws.amazon.com/blogs/big-data/how-ge-proficy-manufacturing-data-cloud-replatformed-to-improve-tco-data-sla-and-performance/

This is post is co-authored by Jyothin Madari, Madhusudhan Muppagowni and Ayush Srivastava from GE.

GE Proficy Manufacturing Data Cloud (MDC), part of the GE Digital’s Manufacturing Execution Systems (MES) suite of solutions, allows GED’s customers to increase the derived value easily and quickly from the MES by reliably bringing enterprise-wide manufacturing data into the cloud and transforming it into a structured dataset for advanced analytics and deeper insights into the manufacturing processes.

In this post, we share how MDC modernized the hybrid cloud strategy by replatforming. This solution improved scalability, their data availability Service Level Agreement (SLA), and performance.

Challenge

MDC v1 was built on Predix services using industrial use case-optimized Predix services such as Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR). MDC evolved in both features and the underlying platform over the past year with a goal to improve TCO, data SLA, and performance. MDC’s customer base grew and the number of sites from customers grew to over 100 in the past couple of years. The increased number of sites needed more compute and storage capacity. This increased infrastructure and operational cost significantly, while introducing increased data latency and lowering the data freshness interval from the cloud.

How we started

MDC evaluated several vendors for their storage and compute capabilities using various measurements: security, performance, scalability, ease of management and operation, reduction of overall cost and increase in ROI, partnership, and migration help (technology assistance). The MDC team saw opportunities to improve the product by using native AWS services such as Amazon Redshift, AWS Glue, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which made the product more performant and scalable while reducing operation costs and making it future-ready for advanced analytics and new customer use cases.

The GE Digital team, comprised of domain experts, developers, and QA, worked shoulder to shoulder with the AWS ProServe team, comprised of Solution Architects, Data Architects, and Big Data Experts, in determining the key architectural changes required and solutions to implementation challenges.

Overview of solution

The following diagram illustrates the high-level architecture of the solution.

This is a broad overview, and the specifics of networking and security between components are out of scope for this post.

The solution includes the following main steps and components:

  1. CDC and log collector – Compressed CSV data is collected from over 100 Manufacturing Data Sources Proficy Plant Applications and sinked into an Amazon Simple Storage Service (Amazon S3) bucket.
  2. S3 raw bucket – Our data lands in Amazon S3 without any transformation, but appropriately partitioned (tenant, site, date, and so on) for the ease of future processing.
  3. AWS Lambda – When the file lands in the S3 raw bucket, it triggers an S3 event notification, which invokes AWS Lambda. Lambda extracts metadata (bucket name, key name, date, and so on) from the event and saves it in Amazon DynamoDB.
  4. AWS Glue – Our goal is now to take CSV files, with varying schemas, and convert them into Apache Parquet format. An AWS Glue extract, transform, and load (ETL) job reads a list of files to be processed from the DynamoDB table and fetches them from the S3 raw bucket. We have preconfigured unified AVRO schemas in the AWS Glue Schema Registry for schema conversion. Converted data lands in the S3 raw Parquet bucket.
  5. S3 raw Parquet bucket – Data in this bucket is still raw and unmodified; only the format was changed. This intermediary storage is required due to schema and column order mismatch in CSV files.
  6. Amazon Redshift – The majority of transformations and data enrichment happens in this step. Amazon Redshift Spectrum consumes data from the S3 raw Parquet bucket and external PostgreSQL dimension tables (through a federated query). Transformations are performed via stored procedures, where we encapsulate logic for data transformation, data validation, and business-specific logic. The Amazon Redshift cluster is configured with concurrency scaling, auto workload management (WLM) with caching, and the latest RA3 instance types.
  7. MDC API – These custom-built, web-based, REST API microservices talk on the backend with Amazon Redshift and expose data to external users, business intelligence (BI) tools, and partners.
  8. Amazon Redshift data export and archival – On a scheduled basis, Amazon Redshift exports (UNLOAD command) contextualized and business-defined aggregated data. Exports are landed in the S3 bucket as Apache Parquet files.
  9. S3 Parquet export bucket – This bucket stores the exported data (hundreds of TBs) used by external users who need to run extensive, heavy analytics and AI or machine learning (ML) with various tools (such as Amazon EMR, Amazon Athena, Apache Spark, and Dremio).
  10. End-users – External users consume data from the API. The main use case here is reporting and visual analytics.
  11. Amazon MWAA – The orchestrator of the solution, Amazon MWAA is used for scheduling Amazon Redshift stored procedures, AWS Glue ETL jobs, and Amazon Redshift exports at regular intervals with error handling and retries built in.

Bringing it all together

MDC replaced both Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR) with Amazon Redshift for both storage of the MDC data models and compute (ELT). Amazon MWAA is used to schedule the workloads that do the bulk of the ELT. Lambda, AWS Glue, and DynamoDB are used to normalize the schema differences between sites. It was important not to disrupt MDC customers while replatforming. To achieve this, MDC used a phased approach to migrate the data models to Amazon Redshift. They used federated queries to query existing PostgreSQL for dimensional data, which facilitated having some of the data models in Amazon Redshift, while the others were in Cassandra with no interruption to MDC customers. Redshift Spectrum facilitated querying the raw data in Amazon S3 directly both for ETL and data validation.

75% of the MDC team along with the AWS ProServe team and AWS Solution Architects collaborated with the GE Digital Security Team and Platform Team to implement the architecture with AWS native services. It took approximately 9 months to implement, secure, and performance tune the architecture and migrate data models in three phases. Each phase has gone through a GE Digital internal security review. Amazon Redshift Auto WLM, short query acceleration, and tuning the sort keys to optimize querying patterns improved the Proficy MDC API performance. Because the unload of the data from Amazon Redshift was fast, Proficy MDC is now able to export the data much more frequently to our end customers.

Conclusion

With replatforming, Proficy MDC was able to improve ETL performance by approximately 75%. Data latency and freshness improved by approximately 87%. The solution reduced TCO of the platform by approximately 50%. Proficy MDC was also able reduce the infrastructure and operational cost. Improved performance and reduced latency has allowed us to speed up the next steps in our journey to modernize the enterprise data architecture and hybrid cloud data platform.


About the Authors

Jyothin Madari leads the Manufacturing Data Cloud (MDC) engineering team; part of the manufacturing suite of products at GE Digital. He has 18 years of experience, 4 of which is with GE Digital. Most recently he has been working on data migration projects with an aim to reduce costs and improve performance. He is an AWS Certified Cloud Practitioner, a keen learner and loves solving interesting problems. Connect with him on LinkedIn.

Madhusudhan (Madhu) Muppagowni is a Technical Architect and Principal Software Developer based in Silicon Valley, Bay Area, California.  He is passionate about Software Development and Architecture. He thrives on producing Well-Architected and Secure SaaS Products, Data Pipelines that can make a real impact.  He loves outdoors and an avid hiker and backpacker. Connect with him on LinkedIn.

Ayush Srivastava is a Senior Staff Engineer and Technical Anchor based in Hyderabad, India. He is passionate about Software Development and Architecture. He has Demonstrated track record of successfully technical anchoring small to large Secure SaaS Products, Data Pipelines from start to finish. He loves exploring different places and he says “I’m in love with cities I have never been to and people I have never met.” Connect with him on LinkedIn.

Karen Grygoryan is Data Architect with AWS ProServe. Connect with him on LinkedIn.

Gnanasekaran Kailasam is a Data Architect at AWS. He has worked with building data warehouses and big data solutions for over 16 years. He loves to learn new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS. Connect with him on LinkedIn.

Persist and analyze metadata in a transient Amazon MWAA environment

Post Syndicated from Praveen Kumar original https://aws.amazon.com/blogs/big-data/persist-and-analyze-metadata-in-a-transient-amazon-mwaa-environment/

Customers can harness sophisticated orchestration capabilities through the open-source tool Apache Airflow. Airflow can be installed on Amazon EC2 instances or can be dockerized and deployed as a container on AWS container services. Alternatively, customers can also opt to leverage Amazon Managed Workflows for Apache Airflow (MWAA).

Amazon MWAA is a fully managed service that enables customers to focus more of their efforts on high-impact activities such as programmatically authoring data pipelines and workflows, as opposed to maintaining or scaling the underlying infrastructure. Amazon MWAA offers auto-scaling capabilities where it can respond to surges in demand by scaling the number of Airflow workers out and back in.

With Amazon MWAA, there are no upfront commitments and you only pay for what you use based on instance uptime, additional auto-scaling capacity, and storage of the Airflow back-end metadata database. This database is provisioned and managed by Amazon MWAA and contains the necessary metadata to support the Airflow application.  It hosts key data points such as historical execution times for tasks and workflows and is valuable in understanding trends and behaviour of your data pipelines over time. Although the Airflow console does provide a series of visualisations that help you analyse these datasets, these are siloed from other Amazon MWAA environments you might have running, as well as the rest of your business data.

Data platforms encompass multiple environments. Typically, non-production environments are not subject to the same orchestration demands and schedule as those of production environments. In most instances, these non-production environments are idle outside of business hours and can be spun down to realise further cost-efficiencies. Unfortunately, terminating Amazon MWAA instances results in the purging of that critical metadata.

In this post, we discuss how to export, persist and analyse Airflow metadata in Amazon S3 enabling you to run and perform pipeline monitoring and analysis. In doing so, you can spin down Airflow instances without losing operational metadata.

Benefits of Airflow metadata

Persisting the metadata in the data lake enables customers to perform pipeline monitoring and analysis in a more meaningful manner:

  • Airflow operational logs can be joined and analysed across environments
  • Trend analysis can be conducted to explore how data pipelines are performing over time, what specific stages are taking the most time, and how is performance effected as data scales
  • Airflow operational data can be joined with business data for improved record level lineage and audit capabilities

These insights can help customers understand the performance of their pipelines over time and guide focus towards which processes need to be optimised.

The technique described below to extract metadata is applicable to any Airflow deployment type, but we will focus on Amazon MWAA in this blog.

Solution Overview

The below diagram illustrates the solution architecture. Please note, Amazon QuickSight is NOT included as part of the CloudFormation stack and is not covered in this tutorial. It has been placed in the diagram to illustrate that metadata can be visualised using a business intelligence tool.

As part of this tutorial, you will be performing the below high-level tasks:

  • Run CloudFormation stack to create all necessary resources
  • Trigger Airflow DAGs to perform sample ETL workload and generate operational metadata in back-end database
  • Trigger Airflow DAG to export operational metadata into Amazon S3
  • Perform analysis with Amazon Athena

This post comes with an AWS CloudFormation stack that automatically provisions the necessary AWS resources and infrastructure, including an active Amazon MWAA instance, for this solution. The entire code is available in the GitHub repository.

The Amazon MWAA instance will already have three directed-acyclic graphs (DAGs) imported:

  1. glue-etl – This ETL workflow leverages AWS Glue to perform transformation logic on a CSV file (customer_activity.csv). This file will be loaded as part of the CloudFormation template into the s3://<DataBucket>/raw/ prefix.

The first task glue_csv_to_parquet converts the ‘raw’ data to parquet format and stores the data in location s3://<DataBucket>/optimised/.  By converting the data in parquet format, you can achieve faster query performance and lower query costs.

The second task glue_transform runs an aggregation over the newly created parquet format and stores the aggregated data in location s3://<DataBucket>/conformed/.

  1. db_export_dag – This DAG consists of one task, export_db, which exports the data from the back-end Airflow database into Amazon S3 in the location s3://<DataBucket>/export/.

Please note that you may experience time-out issues when extracting large amounts of data. On busy Airflow instances, our recommendation will be to set up frequent extracts in small chunks.

  1. run-simple-dag – This DAG does not perform any data transformation or manipulation. It is used in this blog for the purposes of populating the back-end Airflow database with sufficient operational data.

Prerequisites

To implement the solution outlined in this blog, you will need following :

Steps to run a data pipeline using Amazon MWAA and saving metadata to s3:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name for your stack.
  4. Choose Next.
  5. Keep the default settings on the ‘Configure stack options’ page, and choose Next.
  6. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  7. Choose Create stack. The stack can take up to 30 mins to complete.

The CloudFormation template generates the following resources:

    • VPC infrastructure that uses Public routing over the Internet.
    • Amazon S3 buckets required to support Amazon MWAA, detailed below:
      • The Data Bucket, refered in this blog as s3://<DataBucket>, holds the data which will be optimised and transformed for further analytical consumption. This bucket will also hold the data from the Airflow back-end metadata database once extracted.
      • The Environment Bucket, refered in this blog as s3://<EnvironmentBucket>, stores your DAGs, as well as any custom plugins, and Python dependencies you may have.
    • Amazon MWAA environment that’s associated to the  s3://<EnvironmentBucket>/dags location.
    • AWS Glue jobs for data processing and help generate airflow metadata.
    • AWS Lambda-backed custom resources to upload to Amazon S3 the sample data, AWS Glue scripts and DAG configuration files,
    • AWS Identity and Access Management (IAM) users, roles, and policies.
  1. Once the stack creation is successful, navigate to the Outputs tab of the CloudFormation stack and make note of DataBucket and EnvironmentBucket name. Store your Apache Airflow Directed Acyclic Graphs (DAGs), custom plugins in a plugins.zip file, and Python dependencies in a requirements.txt file.
  2. Open the Environments page on the Amazon MWAA console.
  3. Choose the environment created above. (The environment name will include the stack name). Click on Open Airflow UI.
  4. Choose glue-etl DAG , unpause by clicking the radio button next to the name of the DAG and click on the Play Button on Right hand side to Trigger DAG. It may take up to a minute for DAG to appear.
  5. Leave Configuration JSON as empty and hit Trigger.
  6. Choose run-simple-dag DAG, unpause and click on Trigger DAG.
  7. Once both DAG executions have completed, select the db_export_dag DAG, unpause and click on Trigger DAG. Leave Configuration JSON as empty and hit Trigger.

This step will extract the dag and task metadata to a S3 location. This is a sample list of tables and more tables can be added as required. The exported metadata will be located in s3://<DataBucket>/export/ folder.

Visualise using Amazon QuickSight and Amazon Athena

Amazon Athena is a serverless interactive query service that can be used to run exploratory analysis on data stored in Amazon S3.

If you are using Amazon Athena for the first time, please find the steps here to setup query location. We can use Amazon Athena to explore and analyse the metadata generated from airflow dag runs.

  1. Navigate to Athena Console and click explore the query editor.
  2. Hit View Settings.
  3. Click Manage.
  4. Replace with s3://<DataBucket>/logs/athena/. Once completed, return to the query editor.
  5. Before we can perform our pipeline analysis, we need to create the below DDLs. Replace the <DataBucket> as part of the LOCATION clause with the parameter value as defined in the CloudFormation stack (noted in Step 8 above).
    CREATE EXTERNAL TABLE default.airflow_metadata_dagrun (
            sa_instance_state STRING,
            dag_id STRING,
            state STRING,
            start_date STRING,
            run_id STRING,
            external_trigger STRING,
            conf_name STRING,
            dag_hash STRING,
             id STRING,
            execution_date STRING,
            end_date STRING,
            creating_job_id STRING,
            run_type STRING,
            last_scheduling_decision STRING
       )
    PARTITIONED BY (dt string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LOCATION 's3://<DataBucket>/export/dagrun/'
    TBLPROPERTIES ("skip.header.line.count"="1");
    MSCK REPAIR TABLE default.airflow_metadata_dagrun;
    
    CREATE EXTERNAL TABLE default.airflow_metadata_taskinstance (
            sa_instance_state STRING,
            start_date STRING,
            job_id STRING,
            pid STRING,
            end_date STRING,
            pool STRING,
            executor_config STRING,
            duration STRING,
            pool_slots STRING,
            external_executor_id STRING,
            state STRING,
            queue STRING,
            try_number STRING,
            max_tries STRING,
            priority_weight STRING,
            task_id STRING,
            hostname STRING,
            operator STRING,
            dag_id STRING,
            unixname STRING,
            queued_dttm STRING,
            execution_date STRING,
            queued_by_job_id STRING,
            test_mode STRING
       )
    PARTITIONED BY (dt string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LOCATION 's3://<DataBucket>/export/taskinstance/'
    TBLPROPERTIES ("skip.header.line.count"="1");
    MSCK REPAIR TABLE default.airflow_metadata_taskinstance;

  6. You can preview the table in the query editor of Amazon Athena.

  7. With the metadata persisted, you can perform pipeline monitoring and derive some powerful insights on the performance of your data pipelines overtime. As an example to illustrate this, execute the below SQL query in Athena.

This query returns pertinent metrics at a monthly grain which include number of executions of the DAG in that month, success rate, minimum/maximum/average duration for the month and a variation compared to the previous months average.

Through the below SQL query, you will be able to understand how your data pipelines are performing over time.

select dag_run_prev_month_calcs.*
        , avg_duration - prev_month_avg_duration as var_duration
from
    (
select dag_run_monthly_calcs.*
            , lag(avg_duration, 1, avg_duration) over (partition by dag_id order by year_month) as prev_month_avg_duration
    from
        (
            select dag_id
                    , year_month
                    , sum(counter) as num_executions
                    , sum(success_ind) as num_success
                    , sum(failed_ind) as num_failed
                    , (cast(sum(success_ind) as double)/ sum(counter))*100 as success_rate
                    , min(duration) as min_duration
                    , max(duration) as max_duration
                    , avg(duration) as avg_duration
            from
                (
                    select dag_id
                            , 1 as counter
                            , case when state = 'success' then 1 else 0 end as success_ind
                            , case when state = 'failed' then 1 else 0 end as failed_ind
                            , date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as start_date
                            , date_parse(end_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as end_date
                            , date_parse(end_date,'%Y-%m-%d %H:%i:%s.%f+00:00') - date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00') as duration
                            , date_format(date_parse(start_date,'%Y-%m-%d %H:%i:%s.%f+00:00'), '%Y-%m') as year_month
                    from "default"."airflow_metadata_dagrun"
                    where state <> 'running'
                )  dag_run_counters
            group by dag_id, year_month
        ) dag_run_monthly_calcs
    ) dag_run_prev_month_calcs
order by dag_id, year_month

  1. You can also visualize this data using your BI tool of choice. While step by step details of creating a dashboard is not covered in this blog, please refer the below dashboard built on Amazon QuickSight as an example of what can be built based on the metadata extracted above. If you are using Amazon QuickSight for the first time, please find the steps here on how to get started.

Through QuickSight, we can quickly visualise and derive that our data pipelines are completing successfully, but on average are taking a longer time to complete over time.

Clean up the environment

  1. Navigate to the S3 console and click on the <DataBucket> noted in step 8 above.
  2. Click on Empty bucket.
  3. Confirm the selection.
  4. Repeat this step for bucket <EnvironmentBucket> (noted in step 8 above) and Empty bucket.
  5. Run the below statements in the query editor to drop the two Amazon Athena tables. Run statements individually.
    DROP TABLE default.airflow_metadata_dagrun;
    DROP TABLE default.airflow_metadata_taskinstance;

  6. On the AWS CloudFormation console, select the stack you created and choose Delete.

Summary

In this post, we presented a solution to further optimise the costs of Amazon MWAA by tearing down instances whilst preserving the metadata. Storing this metadata in your data lake enables you to better perform pipeline monitoring and analysis. This process can be scheduled and orchestrated programatically and is applicable to all Airflow deployments, such as Amazon MWAA, Apache Airflow installed on Amazon EC2, and even on-premises installations of Apache Airflow.

To learn more, please visit Amazon MWAA and Getting Started with Amazon MWAA.


About the Authors

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

Avnish Jain is a Specialist Solution Architect in Analytics at AWS with experience designing and implementing scalable, modern data platforms on the cloud for large scale enterprises. He is passionate about helping customers build performant and robust data-driven solutions and realise their data & analytics potential.

How to Run Massively Scalable ADAS Simulation Workloads on CAEdge

Post Syndicated from Hendrik Schoeneberg original https://aws.amazon.com/blogs/architecture/how-to-run-massively-scalable-adas-simulation-workloads-on-caedge/

This post was co-written by Hendrik Schoeneberg, Sr. Global Big Data Architect, The An Binh Nguyen, Product Owner for Cloud Simulation at Continental, Autonomous Mobility – Engineering Platform, Rumeshkrishnan Mohan, Global Big Data Architect, and Junjie Tang, Principal Consultant at AWS Professional Services.

AV/ADAS simulations processing large-scale field sensor data such as radar, lidar, and high-resolution video come with many challenges. Typically, the simulation workloads are spiky with occasional, but high compute demands, so the platform must scale up or down elastically to match the compute requirements. The platform must be flexible enough to integrate specialized ADAS simulation software, use distributed computing or HPC frameworks, and leverage GPU accelerated compute resources where needed.

Continental created the Continental Automotive Edge (CAEdge) Framework to address these challenges. It is a modular multi-tenant hardware and software framework that connects the vehicle to the cloud. You can learn more about this in Developing a Platform for Software-defined Vehicles with Continental Automotive Edge (CAEdge) and Developing a platform for software-defined vehicles (re:Invent session-AUT304).

In this blog post, we’ll illustrate how the CAEdge Framework orchestrates ADAS simulation workloads with Amazon Managed Workflows for Apache Airflow (MWAA). We’ll show how it delegates the high-performance workloads to AWS Batch for elastic, highly scalable, and customizable compute needs. We’ll showcase the “bring your own software-in-the-loop” (BYO-SIL) pattern, detailing how to leverage specialized and proprietary simulation software in your workflow. We’ll also demonstrate how to integrate the simulation platform with tenant data in the CAEdge framework. This orchestrate and delegate pattern has previously been introduced in Field Notes: Deploying Autonomous Driving and ADAS Workloads at Scale with Amazon Managed Workflows for Apache Airflow and the Reference Architecture for Autonomous Driving Data Lake.

Solution overview for autonomous driving simulation

The following diagram shows a high-level overview of this solution.

Figure 1. Architecture diagram for autonomous driving simulation

Figure 1. Architecture diagram for autonomous driving simulation

It can be broken down into five major parts, illustrated in Figure 1.

  1. Simulation API: Amazon API Gateway (label 1) provides a REST API for authenticated users to schedule or monitor simulation. It runs on Amazon Managed Workflows for Apache Airflow (MWAA) using AWS Lambda (label 2).
  2. Simulation control plane: The simulation control plane (label 3) built on MWAA enables users to design and initiate workflows and integrate with AWS services.
  3. Scalable compute backend: We leverage the parallelization and elastic scalability capabilities of AWS Batch to distribute the workload (label 4). Additionally, we want to be able to run proprietary software components as part of the simulation workflows and use highly customizable Amazon EC2 compute environments. For example, we can use GPU acceleration or Graviton-based instances for workloads that must run on ARM, instead of x86 architectures.
  4. Autonomous Driving Data Lake (ADDL) integration: The simulations’ input and output data will be stored in a data lake (label 5) on Amazon S3. To provide efficient read and write access, data gets copied before each simulation run using RAID0 bundled ephemeral instance storage drives. After the simulation, results are written back to the data lake and are ready for reporting and analytics. We use AWS Lake Formation for metadata storage, data cataloging, and permission handling.
  5. Automated deployment: The solution architecture’s deployment is fully automated using a CI/CD pipeline in AWS CodePipeline and AWS CodeBuild (label 6). We can then perform automated testing and deployment to multiple target environments.

In this blog we will focus on the simulation control plane, scalable compute backend, and ADDL integration.

Simulation control plane

In a typical simulation, there are tasks like data movement (copying input data and persisting the results). These steps can require high levels of parallelization or GPU support. In addition, they can involve third-party or proprietary software, specialized runtime environments, or architectures like ARM. To facilitate all task requirements, we will delegate task initiation to the AWS services that best match the requirements. Correct sequence of tasks is achieved by an orchestration layer. Decoupling the simulation orchestration from task initiation follows the key design pattern to separate concerns. This enables the architecture to be adapted to specific requirements.

For the high-level task orchestration, we’ll introduce a simulation control plane built on MWAA. Modeling all simulation tasks in a directed acyclic graph (DAG), MWAA performs scheduling and task initiation in the correct sequence. It also handles the integration with AWS’ services. You can intuitively monitor and debug the progress of a simulation run across different services and networks. For the workflow within CAEdge, we’ll use AWS Batch to run simulations that are packaged as Docker containers.

Parameterizable workflows: Airflow supports parameterization of DAGs by providing a JSON object when triggering a DAG run that can be accessed at runtime. This enables us to create a simulation execution framework in which we model the simulation steps. It lets you specify the simulations’ parameters such as which Docker container to execute, runtime configuration, or input and output data locations, see Figures 2 and 3.

Figure 2. Specify simulations’ parameters in JSON

Figure 2. Specify simulations’ parameters in JSON

Figure 3. Read simulations’ parameters from JSON

Figure 3. Read simulations’ parameters from JSON

Scalable compute backend

Publishing the Docker image: To run a simulation, you must provide a Docker container in Amazon Elastic Container Registry (ECR). Manually push Docker images to ECR using the command line interface (CLI) or using CI/CD pipeline automation.

Choosing the compute option: Containers at AWS describes the services options for developers. Various factors can contribute to your decision-making. For the CAEdge platform, we want to run thousands of containers with fine-grained control over the underlying compute instance’s configuration. We also want to run containers in privileged mode, so AWS Batch on EC2 is a great match. Figure 4 outlines the compute backend’s architecture.

Figure 4. Compute backend architecture diagram

Figure 4. Compute backend architecture diagram

To run a Docker container on AWS Batch, we need three components: A job definition, a compute environment, and a job queue. The AWS Batch job definition specifies which job to run and how to run it. For example, we can define the Docker image to use, and specify the vCPU and memory configuration. The job definition will be submitted to a job queue, which in turn is linked to one or more compute environments. The compute environment specifies the compute configuration used to run a containerized workload, in addition to instance types and storage configurations. Creating a compute environment describes this more in detail. In the section ‘ADDL integration,’ we’ll describe how to select and configure the EC2 container instances to maximize network and disk I/O.

The AWS Batch array job feature can spin up thousands of independent, but similarly configured jobs. Instead of submitting a single job for each input file from the simulation control plane, we can instead submit a single array job. This provides the collection of input files as input. AWS Batch can spin up child jobs to process each entry of the collection in parallel, reducing operational overhead drastically.

With these components, we can now submit a job definition to a job queue from the simulation control plane, which will initiate on the corresponding compute environment.

ADDL integration

In the CAEdge platform, all simulation recordings and their metadata are stored and cataloged in the data lake. On average, single recordings are around 100 GB in size with simulation requests containing 100–300 recordings. A simulation request can therefore require 10–30 TB of data movement from S3 to the containers before the simulation starts. The containers’ performance will directly depend on I/O performance, as the input data is read and processed during execution. To provide the highest performance of data transfer and simulation workload, we need data storage options that maximize network and disk I/O throughput.

Choosing the storage option: For CAEdge’s simulation workloads, the storage solution acts as a temporary scratch space for the simulation containers’ input data. It should maximize I/O throughput. These requirements are met in the most cost-efficient way by choosing EC2 instances of the M5d family and their attached instance storage.

The M5d are general-purpose instances with NVMe-based SSDs, which are physically connected to the host server and provide block-level storage coupled to the lifetime of the instance. As described in the previous section, we configured AWS Batch to create compute environments using EC2’s M5d instances. While other storage options like Amazon Elastic File System (EFS) and Amazon Elastic Block Storage (EBS) can scale to match even demanding throughput scenarios, the simulation benefits from a high-performance, temporary storage location that is directly attached to the host.

Bundling the volumes: M5d instances can have multiple NVMe drives attached, depending on their size and storage configuration. To provide a single stable storage location for the simulation containers, bundle all attached NVMe SSDs into a single RAID0 volume during the instance’s launch, using a modified user data script. With this, we can provide a fixed storage location that can be accessed from the simulation containers. Additionally, we are distributing I/O operations evenly across all disks in the RAID0 configuration, which improves the read and write performance.

KPIs used for measurement

The SIL factor tells you how long the simulation takes compared to the duration of the recorded data.

SIL factor example:

  • For a recording with 60 minutes of data and a simulation duration of 120 minutes, the resulting SIL factor is 120 / 60 = 2. The simulation runs twice as long as real time.
  • For a recording with 60 minutes of data and a simulation duration of 60 minutes, the resulting SIL factor is 60 / 60 = 1. The simulation runs in real time.

Similarly, the aggregated SIL factor tells you how long the simulation for multiple recordings takes compared to the duration of the recorded data. It also factors in the horizontal scaling capabilities.

Aggregated SIL factor example:

  • For 3 recordings with 60 minutes of data and an overall simulation duration of 120 minutes, the resulting SIL factor is 120 / 3 * 60 = 0.67. The overall simulation is a third faster than real time.

Performance results

The storage optimizations described in the ADDL integration KPI section preceding, led to an improvement of the overall simulation duration of 50%. To benchmark the overall simulation platform’s performance, we created a simulation run with 15 recordings. The simulation run completed successfully with an aggregated SIL factor of 0.363, or, alternatively put, the simulation was roughly three times faster than real time. During production use, the platform will handle simulation runs with an average of 100-300 recordings. For these runs, the aggregated SIL factor is expected to be even smaller, as more simulations can be processed in parallel.

Conclusion

In this post, we showed how to design a platform for ADAS simulation workloads using the Bring Your Own Software-In-the-Loop (BYO-SIL) concept. We covered the key components including simulation control plane, scalable compute backend, and ADDL integration based on Continental Automotive Edge (CAEdge). We discussed the key performance benefits due to horizontal scalability and how to choose an optimized storage integration pattern. This led to an improvement of the overall simulation duration of 50%.

How ENGIE scales their data ingestion pipelines using Amazon MWAA

Post Syndicated from Anouar Zaaber original https://aws.amazon.com/blogs/big-data/how-engie-scales-their-data-ingestion-pipelines-using-amazon-mwaa/

ENGIE—one of the largest utility providers in France and a global player in the zero-carbon energy transition—produces, transports, and deals electricity, gas, and energy services. With 160,000 employees worldwide, ENGIE is a decentralized organization and operates 25 business units with a high level of delegation and empowerment. ENGIE’s decentralized global customer base had accumulated lots of data, and it required a smarter, unique approach and solution to align its initiatives and provide data that is ingestible, organizable, governable, sharable, and actionable across its global business units.

In 2018, the company’s business leadership decided to accelerate its digital transformation through data and innovation by becoming a data-driven company. Yves Le Gélard, chief digital officer at ENGIE, explains the company’s purpose: “Sustainability for ENGIE is the alpha and the omega of everything. This is our raison d’être. We help large corporations and the biggest cities on earth in their attempts to transition to zero carbon as quickly as possible because it is actually the number one question for humanity today.”

ENGIE, as with any other big enterprise, is using multiple extract, transform, and load (ETL) tools to ingest data into their data lake on AWS. Nevertheless, they usually have expensive licensing plans. “The company needed a uniform method of collecting and analyzing data to help customers manage their value chains,” says Gregory Wolowiec, the Chief Technology Officer who leads ENGIE’s data program. ENGIE wanted a free-license application, well integrated with multiple technologies and with a continuous integration, continuous delivery (CI/CD) pipeline to more easily scale all their ingestion process.

ENGIE started using Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to solve this issue and started moving various data sources from on-premise applications and ERPs, AWS services like Amazon Redshift, Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, external services like Salesforce, and other cloud providers to a centralized data lake on top of Amazon Simple Storage Service (Amazon S3).

Amazon MWAA is used in particular to collect and store harmonized operational and corporate data from different on-premises and software as a service (SaaS) data sources into a centralized data lake. The purpose of this data lake is to create a “group performance cockpit” that enables an efficient, data-driven analysis and thoughtful decision-making by the Engie Management board.

In this post, we share how ENGIE created a CI/CD pipeline for an Amazon MWAA project template using an AWS CodeCommit repository and plugged it into AWS CodePipeline to build, test, and package the code and custom plugins. In this use case, we developed a custom plugin to ingest data from Salesforce based on the Airflow Salesforce open-source plugin.

Solution overview

The following diagrams illustrate the solution architecture defining the implemented Amazon MWAA environment and its associated pipelines. It also describes the customer use case for Salesforce data ingestion into Amazon S3.

The following diagram shows the architecture of the deployed Amazon MWAA environment and the implemented pipelines.

The preceding architecture is fully deployed via infrastructure as code (IaC). The implementation includes the following:

  • Amazon MWAA environment – A customizable Amazon MWAA environment packaged with plugins and requirements and configured in a secure manner.
  • Provisioning pipeline – The admin team can manage the Amazon MWAA environment using the included CI/CD provisioning pipeline. This pipeline includes a CodeCommit repository plugged into CodePipeline to continuously update the environment and its plugins and requirements.
  • Project pipeline – This CI/CD pipeline comes with a CodeCommit repository that triggers CodePipeline to continuously build, test and deploy DAGs developed by users. Once deployed, these DAGs are made available in the Amazon MWAA environment.

The following diagram shows the data ingestion workflow, which includes the following steps:

  1. The DAG is triggered by Amazon MWAA manually or based on a schedule.
  2. Amazon MWAA initiates data collection parameters and calculates batches.
  3. Amazon MWAA distributes processing tasks among its workers.
  4. Data is retrieved from Salesforce in batches.
  5. Amazon MWAA assumes an AWS Identity and Access Management (IAM) role with the necessary permissions to store the collected data into the target S3 bucket.

This AWS Cloud Development Kit (AWS CDK) construct is implemented with the following security best practices:

  • With the principle of least privilege, you grant permissions to only the resources or actions that users need to perform tasks.
  • S3 buckets are deployed with security compliance rules: encryption, versioning, and blocking public access.
  • Authentication and authorization management is handled with AWS Single Sign-On (AWS SSO).
  • Airflow stores connections to external sources in a secure manner either in Airflow’s default secrets backend or an alternative secrets backend such as AWS Secrets Manager or AWS Systems Manager Parameter Store.

For this post, we step through a use case using the data from Salesforce to ingest it into an ENGIE data lake in order to transform it and build business reports.

Prerequisites for deployment

For this walkthrough, the following are prerequisites:

  • Basic knowledge of the Linux operating system
  • Access to an AWS account with administrator or power user (or equivalent) IAM role policies attached
  • Access to a shell environment or optionally with AWS CloudShell

Deploy the solution

To deploy and run the solution, complete the following steps:

  1. Install AWS CDK.
  2. Bootstrap your AWS account.
  3. Define your AWS CDK environment variables.
  4. Deploy the stack.

Install AWS CDK

The described solution is fully deployed with AWS CDK.

AWS CDK is an open-source software development framework to model and provision your cloud application resources using familiar programming languages. If you want to familiarize yourself with AWS CDK, the AWS CDK Workshop is a great place to start.

Install AWS CDK using the following commands:

npm install -g aws-cdk
# To check the installation
cdk --version

Bootstrap your AWS account

First, you need to make sure the environment where you’re planning to deploy the solution to has been bootstrapped. You only need to do this one time per environment where you want to deploy AWS CDK applications. If you’re unsure whether your environment has been bootstrapped already, you can always run the command again:

cdk bootstrap aws://YOUR_ACCOUNT_ID/YOUR_REGION

Define your AWS CDK environment variables

On Linux or MacOS, define your environment variables with the following code:

export CDK_DEFAULT_ACCOUNT=YOUR_ACCOUNT_ID
export CDK_DEFAULT_REGION=YOUR_REGION

On Windows, use the following code:

setx CDK_DEFAULT_ACCOUNT YOUR_ACCOUNT_ID
setx CDK_DEFAULT_REGION YOUR_REGION

Deploy the stack

By default, the stack deploys a basic Amazon MWAA environment with the associated pipelines described previously. It creates a new VPC in order to host the Amazon MWAA resources.

The stack can be customized using the parameters listed in the following table.

To pass a parameter to the construct, you can use the AWS CDK runtime context. If you intend to customize your environment with multiple parameters, we recommend using the cdk.json context file with version control to avoid unexpected changes to your deployments. Throughout our example, we pass only one parameter to the construct. Therefore, for the simplicity of the tutorial, we use the the --context or -c option to the cdk command, as in the following example:

cdk deploy -c paramName=paramValue -c paramName=paramValue ...
Parameter Description Default Valid values
vpcId VPC ID where the cluster is deployed. If none, creates a new one and needs the parameter cidr in that case. None VPC ID
cidr The CIDR for the VPC that is created to host Amazon MWAA resources. Used only if the vpcId is not defined. 172.31.0.0/16 IP CIDR
subnetIds Comma-separated list of subnets IDs where the cluster is deployed. If none, looks for private subnets in the same Availability Zone. None Subnet IDs list (coma separated)
envName Amazon MWAA environment name MwaaEnvironment String
envTags Amazon MWAA environment tags None See the following JSON example: '{"Environment":"MyEnv", "Application":"MyApp", "Reason":"Airflow"}'
environmentClass Amazon MWAA environment class mw1.small mw1.small, mw1.medium, mw1.large
maxWorkers Amazon MWAA maximum workers 1 int
webserverAccessMode Amazon MWAA environment access mode (private or public) PUBLIC_ONLY PUBLIC_ONLY, PRIVATE_ONLY
secretsBackend Amazon MWAA environment secrets backend Airflow SecretsManager

Clone the GitHub repository:

git clone https://github.com/aws-samples/cdk-amazon-mwaa-cicd

Deploy the stack using the following command:

cd mwaairflow && \
pip install . && \
cdk synth && \
cdk deploy -c vpcId=YOUR_VPC_ID

The following screenshot shows the stack deployment:

The following screenshot shows the deployed stack:

Create solution resources

For this walkthrough, you should have the following prerequisites:

If you don’t have a Salesforce account, you can create a SalesForce developer account:

  1. Sign up for a developer account.
  2. Copy the host from the email that you receive.
  3. Log in into your new Salesforce account
  4. Choose the profile icon, then Settings.
  5. Choose Reset my Security Token.
  6. Check your email and copy the security token that you receive.

After you complete these prerequisites, you’re ready to create the following resources:

  • An S3 bucket for Salesforce output data
  • An IAM role and IAM policy to write the Salesforce output data on Amazon S3
  • A Salesforce connection on the Airflow UI to be able to read from Salesforce
  • An AWS connection on the Airflow UI to be able to write on Amazon S3
  • An Airflow variable on the Airflow UI to store the name of the target S3 bucket

Create an S3 bucket for Salesforce output data

To create an output S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Create bucket.

The Create bucket wizard opens.

  1. For Bucket name, enter a DNS-compliant name for your bucket, such as airflow-blog-post.
  2. For Region, choose the Region where you deployed your Amazon MWAA environment, for example, US East (N. Virginia) us-east-1.
  3. Choose Create bucket.

For more information, see Creating a bucket.

Create an IAM role and IAM policy to write the Salesforce output data on Amazon S3

In this step, we create an IAM policy that allows Amazon MWAA to write on your S3 bucket.

  1. On the IAM console, in the navigation pane, choose Policies.
  2. Choose Create policy.
  3. Choose the JSON tab.
  4. Enter the following JSON policy document, and replace airflow-blog-post with your bucket name:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": ["arn:aws:s3:::airflow-blog-post"]
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject"
          ],
          "Resource": ["arn:aws:s3:::airflow-blog-post/*"]
        }
      ]
    }

  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, choose a name for your policy (for example, airflow_data_output_policy).
  8. Choose Create policy.

Let’s attach the IAM policy to a new IAM role that we use in our Airflow connections.

  1. On the IAM console, choose Roles in the navigation pane and then choose Create role.
  2. In the Or select a service to view its use cases section, choose S3.
  3. For Select your use case, choose S3.
  4. Search for the name of the IAM policy that we created in the previous step (airflow_data_output_role) and select the policy.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Role name, choose a name for your role (airflow_data_output_role).
  8. Review the role and then choose Create role.

You’re redirected to the Roles section.

  1. In the search box, enter the name of the role that you created and choose it.
  2. Copy the role ARN to use later to create the AWS connection on Airflow.

Create a Salesforce connection on the Airflow UI to be able to read from Salesforce

To read data from Salesforce, we need to create a connection using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Connections, and then the plus sign to create a new connection.
  3. Fill in the fields with the required information.

The following table provides more information about each value.

Field Mandatory Description Values
Conn Id Yes Connection ID to define and to be used later in the DAG For example, salesforce_connection
Conn Type Yes Connection type HTTP
Host Yes Salesforce host name host-dev-ed.my.salesforce.com or host.lightning.force.com. Replace the host with your Salesforce host and don’t add the http:// as prefix.
Login Yes The Salesforce user name. The user must have read access to the salesforce objects. [email protected]
Password Yes The corresponding password for the defined user. MyPassword123
Port No Salesforce instance port. By default, 443. 443
Extra Yes Specify the extra parameters (as a JSON dictionary) that can be used in the Salesforce connection. security_token is the Salesforce security token for authentication. To get the Salesforce security token in your email, you must reset your security token. {"security_token":"AbCdE..."}

Create an AWS connection in the Airflow UI to be able to write on Amazon S3

An AWS connection is required to upload data into Amazon S3, so we need to create a connection using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Connections, and then choose the plus sign to create a new connection.
  3. Fill in the fields with the required information.

The following table provides more information about the fields.

Field Mandatory Description Value
Conn Id Yes Connection ID to define and to be used later in the DAG For example, aws_connection
Conn Type Yes Connection type Amazon Web Services
Extra Yes It is required to specify the Region. You also need to provide the role ARN that we created earlier.
{
"region":"eu-west-1",
"role_arn":"arn:aws:iam::123456789101:role/airflow_data_output_role "
}

Create an Airflow variable on the Airflow UI to store the name of the target S3 bucket

We create a variable to set the name of the target S3 bucket. This variable is used by the DAG. So, we need to create a variable using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Variables, then choose the plus sign to create a new variable.
  3. For Key, enter bucket_name.
  4. For Val, enter the name of the S3 bucket that you created in a previous step (airflow-blog-post).

Create and deploy a DAG in Amazon MWAA

To be able to ingest data from Salesforce into Amazon S3, we need to create a DAG (Directed Acyclic Graph). To create and deploy the DAG, complete the following steps:

  1. Create a local Python DAG.
  2. Deploy your DAG using the project CI/CD pipeline.
  3. Run your DAG on the Airflow UI.
  4. Display your data in Amazon S3 (with S3 Select).

Create a local Python DAG

The provided SalesForceToS3Operator allows you to ingest data from Salesforce objects to an S3 bucket. Refer to standard Salesforce objects for the full list of objects you can ingest data from with this Airflow operator.

In this use case, we ingest data from the Opportunity Salesforce object. We retrieve the last 6 months’ data in monthly batches and we filter on a specific list of fields.

The DAG provided in the sample in GitHub repository imports the last 6 months of the Opportunity object (one file by month) by filtering the list of retrieved fields.

This operator takes two connections as parameters:

  • An AWS connection that is used to upload ingested data into Amazon S3.
  • A Salesforce connection to read data from Salesforce.

The following table provides more information about the parameters.

Parameter Type Mandatory Description
sf_conn_id string Yes Name of the Airflow connection that has the following information:

  • user name
  • password
  • security token
sf_obj string Yes Name of the relevant Salesforce object (Account, Lead, Opportunity)
s3_conn_id string Yes The destination S3 connection ID
s3_bucket string Yes The destination S3 bucket
s3_key string Yes The destination S3 key
sf_fields string No The (optional) list of fields that you want to get from the object (Id, Name, and so on).
If none (the default), then this gets all fields for the object.
fmt string No The (optional) format that the S3 key of the data should be in.
Possible values include CSV (default), JSON, and NDJSON.
from_date date format No A specific date-time (optional) formatted input to run queries from for incremental ingestion.
Evaluated against the SystemModStamp attribute.
Not compatible with the query parameter and should be in date-time format (for example, 2021-01-01T00:00:00Z).
Default: None
to_date date format No A specific date-time (optional) formatted input to run queries to for incremental ingestion.
Evaluated against the SystemModStamp attribute.
Not compatible with the query parameter and should be in date-time format (for example, 2021-01-01T00:00:00Z).
Default: None
query string No A specific query (optional) to run for the given object.
This overrides default query creation.
Default: None
relationship_object string No Some queries require relationship objects to work, and these are not the same names as the Salesforce object.
Specify that relationship object here (optional).
Default: None
record_time_added boolean No Set this optional value to true if you want to add a Unix timestamp field to the resulting data that marks when the data was fetched from Salesforce.
Default: False
coerce_to_timestamp boolean No Set this optional value to true if you want to convert all fields with dates and datetimes into Unix timestamp (UTC).
Default: False

The first step is to import the operator in your DAG:

from operators.salesforce_to_s3_operator import SalesforceToS3Operator

Then define your DAG default ARGs, which you can use for your common task parameters:

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': '[email protected]',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'sf_conn_id': 'salesforce_connection',
    's3_conn_id': 'aws_connection',
    's3_bucket': 'salesforce-to-s3',
}
...

Finally, you define the tasks to use the operator.

The following examples illustrate some use cases.

Salesforce object full ingestion

This task ingests all the content of the Salesforce object defined in sf_obj. This selects all the object’s available fields and writes them into the defined format in fmt. See the following code:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

Salesforce object partial ingestion based on fields

This task ingests specific fields of the Salesforce object defined in sf_obj. The selected fields are defined in the optional sf_fields parameter. See the following code:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    sf_fields=["Id","Name","Amount"],
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

Salesforce object partial ingestion based on time period

This task ingests all the fields of the Salesforce object defined in sf_obj. The time period can be relative using from_date or to_date parameters or absolute by using both parameters.

The following example illustrates relative ingestion from the defined date:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    from_date="YESTERDAY",
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

The from_date and to_date parameters support Salesforce date-time format. It can be either a specific date or literal (for example TODAY, LAST_WEEK, LAST_N_DAYS:5). For more information about date formats, see Date Formats and Date Literals.

For the full DAG, refer to the sample in GitHub repository.

This code dynamically generates tasks that run queries to retrieve the data of the Opportunity object in the form of 1-month batches.

The sf_fields parameter allows us to extract only the selected fields from the object.

Save the DAG locally as salesforce_to_s3.py.

Deploy your DAG using the project CI/CD pipeline

As part of the CDK deployment, a CodeCommit repository and CodePipeline pipeline were created in order to continuously build, test, and deploy DAGs into your Amazon MWAA environment.

To deploy the new DAG, the source code should be committed to the CodeCommit repository. This triggers a CodePipeline run that builds, tests, and deploys your new DAG and makes it available in your Amazon MWAA environment.

  1. Sign in to the CodeCommit console in your deployment Region.
  2. Under Source, choose Repositories.

You should see a new repository mwaaproject.

  1. Push your new DAG in the mwaaproject repository under dags. You can either use the CodeCommit console or the Git command line to do so:
    1. CodeCommit console:
      1. Choose the project CodeCommit repository name mwaaproject and navigate under dags.
      2. Choose Add file and then Upload file and upload your new DAG.
    2. Git command line:
      1. To be able to clone and access your CodeCommit project with the Git command line, make sure Git client is properly configured. Refer to Setting up for AWS CodeCommit.
      2. Clone the repository with the following command after replacing <region> with your project Region:
        git clone https://git-codecommit.<region>.amazonaws.com/v1/repos/mwaaproject

      3. Copy the DAG file under dags and add it with the command:
        git add dags/salesforce_to_s3.py

      4. Commit your new file with a message:
        git commit -m "add salesforce DAG"

      5. Push the local file to the CodeCommit repository:
        git push

The new commit triggers a new pipeline that builds, tests, and deploys the new DAG. You can monitor the pipeline on the CodePipeline console.

  1. On the CodePipeline console, choose Pipeline in the navigation pane.
  2. On the Pipelines page, you should see mwaaproject-pipeline.
  3. Choose the pipeline to display its details.

After checking that the pipeline run is successful, you can verify that the DAG is deployed to the S3 bucket and therefore available on the Amazon MWAA console.

  1. On the Amazon S3 console, look for a bucket starting with mwaairflowstack-mwaaenvstackne and go under dags.

You should see the new DAG.

  1. On the Amazon MWAA console, choose DAGs.

You should be able to see the new DAG.

Run your DAG on the Airflow UI

Go to the Airflow UI and toggle on the DAG.

This triggers your DAG automatically.

Later, you can continue manually triggering it by choosing the run icon.

Choose the DAG and Graph View to see the run of your DAG.

If you have any issue, you can check the logs of the failed tasks from the task instance context menu.

Display your data in Amazon S3 (with S3 Select)

To display your data, complete the following steps:

  1. On the Amazon S3 console, in the Buckets list, choose the name of the bucket that contains the output of the Salesforce data (airflow-blog-post).
  2. In the Objects list, choose the name of the folder that has the object that you copied from Salesforce (opportunity).
  3. Choose the raw folder and the dt folder with the latest timestamp.
  4. Select any file.
  5. On the Actions menu, choose Query with S3 Select.
  6. Choose Run SQL query to preview the data.

Clean up

To avoid incurring future charges, delete the AWS CloudFormation stack and the resources that you deployed as part of this post.

  1. On the AWS CloudFormation console, delete the stack MWAAirflowStack.

To clean up the deployed resources using the AWS Command Line Interface (AWS CLI), you can simply run the following command:

cdk destroy MWAAirflowStack

Make sure you are in the root path of the project when you run the command.

After confirming that you want to destroy the CloudFormation stack, the solution’s resources are deleted from your AWS account.

The following screenshot shows the process of deploying the stack:

The following screenshot confirms the stack is undeployed.

  1. Navigate to the Amazon S3 console and locate the two buckets containing mwaairflowstack-mwaaenvstack and mwaairflowstack-mwaaproj that were created during the deployment.
  2. Select each bucket delete its contents, then delete the bucket.
  3. Delete the IAM role created to write on the S3 buckets.

Conclusion

ENGIE discovered significant value by using Amazon MWAA, enabling its global business units to ingest data in more productive ways. This post presented how ENGIE scaled their data ingestion pipelines using Amazon MWAA. The first part of the post described the architecture components and how to successfully deploy a CI/CD pipeline for an Amazon MWAA project template using a CodeCommit repository and plug it into CodePipeline to build, test, and package the code and custom plugins. The second part walked you through the steps to automate the ingestion process from Salesforce using Airflow with an example. For the Airflow configuration, you used Airflow variables, but you can also use Secrets Manager with Amazon MWAA using the secretsBackend parameter when deploying the stack.

The use case discussed in this post is just one example of how you can use Amazon MWAA to make it easier to set up and operate end-to-end data pipelines in the cloud at scale. For more information about Amazon MWAA, check out the User Guide.


About the Authors

Anouar Zaaber is a Senior Engagement Manager in AWS Professional Services. He leads internal AWS, external partner, and customer teams to deliver AWS cloud services that enable the customers to realize their business outcomes.

Amine El Mallem is a Data/ML Ops Engineer in AWS Professional Services. He works with customers to design, automate, and build solutions on AWS for their business needs.

Armando Segnini is a Data Architect with AWS Professional Services. He spends his time building scalable big data and analytics solutions for AWS Enterprise and Strategic customers. Armando also loves to travel with his family all around the world and take pictures of the places he visits.

Mohamed-Ali Elouaer is a DevOps Consultant with AWS Professional Services. He is part of the AWS ProServe team, helping enterprise customers solve complex problems related to automation, security, and monitoring using AWS services. In his free time, he likes to travel and watch movies.

Julien Grinsztajn is an Architect at ENGIE. He is part of the Digital & IT Consulting ENGIE IT team working on the definition of the architecture for complex projects related to data integration and network security. In his free time, he likes to travel the oceans to meet sharks and other marine creatures.

Ingesting Automotive Sensor Data using DXC RoboticDrive Ingestor on AWS

Post Syndicated from Bryan Berezdivin original https://aws.amazon.com/blogs/architecture/ingesting-automotive-sensor-data-using-dxc-roboticdrive-ingestor-on-aws/

This post was co-written by Pawel Kowalski, a Technical Product Manager for DXC RoboticDrive and Dr. Max Böhm, a software and systems architect and DXC Distinguished Engineer.

To build the first fully autonomous vehicle, L5 standard per SAE, auto-manufacturers collected sensor data from test vehicle fleets across the globe in their testing facilities and driving circuits. It wasn’t easy collecting, ingesting and managing massive amounts of data, and that’s before it’s used for any meaningful processing, and training the AI self-driving algorithms. In this blog post, we outline a solution using DCX RoboticDrive Ingestor to address the following challenges faced by automotive original equipment manufacturers (OEMs) when ingesting sensor data from the car to the data lake:

  • Each test drive alone can gather hundreds of terabytes of data per fleet. A few vehicles doing test fleets can produce petabyte(s) worth of data in a day.
  • Vehicles collect data from all the sensors into local storage with fast disks. Disks must offload data to a buffer station and then to a data lake as soon as possible to be reused in the test drives.
  • Data is stored in binary file formats like ROSbag, ADTF or MDF4, that are difficult to read for any pre-ingest checks and security scans.
  • On-premises (ingest location) to Cloud data ingestion requires a hybrid solution with reliable connectivity to the cloud.
  • Ingesting process depositing files into a data lake without organizing them first can cause discovery and read performance issues later in the analytics phase.
  • Orchestrating and scaling an ingest solution can involve several steps, such as copy, security scans, metadata extraction, anonymization, annotation, and more.

Overview of the RoboticDrive Ingestor solution

The DXC RoboticDrive Ingestor (RDI) solution addresses the core requirements as follows:

  • Cloud ready:  Implements a recommended architectural pattern where data from globally located logger copy stations are ingested into the cloud data lake in one or more AWS Regions over AWS Direct Connect or Site-to-Site VPN connections.
  • Scalable: Runs multiple steps as defined in an Ingest Pipeline, containerized on Amazon EKS based Amazon EC2 compute nodes that can auto-scale horizontally and vertically on AWS. Steps can be for example, Virus Scan, Copy to Amazon S3, Quality Check, Metadata Extraction, and others.
  • Performant: Uses the Apache Spark-based DXC RoboticDrive Analyzer component to process large amounts of sensor data files efficiently in parallel, which will be described in a future blog post. Analyzer can read and processes native automotive file formats, extract and prepare metadata, and populate metadata into the DXC RoboticDrive Meta Data Store. The maximum throughput of network and disk is leveraged and data files are placed on the data lake following a hierarchical prefix domain model, that aligns with the best practices around S3 performance.
  • Operations ready: Containerized deployment, custom monitoring UI, visual workflow management with MWAA, Amazon CloudWatch based monitoring for virtual infrastructure and cloud native services.
  • Modular: Programmatically add or customize steps into the managed Ingest Pipeline DAG, defined in Amazon Managed Workflows for Apache Airflow (MWAA).
  • Security: IAM based access policy and technical roles, Amazon Cognito based UI authentication, virus scan, and an optional data anonymization step.

The ingest pipeline copies new files to S3 followed by user configurable steps, such as Virus Scan or Integrity Checks. When new files have been uploaded, cloud native S3 event notifications generate messages in an Amazon SQS queue. These are then consumed by serverless AWS Lambda functions or by containerized workloads in the EKS cluster, to asynchronously perform metadata extraction steps.

Walkthrough

There are two main areas of the ingest solution:

  1. On-Premises devices – these are dedicated copy stations in the automotive OEM’s data center where data cartridges are physically connected. Copy stations are easily configurable devices that automatically mount inserted data cartridges and share its content via NFS. At the end of the mounting process, they call an HTTP post API to trigger the ingest process.
  2. Managed Ingest Pipeline – this is a Managed service in the cloud to orchestrate the ingest process. As soon as a cartridge is inserted into the Copy Station, mounted and shared via NFS, the ingest process can be started.
Figure 1 - Architecture showing the DXC RoboticDrive Ingestor (RDI) solution

Figure 1 – Architecture showing the DXC RoboticDrive Ingestor (RDI) solution

Depending on the customer’s requirements, the ingest process may differ. It can be easily modified using predefined Docker images. A configuration file defines what features from the Ingest stack should be used. It can be a simple copy from the source to the cloud but we can easily extend it with additional steps, such as a virus scan, special data partitioning, data quality checks on even scripts provided by the automotive OEM.

The ingest process is orchestrated by a dedicated MWAA pipeline built dynamically from the configuration file. We can easily track the progress, see the logs and, if required, stop or restart the process. Progress is tracked at a detailed level, including the number and volume of data uploaded, remaining files, estimated finish time, and ingest summary, all visualized in a dedicated ingest UI.

Prerequisites

Ingest pipeline

To build an MWAA pipeline, we need to choose from prebuilt components of the ingestor and add them into the JSON configuration, which is stored in an Airflow variable:

{
    "tasks" : [
        {
            "name" : "Virus_Scan",
            "type" : "copy",
            "srcDir" : "file:///mnt/ingestor/CopyStation1/",
            "quarantineDir" : "s3a://k8s-ingestor/CopyStation1/quarantine/”
        },
        {
            "name" : "Copy_to_S3",
            "type" : "copy",
            "srcDir" : "file:///mnt/ingestor/car01/test.txt",
            "dstDir" : "s3a://k8s-ingestor/copy_dest/",
            "integrity_check" : false
        },
        {
            "name" : "Quality_Check",
            "type" : "copy",
            "srcDir" : "file:///mnt/ingestor/car01/test.txt",
            "InvalidFilesDir" : "s3a://k8s-ingestor/CopyStation1/invalid/",
        }
    ]
}

The final ingest pipeline, a Directed Acyclic Graph (DAG), is built automatically from the configuration and changes are visible within seconds:

Figure 2 - Screenshot showing the final ingest pipeline, a Directed Acyclic Graph (DAG)

Figure 2 – Screenshot showing the final ingest pipeline, a Directed Acyclic Graph (DAG)

If required, external tools or processes can also call ingest components using the Ingest API, which is shown in the following screenshot:

Figure 3 - Screenshot showing a tool being used to call ingest components using the Ingest API

Figure 3 – Screenshot showing a tool being used to call ingest components using the Ingest API

The usual ingest process consists of the following steps:

  1. Ingest data source (cartridge) into the Copy Station

2. Wait for Copy Station to report proper cartridge mounting. In case a dedicated device (Copy Station) is not used, and you want to use an external disc as your data source, it can be mounted manually:

mount -o ro /dev/sda /media/disc

Here, /dev/sda is the disc containing your data and /media/disc is the mountpoint, which must also be added to /etc/exports.

3. Every time the MWAA ingest pipeline (DAG) is triggered, a Kubernetes job is created through a Helm template and Helm values which add the above data source as a persistent volume (PV) mount inside the job.

Helm template is as follows:

apiVersion: v1

kind: PersistentVolume

metadata:

 name: {{ .name }}

 namespace: {{ .Values.namespace }}

spec:

 capacity:

  storage: {{ .storage }}

 accessModes:

  - {{ .accessModes }}

nfs:

 server: {{ .server }}

 path: {{ .path }}

Helm values:

name: "YourResourceName"

server: 10.1.1.44

path: "/cardata"

mountPath: "/media/disc"

storage: 100Mi

accessModes: ReadOnlyMany

4.  Trigger ingest manually from MWAA UI – this can also be automatically triggered from a Copy Station by calling MWAA API:

curl -X POST "https://roboticdrive/api/v1/dags/ingest_dag/dagRuns" -H "accept: application/json" -H "Content-Type: application/json" -d "{\"dag_run_id\":\"IngestDAG\"}"

5.  Watch ingest progress – this can be omitted as the ingest results, such as success, failure, can be notified by email or reported to a ticketing system.

6.  Once successfully uploaded, turn off the Copy Station manually or it can be the last task of a flow in the following ingest DAG:

(…)

def ssh_operator(task_identifier: str, cmd: str):

    return SSHOperator(task_id=task_identifier,

                       command=cmd + ' ',

                       remote_host=CopyStation,

                       ssh_hook=sshHook,

                       trigger_rule=TriggerRule.NONE_FAILED,

                       do_xcom_push=True,

                       dag= IngestDAG)

host_shutdown = ssh_operator(task_identifier='host_shutdown', \

                             cmd='sudo shutdown –h +5 &')

When launched, you can track the progress and basic KPIs on the Monitoring UI:

Figure 5 - Screenshot basic KPIs on the Monitoring UI

Figure 4 – Screenshot basic KPIs on the Monitoring UI

Conclusion

In this blog post, we showed how to set up the DXC RoboticDrive Ingestor. This solution was designed to overcome several data ingestion challenges and resulted in the following positive results:

  • Time-to-Analyze KPI decreased by 30% in our tests, local offload steps become obsolete, and data offloaded from cars were accessible on a globally accessible data-lake (S3) for further processing and analysis.
  • MWAA integrated with EKS made the solution flexible, fault tolerant, as well as easy to manage and maintain.
  • Autoscaling the compute capacity as needed for any pre-processing steps within ingestion, helped with boosting performance and productivity.
  • On-premises to AWS cloud connectivity options provided flexibility in configuring and scaling (up and down), and provides optimal performance and bandwidth.

We hope you found this solution insightful and look forward to hearing your feedback in the comments!

Dr. Max Böhm

Dr. Max Böhm

Dr. Max Böhm is a software and systems architect and DXC Distinguished Engineer with over 20 years’ experience in the IT industry and 9 years of research and teaching at universities. He advises customers across industries on their digital journeys, and has created innovative solutions, including real-time management views for global IT infrastructures and data correlation tools that simplify consumption-based billing. He currently works as a solution architect for DXC Robotic Drive. Max has authored or co-authored over 20 research papers and four patents. He has participated in key industry-research collaborations including projects at CERN.

Pawel Kowalski

Pawel Kowalski

Pawel Kowalski is a Technical Product Manager for DXC RoboticDrive where he leads the Data Value Services stream. His current area of focus is to drive solution development for large-scale (PB) end-to-end data ingestion use cases, ensuring performance and reliability. With over 10 years of experience in Big Data Analytics & Business Intelligence, Pawel has designed and delivered numerous customer tailored solutions.

Batch Inference at Scale with Amazon SageMaker

Post Syndicated from Ramesh Jetty original https://aws.amazon.com/blogs/architecture/batch-inference-at-scale-with-amazon-sagemaker/

Running machine learning (ML) inference on large datasets is a challenge faced by many companies. There are several approaches and architecture patterns to help you tackle this problem. But no single solution may deliver the desired results for efficiency and cost effectiveness. In this blog post, we will outline a few factors that can help you arrive at the most optimal approach for your business. We will illustrate a use case and architecture pattern with Amazon SageMaker to perform batch inference at scale.

ML inference can be done in real time on individual records, such as with a REST API endpoint. Inference can also be done in batch mode as a processing job on a large dataset. While both approaches push data through a model, each has its own target goal when running inference at scale.

With real-time inference, the goal is usually to optimize the number of transactions per second that the model can process. With batch inference, the goal is usually tied to time constraints and the service-level agreement (SLA) for the job. Table 1 shows the key attributes of real-time, micro-batch, and batch inference scenarios.

Real Time Micro Batch Batch
Execution Mode
Synchronous Synchronous/Asynchronous Asynchronous
Prediction Latency
Subsecond Seconds to minutes Indefinite
Data Bounds Unbounded/stream Bounded Bounded
Execution Frequency
Variable Variable Variable/fixed
Invocation Mode
Continuous stream/API calls Event-based Event-based/scheduled
Examples Real-time REST API endpoint Data analyst running a SQL UDF Scheduled inference job

Table 1. Key characteristics of real-time, micro-batch, and batch inference scenarios

Key considerations for batch inference jobs

Batch inference tasks are usually good candidates for horizontal scaling. Each worker within a cluster can operate on a different subset of data without the need to exchange information with other workers. AWS offers multiple storage and compute options that enable horizontal scaling. Table 2 shows some key considerations when architecting for batch inference jobs.

  • Model type and ML framework. Models built with frameworks such as XGBoost and SKLearn require smaller compute instances. Those built with deep learning frameworks, such as TensorFlow and PyTorch require larger ones.
  • Complexity of the model. Simple models can run on CPU instances while more complex ensemble models and large-scale deep learning models can benefit from GPU instances.
  • Size of the inference data. While all approaches work on small datasets, larger datasets come with a unique set of challenges. The storage system must provide sufficient throughput and I/O to reliably run the inference workload.
  • Inference frequency and job concurrency. The volume of jobs within a fixed interval of time is an important consideration to address Service Quotas. The frequency and SLA requirements also proportionally impact the number of concurrent jobs. This might create additional pressure on the underlying Service Quotas.
ML Framework Model Complexity
Inference Data Size
Inference Frequency
Job Concurrency
  • Traditional
    • XGBoost
    • SKLearn
  • Deep Learning
    • Tensorflow
    • PyTorch
  • Low (linear models)
  • Medium (complex ensemble models)
  • High (large scale DL models)
  • Small (<1 GB)
  • Medium (<100 GB)
  • Large (<1 TB)
  • Hyperscale (>1 TB)
  • Hourly
  • Daily
  • Weekly
  • Monthly
  • 1
  • <10
  • <100
  • >100

Table 2. Key considerations when architecting for batch inference jobs

Real world Batch Inference use case and architecture

Often customers in certain domains such as advertising and marketing or healthcare must make predictions on hyperscale datasets. This requires deploying an inference pipeline that can complete several thousand inference jobs on extremely large datasets. The individual models used are typically of low complexity from a compute perspective. They could include a combination of various algorithms implemented in scikit-learn, XGBoost, and TensorFlow, for example. Most of the complexity in these use cases stems from large volumes of data and the number of concurrent jobs that must run to meet the service level agreement (SLA).

The batch inference architecture for these requirements typically is composed of three layers:

  • Orchestration layer. Manages the submission, scheduling, tracking, and error handling of individual jobs or multi-step pipelines
  • Storage layer. Stores the data that will be inferenced upon
  • Compute layer. Runs the inference job

There are several AWS services available that can be used for each of these architectural layers. The architecture in Figure 1 illustrates a real world implementation. Amazon SageMaker Processing and training services are used for compute layer and Amazon S3 for the storage layer. Amazon Managed Workflows for Apache Airflow (MWAA) and Amazon DynamoDB are used for the orchestration and job control layer.

Figure 1. Architecture for batch inference at scale with Amazon SageMaker

Figure 1. Architecture for batch inference at scale with Amazon SageMaker

Orchestration and job control layer. Apache Airflow is used to orchestrate the training and inference pipelines with job metadata captured into DynamoDB. At each step of the pipeline, Airflow updates the status of each model run. A custom Airflow sensor polls the status of each pipeline. It advances the pipeline with the successful completion of each step, or resubmits a job in case of failure.

Compute layer. SageMaker processing is used as the compute option for running the inference workload. SageMaker has a purpose-built batch transform feature for running batch inference jobs. However, this feature often requires additional pre and post-processing steps to get the data into the appropriate input and output format. SageMaker Processing offers a general purpose managed compute environment to run a custom batch inference container with a custom script. In the architecture, the processing script takes the input location of the model artifact generated by a SageMaker training job and the location of the inference data, and performs pre and post-processing along with model inference.

Storage layer. Amazon S3 is used to store the large input dataset and the output inference data. The ShardedByS3Key data distribution strategy distributes the files across multiple nodes within a processing cluster. With this option enabled, SageMaker Processing will automatically copy a different subset of input files into each node of the processing job. This way you can horizontally scale batch inference jobs by requesting a higher number of instances when configuring the job.

One caveat of this approach is that while many ML algorithms utilize multiple CPU cores during training, only one core is utilized during inference. This can be rectified by using Python’s native concurrency and parallelism frameworks such concurrent.futures. The following pseudo-code illustrates how you can distribute the inference workload across all instance cores. This assumes the SageMaker Processing job has been configured to copy the input files into the /opt/ml/processing/input directory.

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
import os
from glob import glob
import pandas as pd

def inference_fn(model_dir, file_path, output_dir):

model = joblib.load(f"{model_dir}/model.joblib")
data = pd.read_parquet(file_path)
data["prediction"] = model.predict(data)

output_path = f"{output_dir}/{os.path.basename(file_path)}"

data.to_parquet(output_path)

return output_path

input_files = glob("/opt/ml/processing/input/*")
model_dir = "/opt/ml/model"
output_dir = "/opt/ml/output"

with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
futures = [executor.submit(inference_fn, model_dir, file_path, output_dir) for file in input_files]

results =[]
for future in as_completed(futures):
results.append(future.result())

Conclusion

In this blog post, we described ML inference options and use cases. We primarily focused on batch inference and reviewed key challenges faced when performing batch inference at scale. We provided a mental model of some key considerations and best practices to consider as you make various architecture decisions. We illustrated these considerations with a real world use case and an architecture pattern to perform batch inference at scale. This pattern can be extended to other choices of compute, storage, and orchestration services on AWS to build large-scale ML inference solutions.

More information:

Using Okta as an identity provider with Amazon MWAA

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-okta-as-an-identity-provider-with-amazon-mwaa/

This post is written by Henry Robalino, Solutions Architect.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA), is a fully managed service that allows data engineers and data scientists to run data processing workflows in the cloud. Okta is a third-party identity provider (IdP) that allows customers to use AWS Single Sign-On (AWS SSO) for their employees to be able to log in quickly and securely.

This blog post shows how to integrate Okta with AWS SSO to access Amazon MWAA using single sign-on.

Overview

Customers use Amazon MWAA to run workflows at scale on the cloud. They want to use their existing login solutions and investments the business made on their current IdP, in this case Okta.

AWS SSO does not yet provide APIs to automate creation and configuration of custom SAML 2.0 applications. As a result, many of the steps in this blog are manual and require using the AWS Management Console.

Prerequisites

Deploying this solution requires:

Creating an Amazon MWAA application in AWS SSO

Create a custom SAML 2.0 application for Amazon MWAA

  1. Sign into the AWS Management Console, using an account with the appropriate permissions to modify AWS SSO.
  2. In the AWS SSO console, navigate to Applications. Select “Add a new application”.
    Add a new application
  3. On the Add New Application page, select “Add a custom SAML 2.0 application”:
    Add a custom SAML 2.0 application
  4. On the Configure Custom SAML 2.0 application:
    1. For Display name, enter AWS_SSO_Amazon_MWAA.
    2. For Description, enter AWS SSO Application for Amazon MWAA.
      Configuration
  5. In the Application metadata section, select the option to manually type in the metadata values.
    Before:
    Application metadata
    After:
    After entries
  6. Enter the Application properties and Application metadata sections:
    • Application start URL: This is the Amazon MWAA WebLogin URL, which you can locate in the Amazon MWAA console.
      • For example: https://123456a0-0101-2020-9e11-1b159eec9000.c2.us-east-1.airflow.amazonaws.com
    • Application ACS URL: This is the Assertion Consumer Service (ACS) URL that AWS SSO provides.
      • For example: https://us-east-1.signin.aws.amazon.com/platform/saml/acs/012345678-0102-0304-0506-EXAMPLE01
    • Application SAML audience: This is the SAML audience that AWS SSO provides.
      • For example: https://us-east-1.signin.aws.amazon.com/platform/saml/d-012345678E
  7. The Application properties and Application metadata now look like this:
    Resulting dialog
  8. Choose Save changes. A custom SAML 2.0 application for Amazon MWAA is created. You are now redirected to the AWS_SSO_Amazon_MWAA application page.
  9. On the Attribute mappings tab, modify the existing Subject attribute to “${user:subject}” and a Format of “unspecified.” Choose Save changes.
    Subject field
  10. On the Assigned users tab, add the previously created Amazon MWAA Okta user. Select Assign users and the user. Choose Save changes.
    Assign users

You have now created a custom application for Amazon MWAA in AWS SSO. You have added a user and configured the attribute mappings.

Configuring an Amazon MWAA Permission Sets in AWS SSO

Assign IAM permissions to the newly created Amazon MWAA application by using a permissions set. A permission set is a collection of administrator-defined policies that AWS SSO uses to determine a user’s effective permissions to access a given AWS account.

  1. Navigate to the AWS SSO console. Select on AWS accounts on the left-hand side. Select the Permission sets tab and choose the Create permission set button.
    Create permission set
  2. Select the Create a custom permission set option.
    Create permission set workflow
  3. Provide a name for the Custom Permission Set and an optional description. Choose the Create a custom permissions policy check box.
    Workflow step 2
  4. In the new text field, add the IAM policy below. This set of permissions is associated with the AWS_SSO_Amazon_MWAA application. Make sure to use the correct Amazon Resource Names (ARN) for your Amazon MWAA environment in the below sample text.Sample IAM policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "airflow:GetEnvironment",
                    "airflow:CreateCliToken"
                ],
                "Resource": "arn:aws:airflow:us-east-1:111222333444:environment/MY-MWAA-ENV"
            },
            {
                "Effect": "Allow",
                "Action": "airflow:CreateWebLoginToken",
                "Resource": "arn:aws:airflow:us-east-1:111222333444:role/MY-MWAA-ENV/viewer"
            }
        ]
    }
    

    The policy enables the following permissions:

    • GetEnvironment – retrieves the details of an Amazon MWAA environment
    • CreateCLIToken – creates a CLI token request for an MWAA environment.
    • CreateWebLoginToken – creates an Airflow web UI login token request for the Amazon MWAA environment.

5. Follow the prompts to fill out tags as necessary. Choose Proceed to AWS accounts.
Proceed to AWS accounts

You have now finished configuring the Amazon MWAA application inside of AWS SSO.

Testing and validation

To test and validate the configuration:

  1. Navigate to your Okta SSO portal. Sign in with the appropriate account that is assigned to the Amazon MWAA application.
    Single sign-on
  2. To access Amazon MWAA, select the AWS Account application. This opens up the AWS Management Console in another window. Once this window opens, close it. As of this writing Amazon MWAA does not support “Auth Mode: SSO”, hence this workaround.
  3. Next, select the AWS_SSO_Amazon_MWAA application. You are redirected to the Amazon MWAA SSO Page.
  4. Choose the Sign in with AWS Management Console SSO.
    Sign in to Airflow
  5. You are redirected to the Amazon MWAA web server UI.
    Amazon MWAA web server UI

In this page, you can see all the DAGs available to you and view the DAG history. In the top-right corner, you can see that you are logged in using the AWS SSO assumed role.

Conclusion

This blog post shows you how to integrate Amazon MWAA with Okta as your managed AWS SSO implementation. You can use this solution for your own use cases and enable Okta SSO and Amazon MWAA.

To stay up to date with AWS Identity launches, see: https://aws.amazon.com/blogs/security/highlights-from-the-latest-aws-identity-launches/.

For more serverless learning resources, visit Serverless Land.

Bolster security with role-based access control in Amazon MWAA

Post Syndicated from Virendhar Sivaraman original https://aws.amazon.com/blogs/big-data/bolster-security-with-role-based-access-control-in-amazon-mwaa/

Amazon Studios invests in content that drives global growth of Amazon Prime Video and IMDb TV. Amazon Studios has a number of internal-facing applications that aim to streamline end-to-end business processes and information workflows for the entire content creation lifecycle. The Amazon Studios Data Infrastructure (ASDI) is a centralized, curated, and secure data lake that stores data, both in its original form and processed for analysis and machine learning (ML). The centralized ASDI is essential to break down data silos and combine different types of analytics, thereby allowing Amazon Studios to gain valuable insights, guide better business decisions, and innovate using the latest ML concepts.

What are the primary goals for Amazon MWAA adoption?

Amazon Managed Workflows for Apache Airflow (MWAA) is a fully managed service that makes it easier to run open-source versions of Apache Airflow on AWS. Builders at Amazon.com are engineering Amazon MWAA Directed Acyclic Graphs (DAGs) with prerequisites for provisioning the least privilege access model to the underlying services and resources, and restricting the blast radius of a given task.

Apache Airflow connections provide mechanisms for securely accessing the resources during DAG execution and are intended for coarse-grained access. Incorporating fine-grained access requires different mechanisms for implementation and code review prior to deployment. The additional challenge of codifying the infrastructure and stitching multiple systems together can also inject redundant activities when implementing fine-grained access patterns in Airflow.

How did Amazon achieve this goal?

The objective to enforce security for DAGs at its lowest possible granularity is done at the DAG’s task level. The solution aligns with integration of Amazon MWAA task security with AWS Identity and Access Management (IAM) service and AWS Security Token Service (AWS STS). The engineers customized the existing Airflow PythonOperators to tightly couple task access requirements to separately deployed IAM roles. The customized Airflow operator takes advantage of AWS STS to assume the associated IAM role. The temporary session created from AWS STS is used within PythonOperator to access the underlying resources required to run the task.

In this post, we discuss how to strengthen security in Amazon MWAA with role-based access control.

Prerequisites

To implement this solution, complete the following prerequisites:

  1. Create an AWS account with admin access.
  2. Create an Amazon MWAA environment.
    1. Note down the execution role ARN associated with the Amazon MWAA environment. This is available in the Permissions section of the environment.

  1. Create two Amazon Simple Storage Service (Amazon S3) buckets:
    1. s3://<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-processed/
    2. s3://<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-published/
  2. Create two IAM roles; one for each of the buckets:
    1. write_access_processed_bucket with the following policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-processed/*"
        }
    ]
}
    1. write_access_published_bucket with the following policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-published/*"
        }
    ]
}
  1. Update the trust relationship for the preceding two roles with the Amazon MWAA execution role obtained from Amazon MWAA environment page:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::<AWS_ACCOUNT_ID>:assumed-role/<MWAA-EXECUTION_ROLE>/AmazonMWAA-airflow"
        ],
        "Service": "s3.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

In the preceding policy, replace AWS_ACCOUNT_ID and MWAA-EXECUTION_ROLE with the respective account number, region and Amazon MWAA execution role.

Run the DAG

The proposed DAG has two tasks that access each of the preceding buckets created:

  • Process task – Performs a task in the processed S3 bucket, which mocks a transformation using the Python sleep() function. The last step in this task adds a control file with the current timestamp.
  • Publish task – Performs a similar transformation in the published S3 bucket, which again mocks a transformation using the Python sleep() function. The last step in this task adds a control file with the current timestamp.

The fine-grained access restriction is enforced by a custom implementation of a widely used Airflow operator: PythonOperator. The custom PythonOperator negotiates with AWS STS to trade a session using the IAM role. The session is exclusively used by the tasks’ callable to access the underlying AWS resources. The following diagram shows the sequence of events.

The source code for the preceding implementation is available in the mwaa-rbac-task GitHub repository.

The code base is set up in the following location in Amazon S3, as seen from the Amazon MWAA environment on the Amazon MWAA console.

Run the DAG and monitor its progress, as shown in the following screenshot.

After you run the DAG, the following files are created with timestamps updated:

  • s3://<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-processed/control_file/processed.json 
    	{
    		"processed_dt": "03/05/2021 01:03:58"
    	}

  • s3://<AWS_ACCOUNT_ID>-<AWS_REGION>-mwaa-published/control_file/published.json
    	{
    		"published_dt": "03/05/2021 01:04:12"
    	}

The change in the preceding control files reflects that the tasks in the DAGs enforced the policies defined for these tasks.

Create custom Airflow Operators to support least privilege access

You can extend the demonstrated methodology for enabling fine-grained access using a customized PythonOperator to other Airflow operators and sensors as needed. For more information about how to customize operators, see Creating a custom Operator.

Conclusion

In this post, we presented a solution to bolster security in Amazon MWAA with role-based access controls. You can extend the concept to other Airflow operators in order enhance the workflow security at the task level. In addition, using the AWS Cloud Development Kit (AWS CDK) can make provisioning the Amazon MWAA environment and fine-grained IAM task roles seamless. We look forward to sharing more about fine-grained access patterns for Airflow tasks in a future post.


About the Author

Kishan Desai is a Data Engineer at Amazon Studios building a data platform to support the content creation process. He is passionate about building flexible and modular systems on AWS using serverless paradigms. Outside of work, Kishan enjoys learning new technologies, watching sports, experiencing SoCal’s great food, and spending quality time with friends and family.

 

 

Virendhar (Viru) Sivaraman is a strategic Big Data & Analytics Architect with Amazon Web Services. He is passionate about building scalable big data and analytics solutions in the cloud. Besides work, he enjoys spending time with family, hiking & mountain biking.

Field Notes: Deploying Autonomous Driving and ADAS Workloads at Scale with Amazon Managed Workflows for Apache Airflow

Post Syndicated from Hendrik Schoeneberg original https://aws.amazon.com/blogs/architecture/field-notes-deploying-autonomous-driving-and-adas-workloads-at-scale-with-amazon-managed-workflows-for-apache-airflow/

Cloud Architects developing autonomous driving and ADAS workflows are challenged by loosely distributed process steps along the tool chain in hybrid environments. This is accelerated by the need to create a holistic view of all running pipelines and jobs. Common challenges include:

  • finding and getting access to the data sources specific to your use case, understanding the data, moving and storing it,
  • cleaning and transforming it and,
  • preparing it for downstream consumption.

Verifying that your data pipeline works correctly and ensuring you provide a certain data quality while your application is being developed adds to the complexity. Furthermore, the fact that the data itself constantly changes poses more challenges. In this blog post, we show you the steps to follow from a workflow run in an Amazon Managed Workflows for Apache Airflow (MWAA) environment. All required infrastructure is created leveraging the AWS Cloud Development Kit (CDK).

We illustrate how image data collected during field operational tests (FOTs) of vehicles can be processed by:

  • detecting objects within the video stream
  • adding the bounding boxes of all detected objects to the image data
  • providing visual and metadata information for data cataloguing as well as potential applications on the bounding boxes.

We define a workflow that processes ROS bag files on Amazon S3, and extracts individual PNGs from a video stream using AWS Fargate on Amazon Elastic Container Service (Amazon ECS). Then we show how to use Amazon Rekognition to detect objects within the exported images and draw bounding boxes for the detected objects within the images. Finally, we demonstrate how users’ interface with MWAA to develop and publish their workflows. We also show methods to keep track of the performance and costs of your workflows.

Overview of the solution

Deploying Autonomous Driving & ADAS workloads at scale

Figure 1 – Architecture for deploying Autonomous Driving & ADAS workloads at scale

The preceding diagram shows our target solution architecture. It includes five components:

  • Amazon MWAA environment: we use Amazon MWAA to orchestrate our image processing workflow’s tasks. A workflow, or DAG (directed acyclic graph), is a set of task definitions and dependencies written in Python. We create a MWAA environment using CDK and define our image processing pipeline as a DAG which MWAA can orchestrate.
  • ROS bag file processing workflow: this is a workflow run on Managed Workflows for Apache Airflow. This detects and highlights objects within the camera stream of a ROS bag file. The workflow is as follows:
    • Monitors the location in Amazon S3 for the new ROS bag files.
    • If a new file gets uploaded, we extract video data from the ROS bag file’s topic that holds the video stream, extract individual PNGs from the video stream and store them on S3.
    • We use AWS Fargate on Amazon ECS  to run a containerized image extraction workload.
    • Once the extracted images are stored on S3, they are passed to Amazon Rekognition for object detection. We then provide an image or video to the Amazon Rekognition API, and the service can identify objects, people, text, scenes, and activities. Amazon Rekognition Image can return the bounding box for many object labels.
    • We use the information to highlight the detected objects within the image by drawing the bounding box.
    • The bounding boxes of the objects Amazon Rekognition detects within the extracted PNGs are rendered directly into the image using Pillow, a fork of the Python Imaging Library (PIL).
    • The resulting images are then stored in Amazon S3 and our workflow is complete.
  • Predefined DAGs: Our solution comes with a pre-defined ROS bag image processing DAG that will be copied into the Amazon MWAA environment with CDK.
  • AWS CodePipeline for automated CI/CD pipeline: CodePipeline automates the build, test, and deploy phases of your release process every time there is a code change, based on the release model you define
  • Amazon CloudWatch for monitoring and operational dashboard: With Amazon CloudWatch we can monitor important metrics of the ROS bag image processing workflow, for example. to verify the compute load, number of running workflows or tasks or to monitor and alert on processing errors.

Prerequisites

Deployment

To deploy and run the solution, we follow these steps:

  1. Configure and create an MWAA environment in AWS using CDK
  2. Copy the ROS bag file to S3
  3. Manage the ROS bag processing DAG from the Airflow UI
  4. Inspect the results
  5. Monitor the operations dashboard

Each component illustrated in the solution architecture diagram, as well as all required IAM roles and configuration parameters are created from an AWS CDK application. This is included in these example sources.

1. Configure and create an MWAA environment in AWS using CDK

  • Download the deployment templates
  • Extract the zip file to a directory
  • Open a shell and go to the extracted directory
  • Start the deployment by typing the following code:
./deploy.sh <named-profile> deploy true <region>
  • Replace <named-profile> with the named profile which you configured for your AWS CLI to point to your target AWS account.
  • Replace <region> with the Region that is associated with your named profile, for example, us-east-1.
  • As an example: if your named profile for the AWS CLI is called rosbag-processing and your associated Region is us-east-1 the resulting command would be:
./deploy.sh rosbag-processing deploy true us-east-1

After confirming the deployment, the CDK will take several minutes to synthesize the CloudFormation templates, build the Docker image and deploy the solution to the target AWS account.

Figure 2 - Screenshot of Rosbag processing stack

Figure 2 – Screenshot of Rosbag processing stack

Inspect the CloudFormation stacks being created in the AWS console. Sign into your AWS console and select the service CloudFormation in the AWS Region that you chose for your named profile (for exmaple, us-east-After the installation script has run, the CloudFormation stack overview in the console should show the following stacks:

Figure 3 - Screenshot of CloudFormation stack overview

Figure 3 – Screenshot of CloudFormation stack overview

2. Copy the ROS bag file to S3

  • You can download the ROS bag file here. After downloading you need to copy the file to the input bucket that was created during the CDK deployment in the previous step.
  • Open the AWS management console and navigate to the service S3. You should find a list of your buckets including the buckets for input data, output data and DAG definitions that were created during the CDK deployment.
Figure 4 - Screenshot showing S3 buckets created

Figure 4 – Screenshot showing S3 buckets created

  • Select the bucket with prefix rosbag-processing-stack-srcbucket and copy the ROS bag file into the bucket.
  • Now that the ROS bag file is in S3, we need to enable the image processing workflow in MWAA.

3. Manage the ROS bag processing DAG from the Airflow UI

  • In order to access the Airflow web server, open the AWS management console and navigate to the MWAA service.
  • From the overview, select the MWAA environment that was created and select Open Airflow UI as shown in the following screenshot:
Figure 5 - Screenshot showing Airflow Environments

Figure 5 – Screenshot showing Airflow Environments

The Airflow UI and the ROS bag image processing DAG should appear as in the following screenshot:

  • We can now enable the DAG by flipping the switch (1) from “Off” to “On” position. Amazon MWAA will now schedule the launch for the workflow.
  • By selecting the DAG’s name (rosbag_processing) and selecting the Graph View, you can review the workflow’s individual tasks:
Figure 6 - The Airflow UI and the ROS bag image processing DAG

Figure 6 – The Airflow UI and the ROS bag image processing DAG

Figure 7 - Dag Workflow

Figure 7 – Dag Workflow

After enabling the image processing DAG by flipping the switch to “on” in the previous step, Airflow will detect the newly uploaded ROS bag file in S3 and start processing it. You can monitor the progress from the DAG’s tree view (1), as shown in the following screenshot:

Figure 8 - Monitor progress in the DAG's tree view

Figure 8 – Monitor progress in the DAG’s tree view

Re-running the image processing pipeline

Our solution uses S3 metadata to keep track of the processing status of each ROS bag file in the input bucket in S3. If you want to re-process a ROS bag file:

  • open the AWS management console,
  • navigate to S3, click on the ROS bag file you want to re-process,
  • remove the file’s metadata tag processing.status from the properties tab:
Figure 9 - Metadata tag

Figure 9 – Metadata tag

With the metadata tag processing.status removed, Amazon MWAA picks up the file the next time the image processing workflow is launched. You can manually start the workflow by choosing Trigger DAG (1) from the Airflow web server UI.

Figure 10 - Manually start the workflow by choosing Trigger DAG

Figure 10 – Manually start the workflow by choosing Trigger DAG

4. Inspect the results

In the AWS management console navigate to S3, and select the output bucket with prefix rosbag-processing-stack-destbucket. The bucket holds both the extracted images from the ROS bag’s topic containing the video data (1) and the images with highlighted bounding boxes for the objects detected by Amazon Rekognition (2).

Figure 11 - Output S3 buckets

Figure 11 – Output S3 buckets

  • Navigate to the folder 20201005 and select an image,
  • Navigate to the folder bounding_boxes
  • Select the corresponding image. You should receive an output similar to the following two examples:
Left: Image extracted from ROS bag camera feed topic. Right: Same image with highlighted objects that Amazon Rekognition detected.

Figure 12 – Left: Image extracted from ROS bag camera feed topic. Right: Same image with highlighted objects that Amazon Rekognition detected.

Amazon CloudWatch for monitoring and creating operational dashboard

Amazon MWAA comes with CloudWatch metrics to monitor your MWAA environment. You can create dashboards for quick and intuitive operational insights. To do so:

  • Open the AWS management console and navigate to the service CloudWatch.
  • Select the menu “Dashboards” from the navigation pane and click on “Create dashboard”.
  • After providing a name for your dashboard and selecting a widget type, you can choose from a variety of metrics that MWAA provides.  This is shown in the following picture.

MWAA comes with a range of pre-defined metrics that allow you to monitor failed tasks, the health of your managed Airflow environment and helps you right-size your environment.

Figure 13 - CloudWatch metrics in a dashboard help to monitor your MWAA environment.

Figure 13 – CloudWatch metrics in a dashboard help to monitor your MWAA environment.

Clean Up

In order to delete the deployed solution from your AWS account, you must:

1. Delete all data in your S3 buckets

  • In the management console navigate to the service S3. You should see three buckets containing rosbag-processing-stack that were created during the deployment.
  • Select each bucket and delete its contents. Note: Do not delete the bucket itself.

2. Delete the solution’s CloudFormation stack

You can delete the deployed solution from your AWS account either from the management console or the command line interface.

Using the management console:

  • Open the management console and navigate to the CloudFormation service. Make sure to select the same region you deployed your solution to (for exmaple,. us-east-1).
  • Select stacks from the side menu. The solution’s CloudFormation stack is shown in the following screenshot:
Figure 14 - Cleaning up the CloudFormation Stack

Figure 14 – Cleaning up the CloudFormation Stack

Select the rosbag-processing-stack and select Delete. CloudFormation will now destroy all resources related to the ROS bag image processing pipeline.

Using the Command Line Interface (CLI):

Open a shell and navigate to the directory from which you started the deployment, then type the following:

cdk destroy --profile <named-profile>

Replace <named-profile> with the named profile which you configured for your AWS CLI to point to your target AWS account.

As an example: if your named profile for the AWS CLI is called rosbag-processing and your associated Region is us-east-1 the resulting command would be:

cdk destroy --profile rosbag-processing

After confirming that you want to destroy the CloudFormation stack, the solution’s resources will be deleted from your AWS account.

Conclusion

This blog post illustrated how to deploy and leverage workflows for running Autonomous Driving & ADAS workloads at scale. This solution was designed using fully managed AWS services. The solution was deployed on AWS using AWS CDK templates. Using MWAA we were able to set up a centralized, transparent, intuitive workflow orchestration across multiple systems. Even though the pipeline involved several steps in multiple AWS services, MWAA helped us understand and visualize the workflow, its individual tasks and dependencies.

Usually, distributed systems are hard to debug in case of errors or unexpected outcomes. However, building the ROS bag processing pipelines on MWAA allowed us to inspect the log files involved centrally from the Airflow UI, for debugging or auditing. Furthermore, with MWAA the ROS bag processing workflow is part of the code base and can be tested, refactored, versioned just like any other software artifact. By providing the CDK templates, we were able to architect a fully automated solution which you can adapt to your use case.

We hope you found this interesting and helpful and invite your comments on the solution!

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Automating Amazon CloudWatch dashboards and alarms for Amazon Managed Workflows for Apache Airflow

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/automating-amazon-cloudwatch-dashboards-and-alarms-for-amazon-managed-workflows-for-apache-airflow/

This post is written by Mark Richman, Senior Solutions Architect.

Amazon Managed Workflows for Apache Airflow (MWAA) is a fully managed service that makes it easier to run open-source versions of Apache Airflow on AWS. It allows you to build workflows to run your extract-transform-load (ETL) jobs and data pipelines.

When working with MWAA, you often need to know more about the performance of your Airflow environment to achieve full insight and observability of your system. Airflow emits a number of useful metrics to Amazon CloudWatch, which are described in the product documentation. MWAA allows customers to define CloudWatch dashboards and alarms based upon the metrics and logs that Apache Airflow emits.

Airflow exposes metrics such as number of Directed Acyclic Graph (DAG) processes, DAG bag size, number of currently running tasks, task failures, and successes. Airflow is already set up to send metrics for an MWAA environment to CloudWatch.

This blog demonstrates a solution that automatically detects any deployed Airflow environments associated with the AWS account. It builds a CloudWatch dashboard and some useful alarms for each. All source code for this blog post is available on GitHub.

You automate the creation of a CloudWatch dashboard, which displays several of these key metrics together with CloudWatch alarms. These alarms receive notifications when the metrics fall outside of the thresholds that you configure, and allow you to perform actions in response.

Prerequisites

Deploying this solution requires:

Overview

Based on AWS serverless services, this solution includes:

  1. An Amazon EventBridge rule that runs on a schedule to invoke an AWS Step Functions workflow.
  2. Step Function orchestrates several AWS Lambda functions to query the existing MWAA environments.
  3. Lambda functions will update the CloudWatch dashboard definition to include metrics such as QueuedTasks, RunningTasks, SchedulerHeartbeat, TasksPending, and TotalParseTime.
  4. CloudWatch alarms are created for unhealthy workers and heartbeat failure across all MWAA environments. These alarms are removed for any nonexistent environments.
  5. Any MWAA environments that no longer exist have their respective CW dashboards removed.

Reference architecture

EventBridge, a serverless event service, is configured to invoke a Step Functions workflow every 10 minutes. You can configure this for your preferred interval. Step Functions invokes a number of Lambda functions in parallel. If a function throws an error, the failed step in the state machine transitions to its respective failed state and the entire workflow ends.

Each of the Lambda functions performs a single task, orchestrated by Step Functions. Each function has a descriptive name for the task it performs in the workflow.

Understanding the CreateDashboardFunction function

When you deploy the AWS SAM template, the SeedDynamoDBFunction Lambda function is invoked. The function populates a DynamoDB table called DashboardTemplateTable with a CloudWatch dashboard definition. This definition is a template for any new CloudWatch dashboard created by CreateDashboardFunction.

You can see this definition in the GitHub repo:

{
  "widgets": [{
      "type": "metric",
      "x": 0,
      "y": 0,
      "width": 12,
      "height": 6,
      "properties": {
        "view": "timeSeries",
        "stacked": true,
        "metrics": [
          [
            "AmazonMWAA",
            "QueuedTasks",
            "Function",
            "Executor",
            "Environment",
            "${EnvironmentName}"
          ]
        ],
        "region": "${AWS::Region}",
        "title": "QueuedTasks ${EnvironmentName}",
        "period": 300
      }
    },
    ...
}

When this Step Functions workflow runs, it runs CreateDashboardFunction. This iterates through all the MWAA environments in the account, creating or updating its corresponding CloudWatch dashboard. You can see the code in /functions/create_dashboard/app.py:

for env in mwaa_environments:
        dashboard_name = f"Airflow-{env}"

        dashboard_body = dashboard_template.replace(
            "${AWS::Region}", os.getenv("AWS_REGION", "us-east-1")
        ).replace("${EnvironmentName}", env)

        logger.info(f"Creating/updating dashboard: {dashboard_name}")
        logger.debug(dashboard_body)

        response = cloudwatch.put_dashboard(
            DashboardName=dashboard_name, DashboardBody=dashboard_body
        )

        logger.info(json.dumps(response, indent=2))

Step Functions state machine

Here is a visualization of the Step Functions state machine:

State machine visualization

Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. The state machine in this solution is defined in JSON format in the repo.

Building and deploying the example application

To build and deploy this solution:

  1. Clone the repo from GitHub:
    git clone https://github.com/aws-samples/mwaa-dashboard
    cd mwaa-dashboard
    
  2. Build the application. Since Lambda functions may depend on packages that have natively compiled programs, use the --use-container flag. This flag compiles your functions locally in a Docker container that behaves like the Lambda environment:
    sam build --use-container
  3. Deploy the application to your AWS account:
    sam deploy --guided

This command packages and deploys the application to your AWS account. It provides a series of prompts:

  • Stack Name: The name of the stack to deploy to AWS CloudFormation. This should be unique to your account and Region. This walkthrough uses mwaa-dashboard throughout this project.
  • AWS Region: The AWS Region you want to deploy your app to.
  • Confirm changes before deploy: If set to yes, any change sets will be shown to you for manual review. If set to no, the AWS SAM CLI automatically deploys application changes.
  • Respond to the remaining prompts per the SAM CLI command reference.

Updating the CloudWatch dashboard template definition in DynamoDB

The CloudWatch dashboard template definition is stored in DynamoDB. This is a one-time setup step, performed by the functions/seed_dynamodb Lambda custom resource at stack deployment time.

To override the template, you can edit the data directly in DynamoDB using the AWS Management Console. Alternatively, modify the scripts/dashboard-template.json file and update DynamoDB using the scripts/seed.py script.

cd scripts
./seed.py -t <dynamodb-table-name>
cd ..

Here, <dynamodb-table-name> is the name of the DynamoDB table created during deployment. For example:

./seed.py -t mwaa-dashboard-DashboardTemplateTable-VA2M5945RCF1

Viewing the CloudWatch dashboards and alarms

If you have any existing MWAA environments, or create new ones, a dashboard for each appears with the Airflow- prefix. If you delete an MWAA environment, the corresponding dashboard is also deleted. No CloudWatch metrics are deleted.

Upon successful completion of the Step Functions workflow, you see a list of custom dashboards. There is one for each of your MWAA environments:

Custom dashboards view

Choosing the dashboard name displays the widgets defined in the JSON described previously. Each widget corresponds to an Airflow key performance indicator (KPI). The dashboards can be customized through the AWS Management Console without any code changes.

Example dashboard

These are the metrics:

  • QueuedTasks: The number of tasks with queued state. Corresponds to the executor.queued_tasks Airflow metric.
  • TasksPending: The number of tasks pending in executor. Corresponds to the scheduler.tasks.pending Airflow metric.
  • RunningTasks: The number of tasks running in executor. Corresponds to the executor.running_tasks Airflow metric.
  • SchedulerHeartbeat: The number of check-ins Airflow performs on the scheduler job. Corresponds to the scheduler_heartbeat Airflow metrics.
  • TotalParseTime: Number of seconds taken to scan and import all DAG files once. Corresponds to the dag_processing.total_parse_time Airflow metric.

More information on all MWAA metrics available in CloudWatch can be found in the documentation. CloudWatch alarms are also created for each MWAA environment:

CloudWatch alarms list

By default, you create two alarms:

  • {environment name}-UnhealthyWorker: This alarm triggers if the number of QueuedTasks is greater than the number of RunningTasks, and the number of RunningTasks is zero, for a period of 15 minutes.
  • {environment name}-HeartbeatFail: This alarm triggers if SchedulerHeartbeat is zero for a period of five minutes.

You can configure actions in response to these alarms, such as an Amazon SNS notification to email or a Slack message.

Cleaning up

After testing this application, delete the resources created to avoid ongoing charges. You can use the AWS CLI, AWS Management Console, or the AWS APIs to delete the CloudFormation stack deployed by AWS SAM.

To delete the stack via the AWS CLI, run the following command:

aws cloudformation delete-stack --stack-name mwaa-dashboard

The log groups all share the prefix /aws/lambda/mwaa-dashboard. Delete these with the command:

aws logs delete-log-group --log-group-name <log group>

Conclusion

With Amazon MWAA, you can spend more time building workflows and less time managing and scaling infrastructure. This article shows a serverless example that automatically creates CloudWatch dashboards and alarms for all existing and new MWAA environments. With this example, you can achieve better observability for your MWAA environments.

To get started with MWAA, visit the user guide. To deploy this solution in your own AWS account, visit the GitHub repo for this article.

For more serverless learning resources, visit Serverless Land.

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by AWS CloudWatch Events.

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We will start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We will then use this environment to run an EMR notebook example which does data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.

The following diagram illustrates the solution architecture.

We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using a AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region as you launched the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters.

Parameter Description Default Value
Stack name Enter a meaningful name for the stack. We use MWAAEmrNBDemo for this example. Replace it with your own value. None
AirflowBucketName Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment. The name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post. You must replace it with your own value for this field. None
EnvironmentName An MWAA environment name that is prefixed to resource names. All the resources created by this templated are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post. Replace it with your own value for this field. mwaa-
PrivateSubnet1CIDR The IP range (CIDR notation) for the private subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.20.0/24
PrivateSubnet2CIDR The IP range (CIDR notation) for the private subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications.. 10.192.21.0/24
PublicSubnet1CIDR The IP range (CIDR notation) for the public subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.10.0/24
PublicSubnet2CIDR The IP range (CIDR notation) for the public subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.11.0/24
VpcCIDR The IP range (CIDR notation) for this VPC being created. For more information, see AWS CloudFormation VPC stack specifications. 10.192.0.0/16

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.

  1. Enter the parameter values from the preceding table.
  2. Review the details on the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, on the Resources tab, you can find the resources being created in this CloudFormation stack. Now, we’re ready to run our example.

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

The following diagram illustrates the workflow: As a user, you first need to create the DAG file that describes how to run the analytics jobs and upload it to the dags folder under the S3 bucket specified. The DAG can be triggered in Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.

The following diagram illustrates the workflow.

Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

find_best_sellers.ipynb is a Python script that does analysis on the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purpose only, we rank the seller simply by the sum of review star ratings from verified purchases.

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

The last line in the first cell, we have OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/” as a default value for the input parameter. Replace it with your own value for the output location. You can also supply a different value for this for this parameter in the Airflow Variables later.

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary and two m5.xlarge core nodes on release version 6.2.0 with Spark, Hive, Livy and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
import boto3, time
from builtins import range
from pprint import pprint
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils import apply_defaults
from airflow.utils.dates import days_ago

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

The very last line of the DAG code explains how the tasks are linked in the orchestration workflow. It’s overloading the right shift >> operator to create a dependency, meaning that the task on the left should be run first, and the output passed to the task on the right.

Instead of hard-coding the variables in the DAG code, we choose to supply these variables by importing a JSON file in the Airflow UI before actually running the DAG. This way, we can also update the variables without having to update the DAG code, which requires updating the DAG file in Amazon S3. We walk you through how to do so in the later steps. You can see the lines for VARIABLES that we repeated:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

To use this JSON code, you need to replace all the variable values (subnet and S3 paths) with the actual values.

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.

On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.

  1. Choose Open Airflow UI.
  2. Log in as an authenticated user.

Log in as an authenticated user.

Next, we import the JSON file for the variables into Airflow UI.

As we mentioned earlier, we want to supply the variable values for our DAG definition later upon triggering the DAG in Airflow UI instead of hard-coding the values.

  1. On the Admin menu, choose Variables.
  2. Choose Browse.
  3. Choose json.
  4. Choose Import Variables.

For more information about importing variables, see Variables.

  1. Run the following command in the same directory as where file test_dag.py is to upload the DAG file to the dags folder under the S3 bucket specified for the Airflow environment. Replace <your_airflow_bucket_name> with the S3 bucket name that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.

  1. Trigger the DAG by turning it to On

Trigger the DAG by turning it to On

  1. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.

  1. Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activate the DAG.

Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activate the DAG.

You now get an email when failure happens on any of the tasks. You can also configure to get email notification when retry happens as well.

  1. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.

On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.

  1. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.

On the Gantt tab, we can see the time distribution of all the tasks of our workflow.

On the Gantt tab, we can see the time distribution of all the tasks of our workflow.

As specified in our DAG definition, the EMR cluster is stopped when the workflow is complete.

Because we use the cron expression 0 * * * * as the scheduled running interval for our workflow, if the triggered status of the DAG is ON, it runs every hour. You need to switch the status to OFF if you don’t want it to run again.

  1. On the Amazon S3 console, view the result of our notebook job in the S3 folder.

On the Amazon S3 console, view the result of our notebook job in the S3 folder.

For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.

As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how to use the Amazon EMR Notebooks API and use orchestration services such as Amazon MWAA to build ETL pipelines. It demonstrated how set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner, and use AWS Glue for a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

The state machine transforms data using AWS Glue.

The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs.

Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

The IAM role is available in the Permissions section of the environment details.

The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.

The following diagram shows the ETL process set up through a Step Functions state machine.

In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.

Turn on the DAG.

  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

This displays the overall ETL pipeline managed by Airflow.

  1. To view the DAG code, choose Code.

To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.

Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto provides object-oriented API, as well as low-level access to AWS services.

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.

From Graph View, select any task and choose View Log.

  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

  1. You can also monitor ETL process completion from the Airflow UI.

You can also monitor ETL process completion from the Airflow UI.

  1. On the Airflow UI, verify the completion from the log entries.

On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.

The following screenshot shows the output.

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar GhosalDipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Introducing Amazon Managed Workflows for Apache Airflow (MWAA)

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-managed-workflows-for-apache-airflow-mwaa/

As the volume and complexity of your data processing pipelines increase, you can simplify the overall process by decomposing it into a series of smaller tasks and coordinate the execution of these tasks as part of a workflow. To do so, many developers and data engineers use Apache Airflow, a platform created by the community to programmatically author, schedule, and monitor workflows. With Airflow you can manage workflows as scripts, monitor them via the user interface (UI), and extend their functionality through a set of powerful plugins. However, manually installing, maintaining, and scaling Airflow, and at the same time handling security, authentication, and authorization for its users takes much of the time you’d rather use to focus on solving actual business problems.

For these reasons, I am happy to announce the availability of Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to execute your extract-transform-load (ETL) jobs and data pipelines.

Airflow workflows retrieve input from sources like Amazon Simple Storage Service (S3) using Amazon Athena queries, perform transformations on Amazon EMR clusters, and can use the resulting data to train machine learning models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

A key benefit of Airflow is its open extensibility through plugins which allows you to create tasks that interact with AWS or on-premise resources required for your workflows including AWS Batch, Amazon CloudWatch, Amazon DynamoDB, AWS DataSync, Amazon ECS and AWS Fargate, Amazon Elastic Kubernetes Service (EKS), Amazon Kinesis Firehose, AWS Glue, AWS Lambda, Amazon Redshift, Amazon Simple Queue Service (SQS), and Amazon Simple Notification Service (SNS).

To improve observability, Airflow metrics can be published as CloudWatch Metrics, and logs can be sent to CloudWatch Logs. Amazon MWAA provides automatic minor version upgrades and patches by default, with an option to designate a maintenance window in which these upgrades are performed.

You can use Amazon MWAA with these three steps:

  1. Create an environment – Each environment contains your Airflow cluster, including your scheduler, workers, and web server.
  2. Upload your DAGs and plugins to S3 – Amazon MWAA loads the code into Airflow automatically.
  3. Run your DAGs in Airflow – Run your DAGs from the Airflow UI or command line interface (CLI) and monitor your environment with CloudWatch.

Let’s see how this works in practice!

How to Create an Airflow Environment Using Amazon MWAA
In the Amazon MWAA console, I click on Create environment. I give the environment a name and select the Airflow version to use.

Then, I select the S3 bucket and the folder to load my DAG code. The bucket name must start with airflow-.

Optionally, I can specify a plugins file and a requirements file:

  • The plugins file is a ZIP file containing the plugins used by my DAGs.
  • The requirements file describes the Python dependencies to run my DAGs.

For plugins and requirements, I can select the S3 object version to use. In case the plugins or the requirements I use create a non-recoverable error in my environment, Amazon MWAA will automatically roll back to the previous working version.


I click Next to configure the advanced settings, starting with networking. Each environment runs in a Amazon Virtual Private Cloud using private subnets in two availability zones. Web server access to the Airflow UI is always protected by a secure login using AWS Identity and Access Management (IAM). However, you can choose to have web server access on a public network so that you can login over the Internet, or on a private network in your VPC. For simplicity, I select a Public network. I let Amazon MWAA create a new security group with the correct inbound and outbound rules. Optionally, I can add one or more existing security groups to fine-tune control of inbound and outbound traffic for your environment.

Now, I configure my environment class. Each environment includes a scheduler, a web server, and a worker. Workers automatically scale up and down according to my workload. We provide you a suggestion on which class to use based on the number of DAGs, but you can monitor the load on your environment and modify its class at any time.

Encryption is always enabled for data at rest, and while I can select a customized key managed by AWS Key Management Service (KMS) I will instead keep the default key that AWS owns and manages on my behalf.

For monitoring, I publish environment performance to CloudWatch Metrics. This is enabled by default, but I can disable CloudWatch Metrics after launch. For the logs, I can specify the log level and which Airflow components should send their logs to CloudWatch Logs. I leave the default to send only the task logs and use log level INFO.

I can modify the default settings for Airflow configuration options, such as default_task_retries or worker_concurrency. For now, I am not changing these values.

Finally, but most importantly, I configure the permissions that will be used by my environment to access my DAGs, write logs, and run DAGs accessing other AWS resources. I select Create a new role and click on Create environment. After a few minutes, the new Airflow environment is ready to be used.

Using the Airflow UI
In the Amazon MWAA console, I look for the new environment I just created and click on Open Airflow UI. A new browser window is created and I am authenticated with a secure login via AWS IAM.

There, I look for a DAG that I put on S3 in the movie_list_dag.py file. The DAG is downloading the MovieLens dataset, processing the files on S3 using Amazon Athena, and loading the result to a Redshift cluster, creating the table if missing.

Here’s the full source code of the DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='redshift-cluster-1'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies (
  `movieId` int,
  `title` string,
  `genres` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/movies.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_ratings_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings (
  `userId` int,
  `movieId` int,
  `rating` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/ratings.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_tags_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags (
  `userId` int,
  `movieId` int,
  `tag` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://pinwheeldemo1-pinwheeldagsbucketfeed0594-1bks69fq0utz/files/ml-latest-small/tags.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
join_tables_athena_query="""
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM demo_athena_db.ML_Latest_Small_Movies m
INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId
"""
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:       
        zList=z.namelist()
        print(zList)
        for i in zList: 
            print(i) 
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '') 
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title	character varying, rating	int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False 
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table",query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table",query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table",query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables",query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

In the code, different tasks are created using operators like PythonOperator, for generic Python code, or AWSAthenaOperator, to use the integration with Amazon Athena. To see how those tasks are connected in the workflow, you can see the latest few lines, that I repeat here (without indentation) for simplicity:

check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

The Airflow code is overloading the right shift >> operator in Python to create a dependency, meaning that the task on the left should be executed first, and the output passed to the task on the right. Looking at the code, this is quite easy to read. Each of the four lines above is adding dependencies, and they are all evaluated together to execute the tasks in the right order.

In the Airflow console, I can see a graph view of the DAG to have a clear representation of how tasks are executed:

Available Now
Amazon Managed Workflows for Apache Airflow (MWAA) is available today in US East (Northern Virginia), US West (Oregon), US East (Ohio), Asia Pacific (Singapore), Asia Pacific (Toyko), Asia Pacific (Sydney), Europe (Ireland), Europe (Frankfurt), and Europe (Stockholm). You can launch a new Amazon MWAA environment from the console, AWS Command Line Interface (CLI), or AWS SDKs. Then, you can develop workflows in Python using Airflow’s ecosystem of integrations.

With Amazon MWAA, you pay based on the environment class and the workers you use. For more information, see the pricing page.

Upstream compatibility is a core tenet of Amazon MWAA. Our code changes to the AirFlow platform are released back to open source.

With Amazon MWAA you can spend more time building workflows for your engineering and data science tasks, and less time managing and scaling the infrastructure of your Airflow platform.

Learn more about Amazon MWAA and get started today!

Danilo