All posts by Chandan Rupakheti

Introducing simplified interaction with the Airflow REST API in Amazon MWAA

Post Syndicated from Chandan Rupakheti original https://aws.amazon.com/blogs/big-data/introducing-simplified-interaction-with-the-airflow-rest-api-in-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that builds upon Apache Airflow, offering its benefits while eliminating the need for you to set up, operate, and maintain the underlying infrastructure, reducing operational overhead while increasing security and resilience.

Today, we are excited to announce an enhancement to the Amazon MWAA integration with the Airflow REST API. This improvement streamlines the ability to access and manage your Airflow environments and their integration with external systems, and allows you to interact with your workflows programmatically. The Airflow REST API facilitates a wide range of use cases, from centralizing and automating administrative tasks to building event-driven, data-aware data pipelines.

In this post, we discuss the enhancement and present several use cases that the enhancement unlocks for your Amazon MWAA environment.

Airflow REST API

The Airflow REST API is a programmatic interface that allows you to interact with Airflow’s core functionalities. It’s a set of HTTP endpoints to perform operations such as invoking Directed Acyclic Graphs (DAGs), checking task statuses, retrieving metadata about workflows, managing connections and variables, and even initiating dataset-related events, without directly accessing the Airflow web interface or command line tools.

Before today, Amazon MWAA provided the foundation for interacting with the Airflow REST API. Though functional, the process of obtaining and managing access tokens and session cookies added complexity to the workflow. Amazon MWAA now supports a simplified mechanism for interacting with the Airflow REST API using AWS credentials, significantly reducing complexity and improving overall usability.

Enhancement overview

The new InvokeRestApi capability allows you to run Airflow REST API requests with a valid SigV4 signature using your existing AWS credentials. This feature is now available to all Amazon MWAA environments (2.4.3+) in supported Amazon MWAA AWS Regions. By acting as an intermediary, this REST API processes requests on behalf of users, requiring only the environment name and API request payload as inputs.

Integrating with the Airflow REST API through the enhanced Amazon MWAA API provides several key benefits:

  • Simplified integration – The new InvokeRestApi capability in Amazon MWAA removes the complexity of managing access tokens and session cookies, making it straightforward to interact with the Airflow REST API.
  • Improved usability – By acting as an intermediary, the enhanced API delivers Airflow REST API execution results directly to the client, reducing complexity and improving overall usability.
  • Automated administration – The simplified REST API access enables automating various administrative and management tasks, such as managing Airflow variables, connections, slot pools, and more.
  • Event-driven architectures – The enhanced API facilitates seamless integration with external events, enabling the triggering of Airflow DAGs based on these events. This supports the growing emphasis on event-driven data pipelines.
  • Data-aware scheduling – Using the dataset-based scheduling feature in Airflow, the enhanced API enables the Amazon MWAA environment to manage the incoming workload and scale resources accordingly, improving the overall reliability and efficiency of event-driven pipelines.

In the following sections, we demonstrate how to use the enhanced API in various use cases.

How to use the enhanced Amazon MWAA API

The following code snippet shows the general request format for the enhanced REST API:

POST /restapi/Name HTTP/1.1
Content-type: application/json

{
    Name: String,
    Method: String,
    Path: String,
    QueryParameters: Json,
    Body: Json
}

The Name of the Amazon MWAA environment, the Path of the Airflow REST API endpoint to be called, and the HTTP Method to use are the required parameters, whereas QueryParameters and Body are optional and can be used as needed in the API calls.

The following code snippet shows the general response format:

{
    RestApiStatusCode: Number,
    RestApiResponse: Json
}

The RestApiStatusCode represents the HTTP status code returned by the Airflow REST API call, and the RestApiResponse contains the response payload from the Airflow REST API.

The following sample code snippet shows how to update the description field of an Airflow variable using the enhanced integration. The call uses the AWS SDK for Python (Boto3) to invoke the Airflow REST API.

import boto3

# Create a boto3 client
mwaa_client = boto3.client("mwaa")

# Call the enhanced REST API using boto3 client
# Using QueryParameters, you can selectively specify the field to be updated
# Without QueryParameters, all fields will be updated
response = mwaa_client.invoke_rest_api(
    Name="<your-environment-name>",
    Method="PATCH",
    Path="/variables/<your-variable-key>",
    Body={
        "key": "<key>",
        "value": "<value>",
        "description": "<description>"
    },
    QueryParameters={
        "update_mask": ["description"]
    }
)

# Access the outputs of the REST call
status_code = response["RestApiStatusCode"]
result = response['RestApiResponse']

To make the invoke_rest_api SDK call, the calling client must use an AWS Identity and Access Management (IAM) principal that has the airflow:InvokeRestAPI permission for the target environment. The permission can be scoped to specific Airflow roles (Admin, Op, User, Viewer, or Public) to control access levels.
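
As an illustration of that scoping, the following sketch builds an identity-based policy document that allows the call for a single Airflow role. The function name, Region, account ID, and environment name are placeholders of our own, and the role ARN layout (arn:aws:airflow:<region>:<account-id>:role/<environment-name>/<airflow-role>) is an assumption about the resource format. Confirm it against the Amazon MWAA documentation before relying on it.

```python
import json

# Airflow roles named in this post
AIRFLOW_ROLES = {"Admin", "Op", "User", "Viewer", "Public"}


def build_invoke_rest_api_policy(region, account_id, env_name, airflow_role):
    """Return an IAM policy document (as a dict) scoped to one Airflow role."""
    if airflow_role not in AIRFLOW_ROLES:
        raise ValueError(f"Unknown Airflow role: {airflow_role}")
    # Assumed ARN layout for an Airflow role within an MWAA environment
    resource = f"arn:aws:airflow:{region}:{account_id}:role/{env_name}/{airflow_role}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:InvokeRestApi",
                "Resource": resource,
            }
        ],
    }


if __name__ == "__main__":
    policy = build_invoke_rest_api_policy(
        "us-east-1", "111122223333", "my-mwaa-env", "Viewer"
    )
    print(json.dumps(policy, indent=2))
```

Scoping the resource to the Viewer role, as in this usage, would be a reasonable starting point for read-only automation. Broader roles can be granted only where write access is needed.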

This simple yet powerful REST API supports various use cases for your Amazon MWAA environments. Let’s review some important ones in the subsequent sections.

Automate administration and management tasks

Prior to this launch, to automate the configuration and setup of resources such as variables, connections, and slot pools, you had to write lengthy boilerplate code to make API requests to the Amazon MWAA web servers, including handling cookies and session management. The new enhanced REST API support simplifies this automation.

For this example, let’s assume you want to automate maintaining your Amazon MWAA environment variables. You need to perform API operations such as create, read, update, and delete on Airflow variables to achieve this task. The following is a simple Python client to do so (mwaa_variables_client.py):

import boto3

# Client for managing MWAA environment variables
class MWAAVariablesClient:
    # Initialize the client with environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # List all variables in the MWAA environment
    def list(self):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="GET",
            Path="/variables"
        )

        output = response['RestApiResponse']['variables']
        return output

    # Get a specific variable by key
    def get(self, key):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="GET",
            Path=f"/variables/{key}"
        )

        return response['RestApiResponse']

    # Create a new variable with key, value, and optional description
    def create(self, key, value, description=None):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path="/variables",
            Body={
                "key": key,
                "value": value,
                "description": description
            }
        )

        return response['RestApiResponse']
    
    # Update an existing variable's value and description
    def update(self, key, value, description, query_parameters=None):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="PATCH",
            Path=f"/variables/{key}",
            Body={
                "key": key,
                "value": value,
                "description": description
            },
            QueryParameters=query_parameters
        )

        return response['RestApiResponse']

    # Delete a variable by key
    def delete(self, key):
        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="DELETE",
            Path=f"/variables/{key}"
        )
        return response['RestApiStatusCode']

if __name__ == "__main__":
    client = MWAAVariablesClient("<your-mwaa-environment-name>")

    print("\nCreating a test variable ...")
    response = client.create(
        key="test",
        value="Test value",
        description="Test description"
    )
    print(response)

    print("\nListing all variables ...")
    variables = client.list()
    print(variables)

    print("\nGetting the test variable ...")
    response = client.get("test")
    print(response)

    print("\nUpdating the value and description of test variable ...")
    response = client.update(
        key="test",
        value="Updated Value",
        description="Updated description"
    )
    print(response)

    print("\nUpdating only description of test variable ...")
    response = client.update(
        key="test", 
        value="Updated Value", 
        description="Yet another updated description", 
        query_parameters={ "update_mask": ["description"] }
    )
    print(response)

    print("\nDeleting the test variable ...")
    response_code = client.delete("test")
    print(f"Response code: {response_code}")

    print("\nFinally, getting the deleted test variable ...")
    try:
        response = client.get("test")
        print(response)
    except Exception as e:
        print(e.response["RestApiResponse"])

Assuming that you have configured your terminal with appropriate AWS credentials, you can run the preceding Python script to achieve the following results:

$ python mwaa_variables_client.py

Creating a test variable ...
{'description': 'Test description', 'key': 'test', 'value': 'Test value'}

Listing all variables ...
[{'key': 'test', 'value': 'Test value'}]

Getting the test variable ...
{'key': 'test', 'value': 'Test value'}

Updating the value and description of test variable ...
{'description': 'Updated description', 'key': 'test', 'value': 'Updated Value'}

Updating only description of test variable ...
{'description': 'Yet another updated description', 'key': 'test', 'value': 'Updated Value'}

Deleting the test variable ...
Response code: 204

Finally, getting the deleted test variable ...
{'detail': 'Variable does not exist', 'status': 404, 'title': 'Variable not found', 'type': 'https://airflow.apache.org/docs/apache-airflow/2.8.1/stable-rest-api-ref.html#section/Errors/NotFound'}

Let’s further explore other useful use cases.

Build event-driven data pipelines

The Airflow community has been actively innovating to enhance the platform’s data awareness, enabling you to build more dynamic and responsive workflows. When we announced support for version 2.9.2 in Amazon MWAA, we introduced capabilities that allow pipelines to react to changes in datasets, both within Airflow environments and in external systems. The new simplified integration with the Airflow REST API makes the implementation of data-driven pipelines more straightforward.

Consider a use case where you need to run a pipeline that uses input from an external event. The following sample DAG runs a bash command supplied as a parameter (any_bash_command.py):

"""
This DAG allows you to execute a bash command supplied as a parameter to the DAG.
The command is passed as a parameter called `command` in the DAG configuration.
"""

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models.param import Param

from datetime import datetime

with DAG(
    dag_id="any_bash_command", 
    schedule=None, 
    start_date=datetime(2022, 1, 1), 
    catchup=False,
    params={
        "command": Param("env", type="string")
    },
) as dag:
    cli_command = BashOperator(
        task_id="triggered_bash_command",
        bash_command="{{ dag_run.conf['command'] }}"
    )

With the help of the enhanced REST API, you can create a client that can invoke this DAG, supplying the bash command of your choice as follows (mwaa_dag_run_client.py):

import boto3

# Client for triggering DAG runs in Amazon MWAA
class MWAADagRunClient:
    # Initialize the client with MWAA environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # Trigger a DAG run with specified parameters
    def trigger_run(self, 
            dag_id, 
            dag_run_id=None,
            logical_date=None,
            data_interval_start=None,
            data_interval_end=None,
            note=None,
            conf=None,
    ):
        body = {}
        if dag_run_id:
            body["dag_run_id"] = dag_run_id
        if logical_date:
            body["logical_date"] = logical_date
        if data_interval_start:
            body["data_interval_start"] = data_interval_start
        if data_interval_end:
            body["data_interval_end"] = data_interval_end
        if note:
            body["note"] = note
        body["conf"] = conf or {}            

        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path=f"/dags/{dag_id}/dagRuns",
            Body=body
        )
        return response['RestApiResponse']

if __name__ == "__main__":
    client = MWAADagRunClient("<your-mwaa-environment-name>")
	
    print("\nTriggering a dag run ...")
    result = client.trigger_run(
        dag_id="any_bash_command", 
        conf={
            "command": "echo 'Hello from external trigger!'"
        }
    )
    print(result)

The following snippet shows a sample run of the script:

$ python mwaa_dag_run_client.py
Triggering a dag run ...
{'conf': {'command': "echo 'Hello from external trigger!'"}, 'dag_id': 'any_bash_command', 'dag_run_id': 'manual__2024-10-21T16:56:09.852908+00:00', 'data_interval_end': '2024-10-21T16:56:09.852908+00:00', 'data_interval_start': '2024-10-21T16:56:09.852908+00:00', 'execution_date': '2024-10-21T16:56:09.852908+00:00', 'external_trigger': True, 'logical_date': '2024-10-21T16:56:09.852908+00:00', 'run_type': 'manual', 'state': 'queued'}

On the Airflow UI, the triggered_bash_command task shows the following execution log:

[2024-10-21, 16:56:12 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-21, 16:56:12 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-10-21, 16:56:12 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', "echo 'Hello from external trigger!'"]
[2024-10-21, 16:56:12 UTC] {subprocess.py:86} INFO - Output:
[2024-10-21, 16:56:12 UTC] {subprocess.py:93} INFO - Hello from external trigger!
[2024-10-21, 16:56:12 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2024-10-21, 16:56:12 UTC] {taskinstance.py:340} ▶ Post task execution logs
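
Instead of checking the Airflow UI, a client could also poll the state of the run through the same InvokeRestApi call. The following is a minimal sketch, not part of the original clients. The function name and its polling parameters are hypothetical. It assumes the caller passed its own simple dag_run_id to trigger_run, because whether the characters in an auto-generated run ID (such as ":" and "+") need URL encoding in Path is not covered here.

```python
import time

# Terminal DAG run states reported by the Airflow REST API
TERMINAL_STATES = {"success", "failed"}


def wait_for_dag_run(mwaa_client, env_name, dag_id, dag_run_id,
                     poll_seconds=10, max_attempts=30, sleep=time.sleep):
    """Poll a DAG run through InvokeRestApi until it reaches a terminal state.

    mwaa_client is expected to be a boto3 "mwaa" client. Returns the final
    state, or None if the run is still in progress after max_attempts polls.
    """
    for attempt in range(max_attempts):
        response = mwaa_client.invoke_rest_api(
            Name=env_name,
            Method="GET",
            Path=f"/dags/{dag_id}/dagRuns/{dag_run_id}",
        )
        state = response["RestApiResponse"].get("state")
        if state in TERMINAL_STATES:
            return state
        # Wait before the next poll, but not after the final attempt
        if attempt < max_attempts - 1:
            sleep(poll_seconds)
    return None
```

The sleep function is injectable so the loop can be exercised in tests without waiting. In production code, a bounded number of attempts keeps a stuck run from blocking the caller indefinitely.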

You can further expand this example to more useful event-driven architectures. Let’s expand the use case to run your data pipeline and perform extract, transform, and load (ETL) jobs when a new file lands in an Amazon Simple Storage Service (Amazon S3) bucket in your data lake. The following diagram illustrates one architectural approach.

In the context of invoking a DAG through an external input, the AWS Lambda function would have no knowledge of how busy the Amazon MWAA web server is, potentially leading to the function overwhelming the Amazon MWAA web server by processing a large number of files in a short timeframe.

One way to regulate the file processing throughput would be to introduce an Amazon Simple Queue Service (Amazon SQS) queue between the S3 bucket and the Lambda function, which can help with rate limiting the API requests to the web server. You can achieve this by configuring maximum concurrency for Lambda for the SQS event source. However, the Lambda function would still be unaware of the processing capacity available in the Amazon MWAA environment to run the DAGs.
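
When the queue sits between the bucket and the function, the Lambda event no longer has the shape of an S3 event: each SQS record carries the S3 notification as a JSON string in its body. The following sketch shows one way to unwrap it. The function name is hypothetical, and the sketch assumes S3 publishes notifications directly to the queue without any additional envelope.

```python
import json
from urllib.parse import unquote_plus


def s3_uris_from_sqs_event(event):
    """Return the S3 URIs referenced by an SQS-triggered Lambda event.

    Each SQS record body is assumed to hold one JSON-encoded S3 event
    notification, which in turn carries its own "Records" list.
    """
    uris = []
    for sqs_record in event.get("Records", []):
        s3_event = json.loads(sqs_record["body"])
        # Messages without "Records" (for example, S3 test events) are skipped
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded in S3 event notifications
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            uris.append(f"s3://{bucket}/{key}")
    return uris
```

Returning a list rather than a single URI lets the same helper work for any SQS batch size configured on the event source mapping.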

In addition to the SQS queue, to help the Amazon MWAA environment manage the load natively, you can use Airflow’s data-aware scheduling feature with datasets. This approach involves using the enhanced Amazon MWAA REST API to create dataset events, which the Airflow scheduler then uses to schedule the DAG natively. This way, the Amazon MWAA environment can effectively batch the dataset events and scale resources based on the load. Let’s explore this approach in more detail.

Configure data-aware scheduling

Consider the following DAG that showcases a framework for an ETL pipeline (data_aware_pipeline.py). It uses a dataset for scheduling.

"""
DAG to run the given ETL pipeline based on the datalake dataset event.
"""
from airflow import DAG, Dataset
from airflow.decorators import task

from datetime import datetime

# Create a dataset
datalake = Dataset("datalake")

# Return a list of S3 file URIs from the supplied dataset events 
def get_resources(dataset_uri, triggering_dataset_events=None):
    events = triggering_dataset_events[dataset_uri] if triggering_dataset_events else []
    s3_uris = list(map(lambda e: e.extra["uri"], events))
    return s3_uris

with DAG(
    dag_id="data_aware_pipeline",
    schedule=[datalake],
    start_date=datetime(2022, 1, 1), 
    catchup=False
):
    @task
    def extract(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Running data extraction for {resource} ...")

    @task
    def transform(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Running data transformation for {resource} ...")

    @task
    def load(triggering_dataset_events=None):
        resources = get_resources("datalake", triggering_dataset_events)
        for resource in resources:
            print(f"Loading finalized data for {resource} ...")
    
    extract() >> transform() >> load()

In the preceding code snippet, a Dataset object called datalake is used to schedule the DAG. The get_resources function extracts the extra information that contains the locations of the newly added files in the S3 data lake. Upon receiving dataset events, the Amazon MWAA environment batches the dataset events based on the load and schedules the DAG to handle them appropriately. The modified architecture to support the data-aware scheduling is presented below.

The following is a simplified client that can create a dataset event through the enhanced REST API (mwaa_dataset_client.py):

import boto3

# Client for interacting with MWAA datasets
class MWAADatasetClient:
    # Initialize the client with environment name and optional MWAA boto3 client
    def __init__(self, env_name, mwaa_client=None):
        self.env_name = env_name
        self.client = mwaa_client or boto3.client("mwaa")

    # Create a dataset event in the MWAA environment
    def create_event(self, dataset_uri, extra=None):
        body = {
            "dataset_uri": dataset_uri
        }
        if extra:
            body["extra"] = extra

        response = self.client.invoke_rest_api(
            Name=self.env_name,
            Method="POST",
            Path="/datasets/events",
            Body=body
        )
        return response['RestApiResponse']

The following is a code snippet for the Lambda function in the preceding architecture to generate the dataset event, assuming the function is configured to handle one S3 PUT event at a time (dataset_event_lambda.py):

import os
import json

from mwaa_dataset_client import MWAADatasetClient

environment = os.environ["MWAA_ENV_NAME"]
client = MWAADatasetClient(environment)

def handler(event, context):
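    # Extract the S3 file URI from the S3 event record. If the function is
    # invoked through an SQS queue instead, the S3 event arrives JSON-encoded
    # in record["body"] and must be parsed first.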
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    s3_file_uri = f"s3://{bucket}/{key}"

    # Create a dataset event
    result = client.create_event(
        dataset_uri="datalake",
        extra={"uri": s3_file_uri}
    )

    return {
        "statusCode": 200,
        "body": json.dumps(result)
    }

As new files get dropped into the S3 bucket, the Lambda function will generate a dataset event per file, passing in the Amazon S3 location of the newly added files. The Amazon MWAA environment will schedule the ETL pipeline upon receiving the dataset events. The following diagram illustrates a sample run of the ETL pipeline on the Airflow UI.

The following snippet shows the execution log of the extract task from the pipeline. The log shows how the Airflow scheduler batched three dataset events together to handle the load.

[2024-10-21, 16:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file1.csv ...
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file2.csv ...
[2024-10-21, 16:47:15 UTC] {logging_mixin.py:190} INFO - Running data extraction for s3://example-bucket/path/to/file3.csv ...
[2024-10-21, 16:47:15 UTC] {python.py:240} INFO - Done. Returned value was: None
[2024-10-21, 16:47:16 UTC] {taskinstance.py:340} ▶ Post task execution logs

In this way, you can use the enhanced REST API to create data-aware, event-driven pipelines.
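
To make the batching concrete, the following standalone sketch feeds three simulated dataset events to the get_resources logic from data_aware_pipeline.py. The SimpleNamespace objects are stand-ins of our own for the dataset event objects that Airflow passes to a task. Only the extra attribute is modeled, so this is an illustration of the data flow, not of Airflow internals.

```python
from types import SimpleNamespace


def get_resources(dataset_uri, triggering_dataset_events=None):
    """Same logic as in data_aware_pipeline.py, repeated so this runs standalone."""
    events = triggering_dataset_events[dataset_uri] if triggering_dataset_events else []
    return [e.extra["uri"] for e in events]


# Simulated batch: one entry per dataset event created by the Lambda function
batched_events = {
    "datalake": [
        SimpleNamespace(extra={"uri": "s3://example-bucket/path/to/file1.csv"}),
        SimpleNamespace(extra={"uri": "s3://example-bucket/path/to/file2.csv"}),
        SimpleNamespace(extra={"uri": "s3://example-bucket/path/to/file3.csv"}),
    ]
}

for resource in get_resources("datalake", batched_events):
    print(f"Running data extraction for {resource} ...")
```

A single DAG run receives all three events, which is why the extract task in the log prints one line per file.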

Considerations

When implementing solutions using the enhanced Amazon MWAA REST API, it’s important to consider the following:

  • IAM permissions – Make sure the IAM principal making the invoke_rest_api SDK call has the airflow:InvokeRestAPI permission on the Amazon MWAA resource. To control access levels, the permission can be scoped to specific Airflow roles (Admin, Op, User, Viewer, or Public).
  • Error handling – Implement robust error handling mechanisms to handle various HTTP status codes and error responses from the Airflow REST API.
  • Monitoring and logging – Set up appropriate monitoring and logging to track the performance and reliability of your API-based integrations and data pipelines.
  • Versioning and compatibility – Monitor the versioning of the Airflow REST API and the Amazon MWAA service to make sure your integrations remain compatible with any future changes.
  • Security and compliance – Adhere to your organization’s security and compliance requirements when integrating external systems with Amazon MWAA and handling sensitive data.
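
As one possible approach to the error handling consideration, the following sketch wraps invoke_rest_api so that Airflow-level errors are returned as a status code and payload instead of an exception. The wrapper name is hypothetical. The variables client shown earlier reads RestApiResponse from the exception's response attribute. Reading RestApiStatusCode from the same place is our assumption.

```python
def invoke_airflow_api(mwaa_client, env_name, method, path, **optional):
    """Call InvokeRestApi and return (status_code, payload).

    mwaa_client is expected to be a boto3 "mwaa" client. Errors reported by
    the Airflow REST API are returned rather than raised. Exceptions that do
    not carry a REST API response (for example, credential or validation
    errors) are re-raised unchanged.
    """
    try:
        response = mwaa_client.invoke_rest_api(
            Name=env_name, Method=method, Path=path, **optional
        )
    except Exception as error:
        details = getattr(error, "response", None)
        if not isinstance(details, dict) or "RestApiResponse" not in details:
            raise
        response = details
    # Assumption: the error response exposes RestApiStatusCode like a success
    return response.get("RestApiStatusCode"), response.get("RestApiResponse")
```

A caller could then branch on the status code, for example treating 404 on a variable lookup as "not found" while logging and retrying on 5xx responses.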

You can start using the simplified integration with the Airflow REST API in your Amazon MWAA environments with Airflow version 2.4.3 or greater, in all currently supported Regions.

Conclusion

The enhanced integration between Amazon MWAA and the Airflow REST API represents a significant improvement in the ease of interacting with Airflow’s core functionalities. This new capability improves overall usability and opens up a wide range of use cases, from centralizing and automating administrative tasks to building event-driven, data-aware data pipelines.

As you explore this new feature, consider the various use cases and best practices outlined in this post. By using the new InvokeRestApi, you can streamline your data management processes, enhance operational efficiency, and drive greater value from your data-driven systems.


About the Authors

Chandan Rupakheti is a Senior Solutions Architect at AWS. His main focus at AWS lies in the intersection of analytics, serverless, and AdTech services. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud. Outside of his professional life, he loves spending time with his family and friends, and listening to and playing music.

Hernan Garcia is a Senior Solutions Architect at AWS based out of Amsterdam. He has worked in the financial services industry since 2018, specializing in application modernization and supporting customers in their adoption of the cloud with a focus on serverless technologies.

Disaster recovery strategies for Amazon MWAA – Part 2

Post Syndicated from Chandan Rupakheti original https://aws.amazon.com/blogs/big-data/disaster-recovery-strategies-for-amazon-mwaa-part-2/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed orchestration service that makes it straightforward to run data processing workflows at scale. Amazon MWAA takes care of operating and scaling Apache Airflow so you can focus on developing workflows. However, although Amazon MWAA provides high availability within an AWS Region through features like Multi-AZ deployment of Airflow components, recovering from a Regional outage requires a multi-Region deployment.

In Part 1 of this series, we highlighted challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. In particular, we discussed two key strategies: backup and restore and warm standby. In this post, we dive deep into the implementation for both strategies and provide a deployable solution to realize the architectures in your own AWS account.

The solution for this post is hosted on GitHub. The README in the repository offers tutorials as well as further workflow details for both backup and restore and warm standby strategies.

Backup and restore architecture

The backup and restore strategy involves periodically backing up Amazon MWAA metadata to Amazon Simple Storage Service (Amazon S3) buckets in the primary Region. The backups are replicated to an S3 bucket in the secondary Region. In case of a failure in the primary Region, a new Amazon MWAA environment is created in the secondary Region and hydrated with the backed-up metadata to restore the workflows.

The project uses the AWS Cloud Development Kit (AWS CDK) and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.

The following diagram shows the architecture of the backup and restore strategy and its key components:

  • Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows
  • Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
  • Replicated backup bucket – The bucket in the secondary Region syncs metadata backups through Amazon S3 cross-Region replication
  • Secondary Amazon MWAA environment – This environment is created on-demand during recovery in the secondary Region
  • Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in the primary Region
  • Recovery workflow – This workflow monitors the primary Amazon MWAA environment and initiates failover when needed in the secondary Region

Figure 1: The backup restore architecture

There are essentially two workflows that work in conjunction to achieve the backup and restore functionality in this architecture. Let’s explore both workflows in detail and the steps as outlined in Figure 1.

Backup workflow

The backup workflow is responsible for periodically taking a backup of your Airflow metadata tables and storing them in the backup S3 bucket. The steps are as follows:

  • [1.a] You can deploy the provided solution from your continuous integration and delivery (CI/CD) pipeline. The pipeline includes a DAG deployed to the DAGs S3 bucket, which performs backup of your Airflow metadata. This is the bucket where you host all of your DAGs for your environment.
  • [1.b] The solution enables cross-Region replication of the DAGs bucket. Any new changes to the primary Region bucket, including DAG files, plugins, and requirements.txt files, are replicated to the secondary Region DAGs bucket. However, for existing objects, a one-time replication needs to be performed using S3 Batch Replication.
  • [1.c] The DAG deployed to take metadata backup runs periodically. The metadata backup doesn’t include some of the auto-generated tables, and the list of tables to be backed up is configurable. By default, the solution backs up variable, connection, slot pool, log, job, DAG run, trigger, task instance, and task fail tables. The backup interval is also configurable and should be based on the Recovery Point Objective (RPO), which is the maximum amount of data loss, measured in time, that your business can sustain during a failure.
  • [1.d] Similar to the DAGs bucket, the backup bucket is also synced using cross-Region replication, through which the metadata backup becomes available in the secondary Region.

Recovery workflow

The recovery workflow runs periodically in the secondary Region to monitor the primary Amazon MWAA environment. It has two functions:

  • Store the environment configuration of the primary Amazon MWAA environment in the secondary backup bucket, which is used to recreate an identical Amazon MWAA environment in the secondary Region during failure
  • Perform the failover when a failure is detected

The following are the steps for when the primary Amazon MWAA environment is healthy (see Figure 1):

  • [2.a] The Amazon EventBridge scheduler starts the AWS Step Functions workflow on a provided schedule.
  • [2.b] The workflow, using AWS Lambda, checks Amazon CloudWatch in the primary Region for the SchedulerHeartbeat metrics of the primary Amazon MWAA environment. The environment in the primary Region sends heartbeats to CloudWatch every 5 seconds by default. However, to avoid invoking the recovery workflow spuriously, we use a default aggregation period of 5 minutes to check the heartbeat metrics. Therefore, it can take up to 5 minutes to detect a primary environment failure.
  • [2.c] Assuming that the heartbeat was detected in 2.b, the workflow makes the cross-Region GetEnvironment call to the primary Amazon MWAA environment.
  • [2.d] The response from the GetEnvironment call is stored in the secondary backup S3 bucket to be used in case of a failure in the subsequent iterations of the workflow. This makes sure the latest configuration of your primary environment is used to recreate a new environment in the secondary Region. The workflow completes successfully after storing the configuration.
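
The heartbeat check in step 2.b could be sketched as follows. This is not the code from the solution repository. The function name is hypothetical, and the CloudWatch namespace (AmazonMWAA) and the dimension names (Function and Environment) are assumptions that should be verified against the metrics your environment actually publishes.

```python
from datetime import datetime, timedelta, timezone


def scheduler_is_healthy(cloudwatch_client, env_name, window_minutes=5, now=None):
    """Return True if any scheduler heartbeat was recorded in the window.

    cloudwatch_client is expected to be a boto3 "cloudwatch" client created
    for the primary Region.
    """
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(minutes=window_minutes)
    response = cloudwatch_client.get_metric_statistics(
        Namespace="AmazonMWAA",  # assumed namespace for MWAA Airflow metrics
        MetricName="SchedulerHeartbeat",
        Dimensions=[  # assumed dimensions for the scheduler metric
            {"Name": "Function", "Value": "Scheduler"},
            {"Name": "Environment", "Value": env_name},
        ],
        StartTime=start,
        EndTime=end,
        Period=window_minutes * 60,
        Statistics=["Sum"],
    )
    # No datapoints, or a zero sum, means no heartbeat in the window
    total = sum(point.get("Sum", 0) for point in response.get("Datapoints", []))
    return total > 0
```

Aggregating over the whole window, rather than reacting to a single missing datapoint, mirrors the 5-minute aggregation period described in step 2.b.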

The following are the steps for the case when the primary environment is unhealthy (see Figure 1):

  • [2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
  • [2.b] The workflow, using Lambda, checks CloudWatch in the primary Region for the scheduler heartbeat metrics and detects failure. The scheduler heartbeat check using the CloudWatch API is the recommended approach to detect failure. However, you can implement a custom strategy for failure detection in the Lambda function, such as deploying a DAG to periodically send custom metrics to CloudWatch or other data stores as heartbeats and using the function to check those metrics. With the current CloudWatch-based strategy, the unavailability of the CloudWatch API may spuriously invoke the recovery flow.
  • [2.c] Skipped
  • [2.d] The workflow reads the previously stored environment details from the backup S3 bucket.
  • [2.e] The environment details read in the previous step are used to recreate an identical environment in the secondary Region using the CreateEnvironment API call. The API also needs other secondary Region-specific configurations, such as VPC, subnets, and security groups, which are read from the user-supplied configuration file or environment variables during the solution deployment. The workflow waits in a polling loop until the environment becomes available and then invokes the DAG to restore metadata from the backup S3 bucket. This DAG is deployed to the DAGs S3 bucket as a part of the solution deployment.
  • [2.f] The DAG for restoring metadata completes hydrating the newly created environment and notifies the Step Functions workflow of completion using the task token integration. The new environment now starts running the active workflows and the recovery completes successfully.
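As a rough illustration of Step 2.e, the following Python sketch shows one way the stored GetEnvironment response could be combined with secondary Region settings to produce arguments for CreateEnvironment. It is not the solution's actual code: the helper name, the list of copied fields, and the shape of the override dictionary are assumptions made for this example. Read-only fields such as Arn and Status are dropped, and LoggingConfiguration is left out because its shape in the response differs from the shape the create call expects.

```python
# Hypothetical helper, not taken from the published solution.

# Fields that CreateEnvironment requires.
REQUIRED_FIELDS = (
    "Name",
    "DagS3Path",
    "ExecutionRoleArn",
    "NetworkConfiguration",
    "SourceBucketArn",
)

# Fields assumed (for this sketch) to be safe to copy unchanged from the
# stored GetEnvironment response into the CreateEnvironment request.
COPIED_FIELDS = (
    "AirflowConfigurationOptions",
    "AirflowVersion",
    "DagS3Path",
    "EnvironmentClass",
    "MaxWorkers",
    "MinWorkers",
    "Schedulers",
    "PluginsS3Path",
    "RequirementsS3Path",
    "WebserverAccessMode",
    "WeeklyMaintenanceWindowStart",
)


def build_create_environment_args(stored_environment, secondary_region_config):
    """Merge the backed-up primary configuration with Region-specific values.

    stored_environment: the "Environment" dict saved from GetEnvironment.
    secondary_region_config: values that differ per Region (name, role,
        bucket ARN, subnets, security groups); these take precedence.
    """
    args = {
        field: stored_environment[field]
        for field in COPIED_FIELDS
        if field in stored_environment
    }
    args.update(secondary_region_config)

    missing = [field for field in REQUIRED_FIELDS if field not in args]
    if missing:
        raise ValueError(f"Missing required CreateEnvironment fields: {missing}")
    return args


# Possible usage (requires boto3 and AWS credentials, so it is not run here):
#   import boto3
#   mwaa = boto3.client("mwaa", region_name="us-west-2")
#   mwaa.create_environment(**build_create_environment_args(stored, overrides))
```

Keeping the merge in a pure function makes it easy to unit test the configuration logic without calling AWS, and letting the Region-specific values win avoids accidentally reusing primary Region subnets or buckets.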

Considerations

Consider the following when using the backup and restore method:

  • Recovery Time Objective – From failure detection to workflows running in the secondary Region, failover can take over 30 minutes. This includes new environment creation, Airflow startup, and metadata restore.
  • Cost – This strategy avoids the overhead of running a passive environment in the secondary Region. Costs are limited to periodic backup storage, cross-Region data transfer charges, and minimal compute for the recovery workflow.
  • Data loss – The RPO depends on the backup frequency. There is a design trade-off to consider here. Although shorter intervals between backups can minimize potential data loss, too frequent backups can adversely affect the performance of the metadata database and consequently the primary Airflow environment. Also, the solution can’t recover an actively running workflow midway. All active workflows are started fresh in the secondary Region based on the provided schedule.
  • Ongoing management – The Amazon MWAA environment and dependencies are automatically kept in sync across Regions in this architecture. As specified in Step 1.b of the backup workflow, the DAGs S3 bucket will need a one-time deployment of the existing resources for the solution to work.

Warm standby architecture

The warm standby strategy involves deploying identical Amazon MWAA environments in two Regions. Periodic metadata backups from the primary Region are used to rehydrate the standby environment in case of failover.

The project uses the AWS CDK and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.

The following diagram shows the architecture of the warm standby strategy and its key components:

  • Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows during normal operation
  • Secondary Amazon MWAA environment – The environment in the secondary Region acts as a warm standby ready to take over at any time
  • Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
  • Replicated backup bucket – The bucket in the secondary Region syncs metadata backups through S3 Cross-Region Replication
  • Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in both Regions
  • Recovery workflow – This workflow monitors the primary environment and initiates failover to the secondary environment when needed

 


Figure 2: The warm standby architecture

Similar to the backup and restore strategy, the backup workflow (Steps 1a–1d) periodically backs up critical Amazon MWAA metadata to the S3 bucket in the primary Region, which is synced to the secondary Region.

The recovery workflow runs periodically in the secondary Region monitoring the primary environment. On failure detection, it initiates the failover procedure. The steps are as follows (see Figure 2):

  • [2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
  • [2.b] The workflow checks CloudWatch in the primary Region for the scheduler heartbeat metrics. If the primary environment is healthy, the workflow completes without further action. If a failure is detected, the workflow continues with the following steps.
  • [2.c] The workflow invokes the DAG to restore metadata from the backup S3 bucket.
  • [2.d] The DAG for restoring metadata completes hydrating the passive environment and notifies the Step Functions workflow of completion using the task token integration. The passive environment starts running the active workflows on the provided schedules.
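The task token callback in Step 2.d can be sketched as follows. This is an illustrative example rather than the solution's code: the function name, the error code string, and the way the token reaches the DAG (for instance, through the DAG run configuration) are assumptions. The client is expected to behave like a boto3 Step Functions client, whose `send_task_success` and `send_task_failure` calls resume a workflow that is paused on a task token.

```python
import json


def notify_restore_result(sfn_client, task_token, succeeded, detail):
    """Report the outcome of the metadata restore to Step Functions.

    sfn_client: an object with send_task_success / send_task_failure,
        such as boto3.client("stepfunctions").
    task_token: the token the workflow passed when it invoked the DAG
        (assumed here to arrive through the DAG run configuration).
    detail: a JSON-serializable dict describing the result.
    """
    if succeeded:
        # The output must be a JSON string; it becomes the task's result.
        sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps(detail),
        )
    else:
        # "MetadataRestoreFailed" is a hypothetical error code for this sketch.
        sfn_client.send_task_failure(
            taskToken=task_token,
            error="MetadataRestoreFailed",
            cause=json.dumps(detail),
        )
```

In a DAG, a final task could call this helper with `succeeded=True`, and an on-failure callback could call it with `succeeded=False`, so the Step Functions workflow does not wait until its timeout when the restore fails.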

Because the secondary environment is already warmed up, the failover is faster with recovery times in minutes.

Considerations

Consider the following when using the warm standby method:

  • Recovery Time Objective – With a warm standby ready, the RTO can be as low as 5 minutes. This includes just the metadata restore and reenabling DAGs in the secondary Region.
  • Cost – This strategy has an added cost of running similar environments in two Regions at all times. With auto scaling for workers, the warm instance can maintain a minimal footprint; however, the web server and scheduler components of Amazon MWAA will remain active in the secondary environment at all times. The trade-off is significantly lower RTO.
  • Data loss – Similar to the backup and restore model, the RPO depends on the backup frequency. Faster backup cycles minimize potential data loss but can adversely affect performance of the metadata database and consequently the primary Airflow environment.
  • Ongoing management – This approach comes with some management overhead. Unlike the backup and restore strategy, any changes to the primary environment configurations need to be manually reapplied to the secondary environment to keep the two environments in sync. Automated synchronization of the secondary environment configurations is left as future work.

Shared considerations

Although the backup and restore and warm standby strategies differ in their implementation, they share some common considerations:

  • Periodically test failover to validate recovery procedures, RTO, and RPO.
  • Enable Amazon MWAA environment logging to help debug issues during failover.
  • Use the AWS CDK or AWS CloudFormation to manage the infrastructure definition. For more details, see the following GitHub repo or Quick start tutorial for Amazon Managed Workflows for Apache Airflow, respectively.
  • Automate deployments of environment configurations and disaster recovery workflows through CI/CD pipelines.
  • Monitor key CloudWatch metrics like SchedulerHeartbeat to detect primary environment failures.
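The heartbeat monitoring mentioned in the last item could look roughly like the following sketch. It assumes the metric is published in the `AmazonMWAA` namespace with `Function` and `Environment` dimensions; verify these against the metrics your environment actually emits. The function name, the five-minute lookback window, and the "any heartbeat counts as healthy" rule are illustrative choices, not the solution's implementation. The client is expected to behave like a boto3 CloudWatch client created for the primary Region.

```python
from datetime import datetime, timedelta, timezone


def is_primary_healthy(cloudwatch, environment_name, lookback_minutes=5, now=None):
    """Return True if any scheduler heartbeat was recorded in the window.

    cloudwatch: an object with get_metric_statistics, such as
        boto3.client("cloudwatch", region_name=<primary Region>).
    now: optional timezone-aware datetime, injectable for testing.
    """
    now = now or datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="AmazonMWAA",  # assumed namespace for MWAA metrics
        MetricName="SchedulerHeartbeat",
        Dimensions=[
            {"Name": "Function", "Value": "Scheduler"},
            {"Name": "Environment", "Value": environment_name},
        ],
        StartTime=now - timedelta(minutes=lookback_minutes),
        EndTime=now,
        Period=60,
        Statistics=["Sum"],
    )
    # No datapoints, or only zero-valued ones, is treated as a failure.
    total_heartbeats = sum(dp["Sum"] for dp in response.get("Datapoints", []))
    return total_heartbeats > 0
```

Note that this check treats an empty result as a failure. A stricter version might require several consecutive failed checks before starting recovery, to reduce false positives from brief gaps in metrics.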

Conclusion

In this series, we discussed how backup and restore and warm standby strategies offer configurable data protection based on your RTO, RPO, and cost requirements. Both use periodic metadata replication and restoration to minimize the area of effect of Regional outages.

Which strategy resonates more with your use case? Feel free to try out our solution and share any feedback or questions in the comments section!


About the Authors

Chandan Rupakheti is a Senior Solutions Architect at AWS. His main focus at AWS lies in the intersection of Analytics, Serverless, and AdTech services. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud. Outside of his professional life, he loves spending time with his family and friends, as well as listening to and playing music.

Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.