All posts by Hernan Garcia

Introducing Amazon MWAA micro environments for Apache Airflow

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-micro-environments-for-apache-airflow/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed Apache Airflow service that organizations use to extract business insights by combining, enriching, and transforming data through a series of tasks called a workflow. It enhances infrastructure security and availability while reducing operational overhead.

Today, we’re excited to announce mw1.micro, the latest addition to Amazon MWAA environment classes. This offering is designed to provide an even more cost-effective solution for running Airflow environments in the cloud. With mw1.micro, we’re bringing the power of Amazon MWAA to teams who require a lightweight environment without compromising on essential features. In this post, we’ll explore mw1.micro characteristics, key benefits, ideal use cases, and how you can set up an Amazon MWAA environment based on this new environment class.

Many customers maintain multiple Amazon MWAA environments to separate development stages, optimize resources, manage versions, enhance security, ensure redundancy, customize settings, improve scalability, and facilitate experimentation. This approach offers greater flexibility and control over workflow management. These organizations often also maintain multiple AWS accounts for development, testing, and production stages, which increases complexity and cost. Using full-sized Amazon MWAA environments for development and testing can also be expensive, especially for teams working on smaller projects or proof-of-concept initiatives. Additionally, customers adopting a federated deployment model find it challenging to provide isolated environments for different teams or departments while keeping costs under control. mw1.micro addresses these pain points by enabling more efficient resource utilization and significant cost savings.

The micro environment class

The mw1.micro configuration provides a balanced set of resources suitable for small-scale data processing and orchestration tasks. The class allocates 1 vCPU and 3 GB of RAM to a hybrid scheduler/worker container. The web server has the same configuration of 1 vCPU and 3 GB of RAM. The Amazon Elastic Container Service (Amazon ECS) tasks launched in the environment use AWS Fargate platform version 1.4.0, which increases ephemeral task storage to 20 GB.

mw1.micro environments support up to three concurrent tasks, making them ideal for sequential or lightly parallelized workflows. They can also accommodate up to 25 DAGs, providing ample capacity for organizing and managing various data pipelines and processes. This environment class is particularly well suited for development, testing, or small production workloads where resource optimization and cost-efficiency are primary concerns.

The following table summarizes the environment capabilities of mw1.micro.

Class       Scheduler and Worker (vCPU / RAM)   Web Server (vCPU / RAM)   Concurrent Tasks   DAG Capacity
mw1.micro   1 vCPU / 3 GB                       1 vCPU / 3 GB             3                  Up to 25

For mw1.micro, we maintain the general architecture of Amazon MWAA, and combine the Airflow scheduler and worker into a single container. For this reason, mw1.micro uses only two AWS Fargate tasks, one scheduler/worker hybrid, and one web server. The following diagram illustrates the environment architecture.

Another important change is that the metadata database uses a t4g.medium Amazon Aurora PostgreSQL-Compatible Edition instance powered by AWS Graviton2. The Graviton2 family of processors offers compute, storage, and networking improvements, as well as a reduced carbon footprint.

Supported features

mw1.micro retains the key Amazon MWAA and Airflow functionality that developers currently rely on:

  • You can set up a public or private web server, allowing you to control access to your Airflow UI as needed
  • You can add custom plugins and requirements, enabling you to extend Airflow’s capabilities and manage dependencies effortlessly
  • Startup scripts can be used to perform initialization tasks, making sure your environment is configured precisely to your specifications
  • The Airflow UI is fully functional, providing the same intuitive interface for managing and monitoring your workflows
  • It has the same networking features as other Amazon MWAA environment classes, such as custom URLs and shared virtual private cloud (VPC) support
  • Scheduler and worker logs remain separate in their respective Amazon CloudWatch log groups, providing ease of monitoring and troubleshooting

Considerations

The architectural decisions behind mw1.micro reflect a balance between functionality and cost-effectiveness. The limited resources of mw1.micro bring the following constraints:

  • The scheduler and worker are combined into a single Fargate task. Only a single scheduler/worker container is supported.
  • mw1.micro runs the web server in a single Fargate task. The maximum number of web servers is 1.
  • The number of concurrent Airflow tasks in the worker (worker_autoscale) can be set to a maximum value of 3.
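To illustrate the worker concurrency limit, the following sketch validates a proposed celery.worker_autoscale override before you apply it to an mw1.micro environment. It assumes the Amazon MWAA configuration key celery.worker_autoscale and Airflow's "max_concurrency,min_concurrency" value format; the helper name is hypothetical, and the limit of 3 is taken from the constraints listed above rather than read from any AWS API.

```python
# Hypothetical pre-flight check for an mw1.micro environment (not an AWS API).
MICRO_MAX_CONCURRENT_TASKS = 3  # limit stated for mw1.micro


def validate_micro_worker_autoscale(value: str) -> tuple:
    """Parse a celery.worker_autoscale value ("max,min") and enforce the mw1.micro cap.

    Returns (max_concurrency, min_concurrency) or raises ValueError.
    """
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 2:
        raise ValueError(f"expected 'max,min', got {value!r}")
    max_concurrency, min_concurrency = (int(part) for part in parts)
    if min_concurrency < 0 or min_concurrency > max_concurrency:
        raise ValueError("min must be >= 0 and <= max")
    if max_concurrency > MICRO_MAX_CONCURRENT_TASKS:
        raise ValueError(
            f"mw1.micro supports at most {MICRO_MAX_CONCURRENT_TASKS} "
            f"concurrent tasks, got {max_concurrency}"
        )
    return max_concurrency, min_concurrency
```

For example, validate_micro_worker_autoscale("3,1") returns (3, 1), while "5,5" is rejected before it reaches the environment.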

Pricing and availability

Amazon MWAA pricing dimensions remain unchanged, and you pay only for what you use:

  • The environment class
  • Metadata database storage consumed

Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.

Observe Amazon MWAA performance

When you start using the new environment class, it's important to understand how it behaves so you can maintain optimal operation and identify potential capacity issues. Monitor key metrics such as metadata database memory usage and CPU utilization of the scheduler/worker hybrid container. We recommend following the guidance described in Introducing container, database, and queue utilization metrics for Amazon MWAA to better understand the state of your environments and get insights to right-size your resources.

Set up a new micro environment in Amazon MWAA

You can set up an Amazon MWAA micro environment in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

The Amazon MWAA micro environment class is available today in all Regions where Amazon MWAA is currently available.
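For example, with the AWS SDK for Python (Boto3), creating a micro environment comes down to passing EnvironmentClass="mw1.micro" to the Amazon MWAA CreateEnvironment API. The following is a minimal sketch under stated assumptions: the environment name, IAM role ARN, S3 bucket ARN, subnet IDs, and security group IDs are placeholders you must replace; the dags prefix and private web server access are example choices; and boto3 is imported only inside the function that calls AWS, so you can build and inspect the request without credentials.

```python
def build_micro_environment_request(
    name, execution_role_arn, source_bucket_arn, subnet_ids, security_group_ids,
    airflow_version=None,
):
    """Build keyword arguments for the MWAA CreateEnvironment call (values are placeholders)."""
    request = {
        "Name": name,
        "EnvironmentClass": "mw1.micro",
        "ExecutionRoleArn": execution_role_arn,
        "SourceBucketArn": source_bucket_arn,
        "DagS3Path": "dags",  # example prefix inside the source bucket
        "NetworkConfiguration": {
            "SubnetIds": list(subnet_ids),
            "SecurityGroupIds": list(security_group_ids),
        },
        "WebserverAccessMode": "PRIVATE_ONLY",  # or "PUBLIC_ONLY"
    }
    if airflow_version:
        # Check which Airflow versions support mw1.micro before setting this.
        request["AirflowVersion"] = airflow_version
    return request


def create_micro_environment(request, region_name):
    """Submit the request; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so building the request needs no AWS SDK

    client = boto3.client("mwaa", region_name=region_name)
    return client.create_environment(**request)["Arn"]
```

Keeping request construction separate from the API call makes it easy to review the payload, or to translate the same values into CloudFormation, AWS CDK, or Terraform definitions.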

Conclusion

In this post, we announced the availability of the new micro environment class in Amazon MWAA. This offering addresses the needs of teams working on smaller projects, proof-of-concept initiatives, or those requiring isolated environments for different departments. By providing a lightweight yet feature-rich solution, mw1.micro enables organizations to achieve substantial cost savings without compromising on essential functionalities.

As you explore the possibilities of mw1.micro, remember to monitor its performance using the recommended metrics to maintain optimal operation. With its availability across all Regions where Amazon MWAA is offered, your teams can now use the power of Airflow in a more streamlined and economical manner, opening up new opportunities for efficient data pipeline management and orchestration in the cloud.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla.

Introducing Amazon MWAA support for Apache Airflow version 2.9.2

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-support-for-apache-airflow-version-2-9-2/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.

Today, we are announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.

In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.

With each new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let’s go through some of these new capabilities.

Logical operators and conditional expressions for DAG scheduling

Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow’s scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets were updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.

With the release of Airflow 2.9.2, you can now use logical operators (& for AND, | for OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.

For example, in the financial services industry, a risk management process might need to run whenever trading data from any regional market is refreshed and regulatory updates are also available. The new scheduling capabilities in Amazon MWAA allow you to express such logic with a simple expression. The following diagram illustrates the dependency we need to establish.

The following DAG code contains the logical operations to implement these dependencies:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")
trading_data_europe = Dataset("s3://trading/europe/data.parquet")
trading_data_americas = Dataset("s3://trading/americas/data.parquet")
regulatory_updates = Dataset("s3://regulators/updates.json")

@dag(
    dag_id='risk_management_trading_data',
    start_date=datetime(2023, 5, 1),
    schedule=((trading_data_asia | trading_data_europe | trading_data_americas) & regulatory_updates),
    catchup=False
)
def risk_management_pipeline():
    @task
    def risk_analysis():
        # Task for risk analysis
        ...

    @task
    def reporting():
        # Task for reporting
        ...

    @task
    def notifications():
        # Task for notifications
        ...

    analysis = risk_analysis()
    report = reporting()
    notify = notifications()

risk_management_pipeline()
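To make the semantics of the schedule expression concrete, the following pure-Python sketch models how the condition evaluates against the set of datasets updated since the last DAG run. The classes Condition, DatasetRef, AllOf, and AnyOf are hypothetical stand-ins written for this illustration; they are not Airflow's internal classes, and the model deliberately ignores details such as how Airflow queues and clears dataset events.

```python
from __future__ import annotations

from dataclasses import dataclass


class Condition:
    """Toy stand-in for a dataset condition (hypothetical, not Airflow's classes)."""

    def __and__(self, other: Condition) -> Condition:
        return AllOf((self, other))

    def __or__(self, other: Condition) -> Condition:
        return AnyOf((self, other))

    def is_met(self, updated: set[str]) -> bool:
        raise NotImplementedError


@dataclass(frozen=True)
class DatasetRef(Condition):
    uri: str

    def is_met(self, updated: set[str]) -> bool:
        # A single dataset is satisfied if it was updated since the last DAG run.
        return self.uri in updated


@dataclass(frozen=True)
class AllOf(Condition):
    parts: tuple

    def is_met(self, updated: set[str]) -> bool:
        return all(part.is_met(updated) for part in self.parts)


@dataclass(frozen=True)
class AnyOf(Condition):
    parts: tuple

    def is_met(self, updated: set[str]) -> bool:
        return any(part.is_met(updated) for part in self.parts)


asia = DatasetRef("s3://trading/asia/data.parquet")
europe = DatasetRef("s3://trading/europe/data.parquet")
americas = DatasetRef("s3://trading/americas/data.parquet")
regulatory = DatasetRef("s3://regulators/updates.json")

# Same shape as the schedule in the risk management DAG.
condition = (asia | europe | americas) & regulatory

print(condition.is_met({europe.uri, regulatory.uri}))  # True
print(condition.is_met({asia.uri, americas.uri}))  # False: no regulatory update
```

Because & binds more tightly than | in Python, the parentheses around the regional datasets matter: without them, the expression would mean "Asia or Europe or (Americas and regulatory)".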

To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.

Combining dataset and time-based schedules

With Airflow 2.9.2 environments, Amazon MWAA now has a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.

Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it’s essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there’s a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.

Relying solely on time-based scheduling for this type of data pipeline could lead to potential issues such as outdated information and infrastructure resource wastage.

The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means that your workflow can be invoked not only at predefined intervals but also whenever the specified datasets are updated, according to the dependency relationship you define among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.

See the following DAG code for an example implementation:

from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()

In the example, the DAG will be run under two conditions:

  • When the time-based schedule is met (daily at midnight UTC)
  • When the combined dataset condition is met, that is, when both the orders and inventory datasets have been updated, or when the customer dataset has been updated, regardless of the other datasets

This flexibility enables you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.
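The two trigger paths can be sketched in plain Python as a simplified decision model. This is an illustration under assumptions, not how the Airflow scheduler is implemented: trigger_reason and the other helpers are hypothetical, the cron schedule is reduced to "next midnight UTC", and dataset state is reduced to the set of URIs updated since the last run.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Dataset URIs from the example DAG.
ORDERS = "s3://path/to/orders/data"
INVENTORY = "s3://path/to/inventory/data"
CUSTOMER = "s3://path/to/customer/data"


def next_midnight_utc(after: datetime) -> datetime:
    """Next 00:00 UTC strictly after `after`, like the cron expression "0 0 * * *"."""
    after = after.astimezone(timezone.utc)
    return after.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


def dataset_condition_met(updated: set[str]) -> bool:
    """(orders & inventory) | customer, over datasets updated since the last run."""
    return {ORDERS, INVENTORY} <= updated or CUSTOMER in updated


def trigger_reason(last_cron_run: datetime, now: datetime, updated: set[str]) -> str | None:
    """Return which part of the schedule would start a run, or None."""
    if now >= next_midnight_utc(last_cron_run):
        return "timetable"
    if dataset_condition_met(updated):
        return "datasets"
    return None
```

Either path alone is enough to start a run, which is what distinguishes DatasetOrTimeSchedule from a purely dataset-driven or purely time-driven schedule.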

For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.

Dataset event REST API endpoints

Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge—there was no option to mark a dataset as externally updated. With the new dataset event endpoints feature, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.

This capability lets external systems and applications notify your Amazon MWAA environment of data changes, so your pipelines can react to updates that happen outside of Airflow.

As an example, running the following code from an external system allows you to invoke a dataset event in the target Amazon MWAA environment. This event could then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.

curl -X POST https://{web_server_host_name}/api/v1/datasets/events \
   -H 'Content-Type: application/json' \
   -d '{"dataset_uri": "s3://bucket_name/bucket_key", "extra": {}}'

The following diagram illustrates how the different components in the scenario interact with each other.

To get more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.
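The same dataset event can be sent from Python. The following sketch uses only the standard library to build and send the POST request shown in the curl example. It assumes you already have an authenticated web server session, for example a session cookie obtained through the Amazon MWAA web login token flow described in the post referenced above; the function names and the cookie handling are illustrative assumptions, not an Amazon MWAA client library.

```python
import json
import urllib.request


def build_dataset_event_request(web_server_host_name, dataset_uri, extra=None, session_cookie=None):
    """Build a POST /api/v1/datasets/events request for the Airflow REST API."""
    url = f"https://{web_server_host_name}/api/v1/datasets/events"
    body = json.dumps({"dataset_uri": dataset_uri, "extra": extra or {}}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if session_cookie:
        # Assumption: authentication uses an existing web server session cookie.
        headers["Cookie"] = f"session={session_cookie}"
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def post_dataset_event(request, timeout=10):
    """Send the request and return the decoded JSON response (raises on HTTP errors)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Splitting request construction from sending keeps the payload easy to test and lets you swap in a different authentication mechanism without touching the request format.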

Airflow 2.9.2 also includes features to ease the operation and monitoring of your environments. Let’s explore some of these new capabilities.

DAG auto-pausing

Customers are using Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A common scenario is a critical pipeline that starts failing during the evening, and due to the failure, it continues to run and fails repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary tasks, consuming valuable compute resources and potentially causing data corruption or inconsistencies.

The DAG auto-pausing feature aims to address this challenge by introducing two new configuration parameters:

  • max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before the DAG is automatically paused.
  • max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the global setting, allowing you to set a custom threshold for each DAG.

In the following code example, we define a DAG with a single TaskFlow task. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set on the DAG. By setting max_consecutive_failed_dag_runs=3, we instruct Airflow to automatically pause the DAG after three consecutive failed DAG runs.

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()

With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.
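The pausing rule can be summarized with a small pure-Python model. This is a hypothetical sketch of the behavior described above, not Airflow's scheduler code: a DAG-level max_consecutive_failed_dag_runs value overrides the global max_consecutive_failed_dag_runs_per_dag setting, a threshold of 0 disables auto-pausing, and the DAG is paused once the most recent runs are all failures.

```python
from __future__ import annotations


def effective_threshold(global_setting: int, dag_setting: int | None = None) -> int:
    """The DAG-level value overrides the global value when it is set."""
    return global_setting if dag_setting is None else dag_setting


def should_auto_pause(run_states: list[str], threshold: int) -> bool:
    """Return True if the most recent `threshold` runs all failed.

    run_states is ordered oldest to newest; a threshold of 0 disables auto-pausing.
    """
    if threshold <= 0:
        return False
    recent = run_states[-threshold:]
    return len(recent) == threshold and all(state == "failed" for state in recent)
```

With the example DAG's setting of 3, a run history of success, failed, failed, failed pauses the DAG, while a single successful run anywhere in the last three resets the streak.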

To learn more, refer to DAG Auto-pausing in the Airflow documentation.

CLI support for bulk pause and resume of DAGs

As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether you are upgrading or migrating environments or performing other operational activities, you may need to pause or resume many DAGs at once. In the Airflow UI, this means pausing or resuming DAGs one at a time, which is repetitive and time consuming and increases the risk of human error that can lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could also handle only one DAG at a time, making bulk operations inefficient.

Airflow 2.9.2 improves these CLI commands by adding the option to treat DAG IDs as regular expressions, so you can pause or resume multiple DAGs with a single command. This removes the need for repetitive manual or per-DAG operations, reducing the risk of human error and improving the consistency of your pipeline operations.

As an example, if the DAGs that produce daily liquidity reporting from Amazon Redshift data have IDs starting with redshift or daily_liquidity_reporting, you can pause all of them with the following CLI command and regular expression:

airflow dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"
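Before running a bulk pause, it can help to preview which DAG IDs a pattern selects. The following sketch applies the same regular expression locally with Python's re module to a list of hypothetical DAG IDs. It does not call Airflow; because the pattern is anchored with ^, matching from the start of the string or searching anywhere in it gives the same result here.

```python
import re

# Pattern from the CLI example.
PATTERN = r"^(redshift|daily_liquidity_reporting)"


def matching_dag_ids(dag_ids, pattern=PATTERN):
    """Return the DAG IDs that the pattern selects."""
    regex = re.compile(pattern)
    return [dag_id for dag_id in dag_ids if regex.search(dag_id)]


# Hypothetical DAG IDs for illustration.
dag_ids = [
    "redshift_daily_load",
    "daily_liquidity_reporting_eu",
    "s3_to_redshift",
    "weekly_liquidity_reporting",
]
print(matching_dag_ids(dag_ids))  # ['redshift_daily_load', 'daily_liquidity_reporting_eu']
```

Note that s3_to_redshift is not selected, because the anchor requires redshift at the start of the ID.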

Custom names for Dynamic Task Mapping

Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks are identified only by numeric indexes. In complex workflows with a large number of mapped tasks, this makes it increasingly difficult to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.

Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, enhancing visibility and manageability within the Airflow UI.

See the following example:

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()

The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to render the first element of each mapped task instance's op_args as part of that instance's map index. In the Airflow UI, you will see three task instances labeled Processing source=source_a, Processing source=source_b, and Processing source=source_c, making it straightforward to identify and track the task associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.

The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater levels of flexibility and customization when naming dynamically mapped tasks.

Refer to Named mapping in the Airflow documentation to learn more about named mapping.

TaskFlow decorator for Bash commands

Writing complex Bash commands and scripts using the traditional Airflow BashOperator may bring challenges in areas such as code consistency, task dependencies definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow’s TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow’s scheduling and monitoring capabilities while maintaining a consistent coding style.

The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

# Working directory for the intermediate CSV files. This assumes all three tasks
# run on the same worker host; with several workers, stage the files in Amazon S3
# (or another shared location) instead.
WORK_DIR = "/tmp/customer_order_analysis"

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    @task.bash
    def clean_data():
        # Write each dataset, drop the header row, and switch the delimiter to ';'.
        # Each pipeline writes to a new file: redirecting into the file being read truncates it.
        # The last line of stdout (the working directory) is pushed to XCom.
        return f"""
            mkdir -p {WORK_DIR}
            echo '{customer_data.strip()}' | awk 'NR > 1' | sed 's/,/;/g' > {WORK_DIR}/cleaned_customers.csv
            echo '{order_data.strip()}' | awk 'NR > 1' | sed 's/,/;/g' > {WORK_DIR}/cleaned_orders.csv
            echo {WORK_DIR}
        """

    @task.bash
    def transform_data(work_dir):
        # Convert back to comma-separated files; customers keep id, name, and age.
        return f"""
            awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' {work_dir}/cleaned_customers.csv > {work_dir}/transformed_customers.csv
            awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' {work_dir}/cleaned_orders.csv > {work_dir}/transformed_orders.csv
            echo {work_dir}
        """

    @task.bash
    def analyze_data(work_dir):
        orders_file = f"{work_dir}/transformed_orders.csv"
        return f"""
            # Calculate total revenue (quantity * price)
            total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {orders_file})
            echo "Total revenue: $total_revenue"

            # Find customers with multiple orders
            customers_with_multiple_orders=$(awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {orders_file})
            echo "Customers with multiple orders: $customers_with_multiple_orders"

            # Find the most frequently ordered product (file argument on the same line as awk)
            popular_product=$(awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}}} END {{print popular}}' {orders_file})
            echo "Most popular product: $popular_product"
        """

    # Pass the working directory from task to task through XCom.
    cleaned_dir = clean_data()
    transformed_dir = transform_data(cleaned_dir)
    analyze_data(transformed_dir)

customer_order_analysis_dag()

You can learn more about the @task.bash decorator in the Airflow documentation.
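When porting logic into Bash, it helps to cross-check the shell results against a reference implementation. The following sketch reproduces the three analyses in plain Python with the csv and collections modules, using the same sample order data as the DAG. It is a verification aid written for this illustration, not part of the @task.bash feature. One detail it surfaces: Product A and Product B each appear in two orders, so the awk one-liner, whose for (p in products) iteration order is unspecified, may report either one.

```python
import csv
import io
from collections import Counter

# Same sample order data as in the DAG (header included).
ORDER_DATA = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""


def analyze_orders(csv_text: str):
    """Return (total_revenue, customers_with_multiple_orders, most_popular_products)."""
    rows = list(csv.DictReader(io.StringIO(csv_text.strip())))
    # Revenue is quantity * price summed over all orders, rounded to two decimals.
    total_revenue = round(sum(int(r["quantity"]) * float(r["price"]) for r in rows), 2)
    orders_per_customer = Counter(r["customer_id"] for r in rows)
    multiple = sorted(c for c, n in orders_per_customer.items() if n > 1)
    # Popularity counts orders (rows), not quantities, matching products[$3]++ in awk.
    product_counts = Counter(r["product"] for r in rows)
    top = max(product_counts.values())
    most_popular = sorted(p for p, n in product_counts.items() if n == top)
    return total_revenue, multiple, most_popular
```

Returning every product tied for the top count makes the tie explicit instead of hiding it behind iteration order.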

Set up a new Airflow 2.9.2 environment in Amazon MWAA

You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.9.2

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.9 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
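An in-place upgrade can also be started programmatically by passing the target AirflowVersion to the Amazon MWAA UpdateEnvironment API. The following Boto3 sketch first reads the current version with GetEnvironment and only submits the update when the environment runs an older release. The environment name and Region are placeholders, the helper names are hypothetical, and the sketch does not cover prerequisites such as checking that your requirements file and plugins are compatible with the new version.

```python
def parse_version(version: str) -> tuple:
    """Turn "2.9.2" into (2, 9, 2) for numeric comparison."""
    return tuple(int(part) for part in version.split("."))


def build_upgrade_request(environment_name: str, target_version: str = "2.9.2") -> dict:
    """Keyword arguments for the MWAA UpdateEnvironment call."""
    return {"Name": environment_name, "AirflowVersion": target_version}


def upgrade_environment(environment_name: str, region_name: str, target_version: str = "2.9.2"):
    """Start an in-place upgrade if the environment runs an older Airflow version."""
    import boto3  # requires boto3 and AWS credentials

    client = boto3.client("mwaa", region_name=region_name)
    current = client.get_environment(Name=environment_name)["Environment"]["AirflowVersion"]
    if parse_version(current) >= parse_version(target_version):
        return None  # already at or beyond the target version
    request = build_upgrade_request(environment_name, target_version)
    return client.update_environment(**request)["Arn"]
```

Comparing versions as integer tuples avoids the string-comparison pitfall where "2.10.1" would sort before "2.9.2".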

Conclusion

In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features added in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of endpoints in the REST API to programmatically create dataset events. We also provided some sample code to show the implementation in Amazon MWAA.

For the complete list of changes, refer to Airflow’s release notes. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and supports customers in the adoption of serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Introducing Amazon MWAA larger environment sizes

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-larger-environment-sizes/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that streamlines the setup and operation of the infrastructure to orchestrate data pipelines in the cloud. Customers use Amazon MWAA to manage the scalability, availability, and security of their Apache Airflow environments. As they design more intensive, complex, and ever-growing data processing pipelines, customers have asked us for additional underlying resources to provide greater concurrency and capacity for their tasks and workflows.

To address this, today, we are announcing the availability of larger environment classes in Amazon MWAA. In this post, we dive into the capabilities of these new XL and 2XL environments, the scenarios they are well suited for, and how you can set up or upgrade your existing Amazon MWAA environment to take advantage of the increased resources.

Current challenges

When you create an Amazon MWAA environment, a set of managed Amazon Elastic Container Service (Amazon ECS) containers running on AWS Fargate is provisioned with defined virtual CPUs and RAM.

As you work with larger, complex, resource-intensive workloads, or run thousands of Directed Acyclic Graphs (DAGs) per day, you may start exhausting CPU availability on schedulers and workers, or reaching memory limits in workers. Running Apache Airflow at scale puts proportionally greater load on the Airflow metadata database, sometimes leading to CPU and memory issues on the underlying Amazon Relational Database Service (Amazon RDS) cluster. A resource-starved metadata database may drop connections from your workers, causing tasks to fail prematurely.

To improve performance and resiliency of your tasks, consider following Apache Airflow best practices to author DAGs. As an alternative, you can create multiple Amazon MWAA environments to distribute workloads. However, this requires additional engineering and management effort.

New environment classes

With today’s release, you can now create XL and 2XL environments in Amazon MWAA in addition to the existing environment classes. They have two and four times the compute, and three and six times the memory, respectively, of the current large Amazon MWAA environment instance class. These instances add compute and RAM linearly to directly improve capacity and performance of all Apache Airflow components. The following table summarizes the environment capabilities.

Environment class | Scheduler and worker CPU / RAM | Web server CPU / RAM | Concurrent tasks | DAG capacity
mw1.xlarge | 8 vCPUs / 24 GB | 4 vCPUs / 12 GB | 40 tasks (default) | Up to 2000
mw1.2xlarge | 16 vCPUs / 48 GB | 8 vCPUs / 24 GB | 80 tasks (default) | Up to 4000
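As a quick consistency check on the stated multipliers, the following Python sketch backs out the mw1.large scheduler and worker resources from each row of the table. The dictionary layout and function names are illustrative only, and the per-task figure assumes an even CPU split across tasks, which real workloads rarely follow.

```python
# Scheduler/worker resources and default concurrency, copied from the table above.
NEW_CLASSES = {
    "mw1.xlarge": {"vcpu": 8, "ram_gb": 24, "default_tasks": 40},
    "mw1.2xlarge": {"vcpu": 16, "ram_gb": 48, "default_tasks": 80},
}

# Multipliers stated in the text, relative to mw1.large: (compute, memory).
MULTIPLIERS = {"mw1.xlarge": (2, 3), "mw1.2xlarge": (4, 6)}


def implied_large_class(name: str) -> tuple[float, float]:
    """Back out mw1.large scheduler/worker (vCPU, GB RAM) from a larger class."""
    compute_x, memory_x = MULTIPLIERS[name]
    spec = NEW_CLASSES[name]
    return spec["vcpu"] / compute_x, spec["ram_gb"] / memory_x


def vcpu_per_default_task(name: str) -> float:
    """Even-split vCPU share per task at the default concurrency (a simplification)."""
    spec = NEW_CLASSES[name]
    return spec["vcpu"] / spec["default_tasks"]


if __name__ == "__main__":
    for name in NEW_CLASSES:
        vcpu, ram = implied_large_class(name)
        print(f"{name}: implies mw1.large = {vcpu:g} vCPUs / {ram:g} GB, "
              f"{vcpu_per_default_task(name):g} vCPU per task at default concurrency")
```

Both rows imply the same mw1.large figures (4 vCPUs / 8 GB), and both classes give the same 0.2 vCPU per task at their default concurrency, so the defaults scale concurrency in step with compute.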

With the introduction of these larger environments, your Amazon Aurora metadata database now uses larger, memory-optimized instances powered by AWS Graviton2. Graviton2 processors bring compute, storage, and networking improvements, along with a reduced carbon footprint.

Pricing

Amazon MWAA pricing dimensions remain unchanged, and you only pay for what you use:

  • The environment class
  • Additional worker instances
  • Additional scheduler instances
  • Metadata database storage consumed

The new XL and 2XL options apply to the first three dimensions: the environment class, additional worker instances, and additional scheduler instances. Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.
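Because the four pricing dimensions add up independently, a monthly estimate is a simple weighted sum. The following sketch shows that arithmetic; the class and field names are hypothetical, and the rates are placeholders you must replace with the published rates for your environment class and Region. They are not actual AWS prices.

```python
from dataclasses import dataclass


@dataclass
class MonthlyUsage:
    environment_hours: float      # hours the environment (base class) runs
    extra_worker_hours: float     # summed hours of additional worker instances
    extra_scheduler_hours: float  # summed hours of additional scheduler instances
    storage_gb_months: float      # metadata database storage consumed


@dataclass
class Rates:
    # Placeholder rates: look up real values for your class and Region.
    environment_per_hour: float
    worker_per_hour: float
    scheduler_per_hour: float
    storage_per_gb_month: float


def estimate_monthly_cost(usage: MonthlyUsage, rates: Rates) -> float:
    """Sum the four Amazon MWAA pricing dimensions."""
    return (
        usage.environment_hours * rates.environment_per_hour
        + usage.extra_worker_hours * rates.worker_per_hour
        + usage.extra_scheduler_hours * rates.scheduler_per_hour
        + usage.storage_gb_months * rates.storage_per_gb_month
    )
```

Moving to XL or 2XL changes the environment, worker, and scheduler rates you plug in, while the storage term stays the same.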

Observe Amazon MWAA performance to plan scaling to larger environments

Before you start using the new environment classes, it’s important to determine whether you are facing capacity issues, such as the metadata database running out of memory, or workers or schedulers running at high CPU usage. Understanding the performance of your environment resources is key to troubleshooting capacity-related issues. We recommend following the guidance in Introducing container, database, and queue utilization metrics for the Amazon MWAA environment to better understand the state of your Amazon MWAA environments and get insights to right-size your instances.

In the following test, we simulate a high load scenario, use the CloudWatch observability metrics to identify common problems, and make an informed decision to plan scaling to larger environments to mitigate the issues.

During our tests, we ran a complex DAG that dynamically creates over 500 tasks and uses external task sensors to wait for the completion of a task in a different DAG. After running it on an Amazon MWAA large environment class with auto scaling set to a maximum of 10 worker nodes, we observed the following metrics and values in the CloudWatch dashboard.

The worker nodes reached maximum CPU capacity, causing the number of queued tasks to keep increasing. The metadata database CPU utilization peaked at over 65%, and the available free database memory decreased. In this situation, we could increase the number of worker nodes further, but that would put additional load on the metadata database CPU, which might lead to a drop in the number of worker database connections and in available free database memory.
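The reasoning above (saturated workers plus a growing queue, with a busy metadata database) can be expressed as a rough decision rule. The following sketch is a hypothetical heuristic, not an AWS recommendation: the sample fields mirror the CloudWatch metrics discussed, and the 90% worker CPU and 60% database CPU thresholds are assumptions you should tune for your own environment.

```python
from dataclasses import dataclass


@dataclass
class MetricsSample:
    worker_cpu_pct: float  # average worker CPU utilization
    queued_tasks: int      # queued tasks at this sample time
    db_cpu_pct: float      # metadata database CPU utilization
    running_workers: int   # workers currently running
    max_workers: int       # auto scaling ceiling for workers


def suggest_scaling(samples: list[MetricsSample],
                    worker_cpu_high: float = 90.0,
                    db_cpu_high: float = 60.0) -> str:
    """Rough heuristic: scale out while the database has headroom, else scale up."""
    if len(samples) < 2:
        return "collect more samples"
    latest = samples[-1]
    # The queue is growing if every sample has more queued tasks than the previous one.
    queue_growing = all(
        earlier.queued_tasks < later.queued_tasks
        for earlier, later in zip(samples, samples[1:])
    )
    if latest.worker_cpu_pct < worker_cpu_high or not queue_growing:
        return "no change"
    if latest.running_workers < latest.max_workers and latest.db_cpu_pct < db_cpu_high:
        return "scale out: add workers"
    # More workers would add database load, so add resources per node instead.
    return "scale up: move to a larger environment class"
```

With samples resembling the test above (workers at 10 of 10, CPU near 100%, rising queue, database CPU above 65%), the rule returns the scale-up suggestion.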

With new environment classes, you can vertically scale to increase available resources by editing the environment and selecting a higher class of environment, as shown in the following screenshot.

From the list of environments, select the one in use for this test, choose Edit to navigate to the Configure advanced settings page, and select the xlarge or 2xlarge environment class as required.
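If you prefer the API over the console, the same change maps to the Amazon MWAA UpdateEnvironment operation, which accepts an EnvironmentClass parameter. The following sketch only builds the request arguments so it runs without AWS credentials; the function name and the environment name my-env are hypothetical.

```python
# The two environment classes introduced in this post.
LARGER_CLASSES = ("mw1.xlarge", "mw1.2xlarge")


def build_class_update(environment_name: str, new_class: str) -> dict:
    """Keyword arguments for the MWAA UpdateEnvironment API that change the class."""
    if new_class not in LARGER_CLASSES:
        raise ValueError(f"expected one of {LARGER_CLASSES}, got {new_class!r}")
    return {"Name": environment_name, "EnvironmentClass": new_class}


# With boto3 installed and credentials configured, you could then run:
#   boto3.client("mwaa").update_environment(**build_class_update("my-env", "mw1.xlarge"))
```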

After you save the change, the environment upgrade takes 20–30 minutes to complete. Any running DAG interrupted during the upgrade is scheduled for a retry, depending on how you configured retries for your DAGs. You can also invoke them manually or wait for the next scheduled run.
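Whether an interrupted task is retried depends on its retry settings. The following sketch shows one common way to set them for every task in a DAG through default_args, using the standard retries and retry_delay task arguments; the values of 3 retries and 10 minutes are hypothetical and should match how long your tasks can safely wait.

```python
from datetime import timedelta

# Hypothetical defaults applied to every task in a DAG: retry a failed
# task up to 3 times, waiting 10 minutes between attempts.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=10),
}

# Passed to a DAG definition, for example:
#   @dag(start_date=..., schedule=..., default_args=default_args)
```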

After we upgraded the environment class, we tested the same DAG and observed improved metric values, because more resources were now available. With this XL environment, you can run more tasks on fewer worker nodes, and as a result the number of queued tasks kept decreasing. Alternatively, if you have tasks that require more memory or CPU, you can reduce the tasks per worker and still run a high number of tasks per worker on a larger environment size. For example, if you have a large environment where the worker node CPU is maxed out with celery.worker_autoscale (the Airflow configuration that defines the number of tasks per worker) set at 20,20, you can move to an XL environment and keep celery.worker_autoscale at 20,20 rather than the XL default of 40 tasks per worker, and the CPU load should decrease significantly.
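The arithmetic behind this example is easy to check. The following sketch parses a celery.worker_autoscale value, which Airflow writes as "max,min" tasks per worker, and computes an even-split vCPU share per task. The 4 vCPUs per large worker is derived from the stated 2x compute ratio (8 / 2), and writing the XL default as "40,40" is an assumption for illustration; real tasks do not divide CPU evenly.

```python
def parse_worker_autoscale(value: str) -> tuple[int, int]:
    """Parse celery.worker_autoscale, written as "max,min" tasks per worker."""
    max_tasks, min_tasks = (int(part) for part in value.split(","))
    if min_tasks > max_tasks:
        raise ValueError("min tasks per worker cannot exceed max tasks per worker")
    return max_tasks, min_tasks


def vcpu_per_task(worker_vcpu: float, autoscale: str) -> float:
    """Even-split vCPU share per task when a worker runs its maximum task count."""
    max_tasks, _ = parse_worker_autoscale(autoscale)
    return worker_vcpu / max_tasks


def total_task_slots(workers: int, autoscale: str) -> int:
    """Upper bound on concurrent tasks across all workers."""
    max_tasks, _ = parse_worker_autoscale(autoscale)
    return workers * max_tasks


if __name__ == "__main__":
    print(vcpu_per_task(4, "20,20"))  # large worker at 20 tasks
    print(vcpu_per_task(8, "40,40"))  # XL worker at its default of 40 tasks
    print(vcpu_per_task(8, "20,20"))  # XL worker kept at 20 tasks
```

Keeping 20 tasks per worker on XL doubles the per-task CPU share from 0.2 to 0.4 vCPU, while 10 workers still offer 200 concurrent task slots.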

Set up a new XL environment in Amazon MWAA

You can get started with Amazon MWAA in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
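As a starting point for the API or AWS CLI route, the following sketch builds the arguments for the Amazon MWAA CreateEnvironment operation with the XL class. It does not call AWS, so it runs anywhere. All names, ARNs, subnet IDs, and security group IDs are hypothetical placeholders, and a real environment needs an execution role, S3 bucket, and private subnets that you have already set up.

```python
def build_create_environment_request(
    name: str,
    execution_role_arn: str,
    source_bucket_arn: str,
    subnet_ids: list[str],
    security_group_ids: list[str],
    environment_class: str = "mw1.xlarge",
    max_workers: int = 10,
) -> dict:
    """Keyword arguments for the MWAA CreateEnvironment API."""
    return {
        "Name": name,
        "ExecutionRoleArn": execution_role_arn,
        "SourceBucketArn": source_bucket_arn,
        "DagS3Path": "dags",  # folder inside the bucket that holds DAG files
        "EnvironmentClass": environment_class,
        "MaxWorkers": max_workers,
        "NetworkConfiguration": {
            "SubnetIds": subnet_ids,
            "SecurityGroupIds": security_group_ids,
        },
    }


# Hypothetical identifiers for illustration only.
request = build_create_environment_request(
    name="example-xl-env",
    execution_role_arn="arn:aws:iam::123456789012:role/example-mwaa-role",
    source_bucket_arn="arn:aws:s3:::example-mwaa-bucket",
    subnet_ids=["subnet-aaaa1111", "subnet-bbbb2222"],
    security_group_ids=["sg-cccc3333"],
)
# With boto3: boto3.client("mwaa").create_environment(**request)
```

For 2XL, pass environment_class="mw1.2xlarge"; the same field names appear as properties in infrastructure as code templates.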

Amazon MWAA XL and 2XL environment classes are available today in all Regions where Amazon MWAA is currently available.

Conclusion

Today, we are announcing the availability of two new environment classes in Amazon MWAA. With XL and 2XL environment classes, you can orchestrate larger volumes of complex or resource-intensive workflows. If you are running DAGs with a high number of dependencies, running thousands of DAGs across multiple environments, or are in a scenario that requires heavy use of workers for compute, you can now overcome the related capacity issues by increasing your environment resources in a few straightforward steps.

In this post, we discussed the capabilities of the two new environment classes, including pricing and some common resource constraint problems they solve. We provided guidance and an example of how to observe your existing environments to plan scaling to XL or 2XL, and we described how you can upgrade existing environments to use the increased resources.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the realms of AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions.

Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla.

Introducing Apache Airflow version 2.6.3 support on Amazon MWAA

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-apache-airflow-version-2-6-3-support-on-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud. Trusted across various industries, Amazon MWAA helps organizations like Siemens, ENGIE, and Choice Hotels International enhance and scale their business workflows, while significantly improving security and reducing infrastructure management overhead.

Today, we are announcing the availability of Apache Airflow version 2.6.3 environments. If you’re currently running Apache Airflow version 2.x, you can seamlessly upgrade to v2.6.3 using in-place version upgrades, thereby retaining your workflow run history and environment configurations.

In this post, we delve into some of the new features and capabilities of Apache Airflow v2.6.3 and how you can set up or upgrade your Amazon MWAA environment to accommodate this version as you orchestrate your workflows in the cloud at scale.

New feature: Notifiers

Airflow now gives you an efficient way to create reusable and standardized notifications to handle systemic errors and failures. Notifiers introduce a new object in Airflow, designed to be an extensible layer for adding notifications to DAGs. This framework can send messages to external systems when a task instance or an individual DAG run changes its state. You can build notification logic from a new base object and call it directly from your DAG files. The BaseNotifier is an abstract class that provides a basic structure for sending notifications in Airflow through the various on_*_callback parameters. It is intended for providers to extend and customize for their specific needs.

Using this framework, you can build custom notification logic directly within your DAG files. For instance, notifications can be sent through email, Slack, or Amazon Simple Notification Service (Amazon SNS) based on the state of a DAG (on_failure, on_success, and so on). You can also create your own custom notifier that updates an API or posts a file to your storage system of choice.
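The calling pattern is simple: Airflow invokes the notifier instance from an on_*_callback with the task context, and the notifier's notify method does the sending. To keep the sketch runnable without Airflow installed, it defines MinimalNotifier, a hypothetical stand-in that mimics only that calling shape. The real base class, imported from airflow.notifications.basenotifier, also renders templated fields before notify runs. MessageNotifier and its send callable are illustrative names, and the context here is a simplified dictionary holding only run_id and ds.

```python
from typing import Any, Callable


class MinimalNotifier:
    """Hypothetical stand-in for Airflow's BaseNotifier calling pattern.

    Airflow calls the notifier instance with the context from an
    on_*_callback; the instance delegates the actual sending to notify().
    """

    def __call__(self, context: dict[str, Any]) -> None:
        self.notify(context)

    def notify(self, context: dict[str, Any]) -> None:
        raise NotImplementedError


class MessageNotifier(MinimalNotifier):
    """Hypothetical notifier that formats a message and hands it to a sender."""

    def __init__(self, template: str, send: Callable[[str], None]) -> None:
        self.template = template
        self.send = send  # for example, a wrapper around email, Slack, or Amazon SNS

    def notify(self, context: dict[str, Any]) -> None:
        self.send(self.template.format(**context))
```

In a real DAG, you would subclass BaseNotifier the same way and pass an instance as, for example, on_failure_callback, so one notifier class can be reused across many DAGs.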

For details on how to create and use a notifier, refer to Creating a notifier.

New feature: Managing tasks stuck in a queued state

Apache Airflow v2.6.3 brings a significant improvement to address the long-standing issue of tasks getting stuck in the queued state when using the CeleryExecutor. In a typical Apache Airflow workflow, tasks progress through a lifecycle, moving from the scheduled state to the queued state, and eventually to the running state. However, tasks can occasionally remain in the queued state longer than expected due to communication issues among the scheduler, the executor, and the worker. In Amazon MWAA, customers have experienced such tasks remaining queued for up to 12 hours because of how Amazon MWAA uses the native integration of Amazon Simple Queue Service (Amazon SQS) with CeleryExecutor.

To mitigate this issue, Apache Airflow v2.6.3 introduced a mechanism that checks the Airflow database for tasks that have remained in the queued state beyond a specified timeout, defaulting to 600 seconds. This default can be modified using the environment configuration parameter scheduler.task_queued_timeout. The system then retries such tasks if retries are still available or fails them otherwise, ensuring that your data pipelines continue to function smoothly.

Notably, this update deprecates the previously used celery.stalled_task_timeout and celery.task_adoption_timeout settings, and consolidates their functionalities into a single configuration, scheduler.task_queued_timeout. This enables more effective management of tasks that remain in the queued state. Operators can also configure scheduler.task_queued_timeout_check_interval, which controls how frequently the system checks for tasks that have stayed in the queued state beyond the defined timeout.
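The following sketch is a simplified model of the behavior described above, not Airflow's scheduler code: on each check, any task that has stayed queued longer than scheduler.task_queued_timeout is retried if it has retries left and failed otherwise. QueuedTask and handle_stuck_tasks are hypothetical names, and real Airflow tracks retries through task instance state rather than a retries_left field.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class QueuedTask:
    task_id: str
    queued_at: datetime
    retries_left: int


def handle_stuck_tasks(tasks: list[QueuedTask], now: datetime,
                       task_queued_timeout: float = 600.0) -> dict[str, str]:
    """One check pass: map each over-timeout task to "retry" or "fail"."""
    timeout = timedelta(seconds=task_queued_timeout)
    decisions: dict[str, str] = {}
    for task in tasks:
        if now - task.queued_at <= timeout:
            continue  # still within the allowed queued time
        decisions[task.task_id] = "retry" if task.retries_left > 0 else "fail"
    return decisions
```

In Airflow, scheduler.task_queued_timeout_check_interval controls how often such a pass runs. On Amazon MWAA, you change the timeout itself through the scheduler.task_queued_timeout configuration option rather than in code.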

For details on how to use task_queued_timeout, refer to the official Airflow documentation.

New feature: A new continuous timetable and support for continuous schedule

With prior versions of Airflow, to run a DAG continuously in a loop, you had to use the TriggerDagRunOperator to rerun the DAG after its last task finished. With Apache Airflow v2.6.3, you can now run a DAG continuously with a predefined timetable, which simplifies scheduling for continual DAG runs. The new ContinuousTimetable construct creates one DAG run at a time, respecting start_date and end_date, and starts each new run as soon as the previous run completes, regardless of whether the previous run succeeded or failed. A continuous timetable is especially useful when sensors wait for highly irregular events from external data tools.

Use the max_active_runs parameter to ensure that only one DAG run is active at any given time:

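from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(
    start_date=datetime(2023, 5, 9),
    schedule="@continuous",  # start the next run as soon as the previous run ends
    max_active_runs=1,  # allow only one active run at a time
    catchup=False,
)
def continuous_example():  # hypothetical DAG name
    EmptyOperator(task_id="placeholder")  # replace with your sensors and tasks
continuous_example()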

New feature: Trigger the DAG UI extension with flexible user form concept

Prior to Apache Airflow v2.6.3, you could provide parameters in a JSON structure through the Airflow UI for custom workflow runs. You had to model, check, and understand the JSON and enter parameters manually, without the option to validate them before triggering a workflow. With Apache Airflow v2.6.3, when you choose Trigger DAG w/ config, a trigger UI form is rendered based on the predefined DAG Params. This simplifies parameter entry for ad hoc, testing, or custom runs. If the DAG has no parameters defined, a JSON entry mask is shown. You define form elements with the Param class, and its attributes determine how each form field is displayed.

For an example DAG, the following form is generated from its DAG Params.

Set up a new Apache Airflow v2.6.3 environment

You can set up a new Apache Airflow v2.6.3 environment in your account and preferred Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

When you have successfully created an Apache Airflow v2.6.3 environment in Amazon MWAA, the following packages are automatically installed on the scheduler and worker nodes along with other provider packages:

apache-airflow-providers-amazon==8.2.0

python==3.10.8

For a complete list of provider packages installed, refer to Apache Airflow provider packages installed on Amazon MWAA environments.

Upgrade from older versions of Apache Airflow to Apache Airflow v2.6.3

You can perform in-place version upgrades of your existing Amazon MWAA environments to update your older Apache Airflow v2.x-based environments to v2.6.3. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
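The in-place path described above starts from an Apache Airflow v2.x environment. The following sketch checks that rule and builds the arguments for the Amazon MWAA UpdateEnvironment operation, whose AirflowVersion parameter requests the new version. The function names and the environment name are hypothetical, and the check is a simplification: consult the Amazon MWAA documentation for the exact list of supported upgrade paths.

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Turn "2.5.1" into (2, 5, 1) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def build_version_upgrade(name: str, current: str, target: str = "2.6.3") -> dict:
    """Validate an in-place upgrade path and build UpdateEnvironment kwargs."""
    cur, tgt = parse_version(current), parse_version(target)
    if cur[0] != 2:
        raise ValueError("in-place upgrades described here start from Airflow v2.x")
    if cur >= tgt:
        raise ValueError(f"{current} is not older than {target}")
    return {"Name": name, "AirflowVersion": target}


# With boto3: boto3.client("mwaa").update_environment(
#     **build_version_upgrade("example-env", "2.5.1"))
```

Comparing tuples of integers avoids the string-comparison trap where "2.10" would sort before "2.6".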

Conclusion

In this post, we talked about some of the new features of Apache Airflow v2.6.3 and how you can get started using them in Amazon MWAA. Try out these new features like notifiers and continuous timetables, and other enhancements to improve your data orchestration pipelines.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working in the Financial Services Industry since 2018. He specializes in application modernization and supports his customers in the adoption of cloud operating models and serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Shubham Mehta is an experienced product manager with over eight years of experience and a proven track record of delivering successful products. In his current role as a Senior Product Manager at AWS, he oversees Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and spearheads the Apache Airflow open-source contributions to further enhance the product’s functionality.