Customers who use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) often need Python dependencies that are hosted in private code repositories. Many customers opt for public network access mode for its ease of use and ability to make outbound internet requests, all while maintaining secure access. However, private code repositories may not be accessible via the internet. It’s also a best practice to only install Python dependencies where they are needed. You can use Amazon MWAA startup scripts to selectively install Python dependencies required for running code on workers, while avoiding issues due to web server restrictions.
This post demonstrates a method to selectively install Python dependencies based on the Amazon MWAA component type (web server, scheduler, or worker) from a Git repository only accessible from your virtual private cloud (VPC).
Solution overview
This solution focuses on using a private Git repository to selectively install Python dependencies, although you can use the same pattern demonstrated in this post with private Python package indexes such as AWS CodeArtifact. For more information, refer to Amazon MWAA with AWS CodeArtifact for Python dependencies.
The Amazon MWAA architecture allows you to choose a web server access mode to control whether the web server is accessible from the internet or only from your VPC. You can also control whether your workers, scheduler, and web servers have access to the internet through your customer VPC configuration. In this post, we demonstrate an environment such as the one shown in the following diagram, where the environment is using public network access mode for the web servers, and the Apache Airflow workers and schedulers don’t have a route to the internet from your VPC.
There are up to four potential networking configurations for an Amazon MWAA environment:
Public routing and public web server access mode
Private routing and public web server access mode (pictured in the preceding diagram)
Public routing and private web server access mode
Private routing and private web server access mode
We focus on one networking configuration for this post, but the fundamental concepts are applicable for any networking configuration.
The solution we walk through relies on the fact that Amazon MWAA runs a startup script (startup.sh) during startup on every individual Apache Airflow component (worker, scheduler, and web server) before installing requirements (requirements.txt) and initializing the Apache Airflow process. This startup script is used to set an environment variable, which is then referenced in the requirements.txt file to selectively install libraries.
The following steps allow us to accomplish this:
Create and install the startup script (startup.sh) in the Amazon MWAA environment. This script sets the environment variable for selectively installing dependencies.
Create and install global Python dependencies (requirements.txt) in the Amazon MWAA environment. This file contains the global dependencies required by all Amazon MWAA components.
Create and install component-specific Python dependencies in the Amazon MWAA environment. This step involves creating separate requirements files for each component type (worker, scheduler, web server) to selectively install the necessary dependencies.
Prerequisites
For this walkthrough, you should have the following prerequisites:
Create and install the startup script in the Amazon MWAA environment
Create the startup.sh file using the following example code:
#!/bin/sh

echo "Printing Apache Airflow component"
echo $MWAA_AIRFLOW_COMPONENT

if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]]
then
    sudo yum -y install libaio
fi

if [[ "${MWAA_AIRFLOW_COMPONENT}" == "webserver" ]]
then
    echo "Setting extended python requirements for webservers"
    export EXTENDED_REQUIREMENTS="webserver_reqs.txt"
fi

if [[ "${MWAA_AIRFLOW_COMPONENT}" == "worker" ]]
then
    echo "Setting extended python requirements for workers"
    export EXTENDED_REQUIREMENTS="worker_reqs.txt"
fi

if [[ "${MWAA_AIRFLOW_COMPONENT}" == "scheduler" ]]
then
    echo "Setting extended python requirements for schedulers"
    export EXTENDED_REQUIREMENTS="scheduler_reqs.txt"
fi
Upload startup.sh to the S3 bucket for your Amazon MWAA environment:
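The following commands are a minimal sketch of this step; the bucket and environment names are placeholders, and the update-environment call assumes your AWS CLI version supports the startup script option.

# Upload the script to the environment's S3 bucket (names are placeholders)
aws s3 cp startup.sh s3://your-mwaa-bucket/startup.sh

# Point the environment at the script; Amazon MWAA applies it on the next environment update
aws mwaa update-environment \
    --name your-mwaa-environment \
    --startup-script-s3-path startup.sh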
Browse the CloudWatch log streams for your workers and view the worker_console log. Notice the startup script is now running and setting the environment variable.
Create and install global Python dependencies in the Amazon MWAA environment
Your requirements file must include a --constraint statement to make sure the packages listed in your requirements are compatible with the version of Apache Airflow you are using. The statement beginning with -r references the environment variable you set in your startup.sh script based on the component type.
The following code is an example of the requirements.txt file:
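The following is a minimal sketch; the constraints URL (here, Apache Airflow 2.8.1 constraints for Python 3.11) and the dags folder path are assumptions to adjust for your environment and for wherever you store the component-specific files. pip expands ${EXTENDED_REQUIREMENTS} using the environment variable exported by startup.sh.

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
# Component-specific requirements file, selected by the variable set in startup.sh (path is an assumption)
-r /usr/local/airflow/dags/${EXTENDED_REQUIREMENTS}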
Create and install component-specific Python dependencies in the Amazon MWAA environment
For this example, we want to install the Python package scrapy on workers and schedulers from our private Git repository, and install pprintpp on the web server from the public Python Package Index (PyPI). To accomplish that, we need to create the following files (example code follows):
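The following sketch shows the three component-specific files; the private Git host is hypothetical, and version pins are omitted for brevity.

# worker_reqs.txt and scheduler_reqs.txt: install scrapy from the private Git repository (hypothetical host)
scrapy @ git+https://git.example.internal/mirrors/scrapy.git

# webserver_reqs.txt: install pprintpp from the public Python Package Index
pprintpp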
Browse the CloudWatch log streams for your workers and view the requirements_install log. Notice that the component-specific requirements referenced by the environment variable are now being installed.
Conclusion
In this post, we demonstrated a method to selectively install Python dependencies based on the Amazon MWAA component type (web server, scheduler, or worker) from a Git repository only accessible from your VPC.
We hope this post provided you with a better understanding of how startup scripts and Python dependency management work in an Amazon MWAA environment. You can implement other variations and configurations using the concepts outlined in this post, depending on your specific network setup and requirements.
About the Author
Tim Wilhoit is a Sr. Solutions Architect for the Department of Defense at AWS. Tim has over 20 years of enterprise IT experience. His areas of interest are serverless computing and ML/AI. In his spare time, Tim enjoys spending time at the lake and rooting on the Oklahoma State Cowboys. Go Pokes!
In this blog post, we will highlight how ZS Associates used multiple AWS services to build a highly scalable, highly performant, clinical document search platform. This platform is an advanced information retrieval system engineered to assist healthcare professionals and researchers in navigating vast repositories of medical documents, medical literature, research articles, clinical guidelines, protocol documents, activity logs, and more. The goal of this search platform is to locate specific information efficiently and accurately to support clinical decision-making, research, and other healthcare-related activities by combining queries across all the different types of clinical documentation.
ZS is a management consulting and technology firm focused on transforming global healthcare. We use leading-edge analytics, data, and science to help clients make intelligent decisions. We serve clients in a wide range of industries, including pharmaceuticals, healthcare, technology, financial services, and consumer goods. We developed and host several applications for our customers on Amazon Web Services (AWS). ZS is also an AWS Advanced Consulting Partner as well as an Amazon Redshift Service Delivery Partner. As it relates to the use case in the post, ZS is a global leader in integrated evidence and strategy planning (IESP), a set of services that help pharmaceutical companies to deliver a complete and differentiated evidence package for new medicines.
ZS uses several AWS service offerings across a variety of their products, client solutions, and services. AWS services such as Amazon Neptune and Amazon OpenSearch Service form part of their data and analytics pipelines, and AWS Batch is used for long-running data and machine learning (ML) processing tasks.
Clinical data is highly connected in nature, so ZS used Neptune, a fully managed, high-performance graph database service built for the cloud, to capture the ontologies and taxonomies associated with the data that form the supporting knowledge graph. For our search requirements, we used OpenSearch Service, an open source, distributed search and analytics suite.
About the clinical document search platform
Clinical documents comprise a wide variety of digital records, including:
Study protocols
Evidence gaps
Clinical activities
Publications
Within global biopharmaceutical companies, there are several key personas who are responsible for generating evidence for new medicines. This evidence supports treatment decisions by payers, health technology assessments (HTAs), physicians, and patients. Evidence generation is rife with knowledge management challenges. Over the life of a pharmaceutical asset, hundreds of studies and analyses are completed, and it becomes challenging to maintain a good record of all the evidence to address incoming questions from external healthcare stakeholders such as payers, providers, physicians, and patients. Furthermore, almost none of the information associated with evidence generation activities (such as health economics and outcomes research (HEOR), real-world evidence (RWE), collaboration studies, and investigator sponsored research (ISR)) exists as structured data; instead, the richness of the evidence activities exists in protocol documents (study design) and study reports (outcomes). Therein lies the irony—teams who are in the business of knowledge generation struggle with knowledge management.
ZS unlocked new value from unstructured data for evidence generation leads by applying large language models (LLMs) and generative artificial intelligence (AI) to power advanced semantic search on evidence protocols. Now, evidence generation leads (medical affairs, HEOR, and RWE) can have a natural-language, conversational exchange and return a list of evidence activities with high relevance considering both structured data and the details of the studies from unstructured sources.
Overview of solution
The solution was designed in layers. The document processing layer supports document ingestion and orchestration. The semantic search platform (application) layer supports backend search and the user interface. Multiple different types of data sources, including media, documents, and external taxonomies, were identified as relevant for capture and processing within the semantic search platform.
Document processing solution framework layer
All components and sub-layers are orchestrated using Amazon Managed Workflows for Apache Airflow. The pipeline in Airflow is scaled automatically based on the workload using Batch. We can broadly divide layers here as shown in the following figure:
Document Processing Solution Framework Layers
Data crawling:
In the data crawling layer, documents are retrieved from a specified source SharePoint location and deposited into a designated Amazon Simple Storage Service (Amazon S3) bucket. These documents can be in a variety of formats, such as PDF, Microsoft Word, and Excel, and are processed using format-specific adapters.
Data ingestion:
The data ingestion layer is the first step of the proposed framework. At this layer, data from a variety of sources enters the system’s processing pipeline. The ingestion process follows a structured sequence of steps.
These steps include creating a unique run ID each time a pipeline is run, managing natural language processing (NLP) model versions in the versioning table, identifying document formats, and ensuring the health of NLP model services with a service health check.
The process then proceeds with the transfer of data from the input layer to the landing layer, creation of dynamic batches, and continuous tracking of document processing status throughout the run. If any issues occur, a failsafe mechanism halts the process; otherwise, the documents move on to the NLP phase of the framework.
Database ingestion:
The reporting layer processes the JSON data from the feature extraction layer and converts it into CSV files. Each CSV file contains specific information extracted from dedicated sections of documents. Subsequently, the pipeline generates a triple file using the data from these CSV files, where each set of entities signifies relationships in a subject-predicate-object format. This triple file is intended for ingestion into Neptune and OpenSearch Service. In the full document embedding module, the document content is segmented into chunks, which are then transformed into embeddings using LLMs such as llama-2 and BGE. These embeddings, along with metadata such as the document ID and page number, are stored in OpenSearch Service. We use various chunking strategies to enhance text comprehension. Semantic chunking divides text into sentences, grouping them into sets, and merges similar ones based on embeddings.
Agentic chunking uses LLMs to determine context-driven chunk sizes, focusing on proposition-based division and simplifying complex sentences. Additionally, context and document aware chunking adapts chunking logic to the nature of the content for more effective processing.
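To make the triple-file step concrete, the following is a minimal sketch; the CSV columns, file names, and namespace are hypothetical, and the real pipeline derives its predicates from the curated taxonomy.

import csv

NS = "http://example.org/clinical#"  # hypothetical namespace for the curated taxonomy

# Convert each CSV row into subject-predicate-object triples in N-Triples format
with open("study_sections.csv", newline="") as f_in, open("triples.nt", "w") as f_out:
    for row in csv.DictReader(f_in):
        subject = f"<{NS}document/{row['document_id']}>"
        for column, predicate in [("study_type", "hasStudyType"), ("indication", "hasIndication")]:
            f_out.write(f'{subject} <{NS}{predicate}> "{row[column]}" .\n')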
NLP:
The NLP layer serves as a crucial component in extracting specific sections or entities from documents. The feature extraction stage proceeds with localization, where sections are identified within the document to narrow down the search space for further tasks like entity extraction. LLMs are used to summarize the text extracted from document sections, enhancing the efficiency of this process. Following localization, the feature extraction step involves extracting features from the identified sections using various procedures. These procedures, prioritized based on their relevance, use models like Llama-2-7b, mistral-7b, Flan-t5-xl, and Flan-T5-xxl to extract important features and entities from the document text.
The auto-mapping phase ensures consistency by mapping extracted features to standard terms present in the ontology. This is achieved through matching the embeddings of extracted features with those stored in the OpenSearch Service index. Finally, in the Document Layout Cohesion step, the output from the auto-mapping phase is adjusted to aggregate entities at the document level, providing a cohesive representation of the document’s content.
Semantic search platform application layer
This layer, shown in the following figure, uses Neptune as the graph database and OpenSearch Service as the vector engine.
Semantic search platform application layer
Amazon OpenSearch Service:
OpenSearch Service served the dual purpose of facilitating full-text search and embedding-based semantic search. The OpenSearch Service vector engine capability helped to drive Retrieval-Augmented Generation (RAG) workflows using LLMs. This helped to provide a summarized output for search after the retrieval of a relevant document for the input query. The method used for indexing embeddings was FAISS.
OpenSearch Service domain details:
Version of OpenSearch Service: 2.9
Number of data nodes: 1
Data node instance type: r6g.2xlarge.search
Volume size: 500 GB (gp3)
Number of Availability Zones: 1
Dedicated master nodes: Enabled
Number of Availability Zones (master nodes): 3
Number of master nodes: 3
Master node instance type: r6g.large.search
To determine the nearest neighbor, we employ the Hierarchical Navigable Small World (HNSW) algorithm. We used the FAISS approximate k-NN library for indexing and searching and the Euclidean distance (L2 norm) for distance calculation between two vectors.
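A minimal sketch of such an index definition using the opensearch-py client follows. The index name, field names, endpoint, and the 768 embedding dimension (typical of a BGE base model) are assumptions; adjust them to your domain and embedding model.

from opensearchpy import OpenSearch

# Connect to the OpenSearch Service domain (endpoint is a placeholder)
client = OpenSearch(hosts=[{"host": "your-domain-endpoint", "port": 443}], use_ssl=True)

index_body = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "chunk_text": {"type": "text"},      # full-text search field
            "document_id": {"type": "keyword"},  # metadata stored with each chunk
            "page_number": {"type": "integer"},
            "chunk_embedding": {
                "type": "knn_vector",
                "dimension": 768,            # assumed embedding size (BGE base)
                "method": {
                    "name": "hnsw",          # HNSW graph for approximate k-NN
                    "engine": "faiss",       # FAISS engine
                    "space_type": "l2",      # Euclidean distance (L2 norm)
                },
            },
        }
    },
}

client.indices.create(index="clinical-doc-chunks", body=index_body)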
Amazon Neptune:
Neptune enables full-text search (FTS) through integration with OpenSearch Service. The native streaming integration provided by AWS was set up to replicate data from Neptune to OpenSearch Service to enable FTS. Based on the business use case for search, a graph model was defined. Considering the graph model, subject matter experts from the ZS domain team curated a custom taxonomy capturing the hierarchical flow of classes and sub-classes pertaining to clinical data. Open source taxonomies and ontologies were also identified to be part of the knowledge graph. Sections and entities were identified to be extracted from clinical documents. An unstructured document processing pipeline developed by ZS processed the documents in parallel and populated triples in RDF format from documents for Neptune ingestion.
The triples are created in such a way that semantically similar concepts are linked—hence creating a semantic layer for search. After the triples files are created, they’re stored in an S3 bucket. Using the Neptune Bulk Loader, we were able to load millions of triples to the graph.
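As an illustration, a bulk load job can be submitted to the Neptune loader endpoint with a request like the following sketch; the endpoint, bucket, account ID, IAM role, and Region are placeholders.

# Submit a bulk load of N-Triples files from S3 to Neptune (all values are placeholders)
curl -X POST \
    -H 'Content-Type: application/json' \
    https://your-neptune-endpoint:8182/loader -d '
    {
      "source": "s3://your-bucket/triples/",
      "format": "ntriples",
      "iamRoleArn": "arn:aws:iam::111122223333:role/NeptuneLoadFromS3",
      "region": "us-east-1",
      "failOnError": "FALSE",
      "parallelism": "MEDIUM"
    }'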
Neptune ingests both structured and unstructured data, simplifying the process to retrieve content across different sources and formats. At this point, we were able to discover previously unknown relationships between the structured and unstructured data, which was then made available to the search platform. We used SPARQL query federation to return results from the enriched knowledge graph in the Neptune graph database and integrated with OpenSearch Service.
Neptune was able to automatically scale storage and compute resources to accommodate growing datasets and concurrent API calls. Presently, the application sustains approximately 3,000 daily active users. Concurrently, there is an observation of approximately 30–50 users initiating queries simultaneously within the application environment. The Neptune graph accommodates a substantial repository of approximately 4.87 million triples. The triples count is increasing because of our daily and weekly ingestion pipeline routines.
Neptune configuration:
Instance Class: db.r5d.4xlarge
Engine version: 1.2.0.1
LLMs:
Large language models (LLMs) like Llama-2, Mistral, and Zephyr are used for extraction of sections and entities. Models like Flan-T5 were also used for extraction of other, similar entities used in the procedures. These selected segments and entities are crucial for domain-specific searches and therefore receive higher priority in the learning-to-rank algorithm used for search.
Additionally, LLMs are used to generate a comprehensive summary of the top search results.
The LLMs are hosted on Amazon Elastic Kubernetes Service (Amazon EKS) with GPU-enabled node groups to ensure rapid inference processing. We’re using different models for different use cases. For example, to generate embeddings we deployed a BGE base model, while Mistral, Llama2, Zephyr, and others are used to extract specific medical entities, perform part extraction, and summarize search results. By using different LLMs for distinct tasks, we aim to enhance accuracy within narrow domains, thereby improving the overall relevance of the system.
Fine-tuning:
Models already fine-tuned on pharma-specific documents were used:
PharMolix/BioMedGPT-LM-7B (finetuned LLAMA-2 on medical)
emilyalsentzer/Bio_ClinicalBERT
stanford-crfm/BioMedLM
microsoft/biogpt
Re-ranker, sorter, and filter stage:
Pre-process the query by removing stop words and special characters from the user input to ensure a clean query.
After pre-processing the query, create combinations of search terms by forming combinations of terms with varying n-grams. This step enriches the search scope and improves the chances of finding relevant results. For instance, if the input query is “machine learning algorithms,” generating n-grams could produce terms like “machine learning,” “learning algorithms,” and “machine learning algorithms.”
Run the search terms simultaneously using the search API to access both the Neptune graph and the OpenSearch Service indexes. This hybrid approach broadens the search coverage, tapping into the strengths of both data sources.
Assign a specific weight to each result obtained from the data sources based on the domain’s specifications. This weight reflects the relevance and significance of the result within the context of the search query and the underlying domain. For example, a result from the Neptune graph might be weighted higher if the query pertains to graph-related concepts, that is, the search term relates directly to the subject or object of a triple, whereas a result from OpenSearch Service might be given more weight if it aligns closely with text-based information.
Documents that appear in both the Neptune graph and OpenSearch Service receive the highest priority, because they likely offer comprehensive insights. Next in priority are documents exclusively sourced from the Neptune graph, followed by those solely from OpenSearch Service. This hierarchical arrangement ensures that the most relevant and comprehensive results are presented first.
After factoring in these considerations, a final score is calculated for each result. Sorting the results based on their final scores ensures that the most relevant information is presented in the top n results.
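The following is a minimal sketch of this merge-and-rank logic; the weights, boost factor, and the shape of the inputs (lists of document ID and source-specific score pairs) are assumptions for illustration.

GRAPH_WEIGHT = 0.6        # assumed weight for Neptune (knowledge graph) matches
TEXT_WEIGHT = 0.4         # assumed weight for OpenSearch (full-text/embedding) matches
BOTH_SOURCES_BOOST = 1.5  # documents found in both sources rank highest


def rank_results(graph_hits, text_hits, top_n=10):
    """Merge hits from both sources, weight them, boost overlaps, and sort."""
    scores = {}
    for doc_id, score in graph_hits:
        scores[doc_id] = scores.get(doc_id, 0.0) + GRAPH_WEIGHT * score
    for doc_id, score in text_hits:
        scores[doc_id] = scores.get(doc_id, 0.0) + TEXT_WEIGHT * score

    # Boost documents that appear in both result sets
    overlap = {d for d, _ in graph_hits} & {d for d, _ in text_hits}
    for doc_id in overlap:
        scores[doc_id] *= BOTH_SOURCES_BOOST

    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]


print(rank_results([("doc-1", 0.9), ("doc-2", 0.4)], [("doc-1", 0.7), ("doc-3", 0.8)]))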
Final UI
An evidence catalog is aggregated from disparate systems. It provides a comprehensive repository of completed, ongoing, and planned evidence generation activities. As evidence leads make forward-looking plans, the existing internal base of evidence is made readily available to inform decision-making.
The following video is a demonstration of an evidence catalog:
Customer impact
When completed, the solution provided the following customer benefits:
The search across multiple data sources (structured and unstructured documents) enables visibility of complex hidden relationships and insights.
Clinical documents often contain a mix of structured and unstructured data. Neptune can store structured information in a graph format, while the vector database can handle unstructured data using embeddings. This integration provides a comprehensive approach to querying and analyzing diverse clinical information.
By building a knowledge graph using Neptune, you can enrich the clinical data with additional contextual information. This can include relationships between diseases, treatments, medications, and patient records, providing a more holistic view of healthcare data.
The search application helped in staying informed about the latest research, clinical developments, and competitive landscape.
This has enabled customers to make timely decisions, identify market trends, and position products based on a comprehensive understanding of the industry.
The application helped in monitoring adverse events, tracking safety signals, and ensuring that drug-related information is easily accessible and understandable, thereby supporting pharmacovigilance efforts.
The search application is currently running in production with 3,000 active users.
Customer success criteria
The following success criteria were used to evaluate the solution:
Quick, high accuracy search results: The top three search results were 99% accurate with an overall latency of less than 3 seconds for users.
Identified, extracted portions of the protocol: The identified sections had a precision of 0.98 and a recall of 0.87.
Accurate and relevant search results based on simple human language that answer the user’s question.
Clear UI and transparency on which portions of the aligned documents (protocol, clinical study reports, and publications) matched the text extraction.
Knowing what evidence is completed or in-process reduces redundancy in newly proposed evidence activities.
Challenges faced and learnings
We faced two main challenges in developing and deploying this solution.
Large data volume
The unstructured documents needed to be embedded in their entirety, and OpenSearch Service helped us achieve this with the right configuration. This involved deploying OpenSearch Service with dedicated master nodes and allocating sufficient storage capacity for embedding and storing the unstructured document embeddings in full. We stored up to 100 GB of embeddings in OpenSearch Service.
Inference time reduction
In the search application, it was vital that the search results were retrieved with the lowest possible latency. With the hybrid graph and embedding search, this was challenging.
We addressed high latency issues by using an interconnected framework of graphs and embeddings. Each search method complemented the other, leading to optimal results. Our streamlined search approach ensures efficient queries of both the graph and the embeddings, eliminating any inefficiencies. The graph model was designed to minimize the number of hops required to navigate from one entity to another, and we improved its performance by avoiding the storage of bulky metadata. Any metadata too large for the graph was stored in OpenSearch, which served as our metadata store for graph and vector store for embeddings. Embeddings were generated using context-aware chunking of content to reduce the total embedding count and retrieval time, resulting in efficient querying with minimal inference time.
The Horizontal Pod Autoscaler (HPA) provided by Amazon EKS intelligently adjusts pod resources based on user demand or query loads, optimizing resource utilization and maintaining application performance during peak usage periods.
Conclusion
In this post, we described how to build an advanced information retrieval system designed to assist healthcare professionals and researchers in navigating through a diverse range of medical documents, including study protocols, evidence gaps, clinical activities, and publications. By using Amazon OpenSearch Service as a distributed search and vector database and Amazon Neptune as a knowledge graph, ZS was able to remove the undifferentiated heavy lifting associated with building and maintaining such a complex platform.
If you’re facing similar challenges in managing and searching through vast repositories of medical data, consider exploring the powerful capabilities of OpenSearch Service and Neptune. These services can help you unlock new insights and enhance your organization’s knowledge management capabilities.
About the authors
Abhishek Pan is a Sr. Specialist SA-Data working with AWS India Public sector customers. He engages with customers to define data-driven strategy, provide deep dive sessions on analytics use cases, and design scalable and performant analytical applications. He has 12 years of experience and is passionate about databases, analytics, and AI/ML. He is an avid traveler and tries to capture the world through his lens.
Gourang Harhare is a Senior Solutions Architect at AWS based in Pune, India. With a robust background in large-scale design and implementation of enterprise systems, application modernization, and cloud native architectures, he specializes in AI/ML, serverless, and container technologies. He enjoys solving complex problems and helping customers be successful on AWS. In his free time, he likes to play table tennis, go trekking, and read books.
Kevin Phillips is a Neptune Specialist Solutions Architect working in the UK. He has 20 years of development and solutions architectural experience, which he uses to help support and guide customers. He has been enthusiastic about evangelizing graph databases since joining the Amazon Neptune team, and is happy to talk graph with anyone who will listen.
Sandeep Varma is a principal in ZS’s Pune, India, office with over 25 years of technology consulting experience, which includes architecting and delivering innovative solutions for complex business problems leveraging AI and technology. Sandeep has been critical in driving various large-scale programs at ZS Associates. He was a founding member of the Big Data Analytics Centre of Excellence in ZS and currently leads the Enterprise Service Center of Excellence. Sandeep is a thought leader and has served as chief architect of multiple large-scale enterprise big data platforms. He specializes in rapidly building high-performance teams focused on cutting-edge technologies and high-quality delivery.
Alex Turok has over 16 years of consulting experience focused on global and US biopharmaceutical companies. Alex’s expertise is in solving ambiguous, unstructured problems for commercial and medical leadership. For his clients, he seeks to drive lasting organizational change by defining the problem, identifying the strategic options, informing a decision, and outlining the transformation journey. He has worked extensively in portfolio and brand strategy, pipeline and launch strategy, integrated evidence strategy and planning, organizational design, and customer capabilities. Since joining ZS, Alex has worked across marketing, sales, medical, access, and patient services and has touched over twenty therapeutic categories, with depth in oncology, hematology, immunology and specialty therapeutics.
This post is co-written with Hemant Aggarwal and Naveen Kambhoji from Kaplan.
Kaplan, Inc. provides individuals, educational institutions, and businesses with a broad array of services, supporting our students and partners to meet their diverse and evolving needs throughout their educational and professional journeys. Our Kaplan culture empowers people to achieve their goals. Committed to fostering a learning culture, Kaplan is changing the face of education.
Kaplan data engineers empower data analytics using Amazon Redshift and Tableau. The infrastructure provides an analytics experience to hundreds of in-house analysts, data scientists, and student-facing frontend specialists. The data engineering team is on a mission to modernize its data integration platform to be agile, adaptive, and straightforward to use. To achieve this, they chose the AWS Cloud and its services. There are various types of pipelines that need to be migrated from the existing integration platform to the AWS Cloud, and the pipelines have different types of sources like Oracle, Microsoft SQL Server, MongoDB, Amazon DocumentDB (with MongoDB compatibility), APIs, software as a service (SaaS) applications, and Google Sheets. In terms of scale, at the time of writing over 250 objects are being pulled from three different Salesforce instances.
In this post, we discuss how the Kaplan data engineering team implemented data integration from the Salesforce application to Amazon Redshift. The solution uses Amazon Simple Storage Service (Amazon S3) as a data lake, Amazon Redshift as a data warehouse, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) as an orchestrator, and Tableau as the presentation layer.
Solution overview
The high-level data flow starts with the source data stored in Amazon S3 and then integrated into Amazon Redshift using various AWS services. The following diagram illustrates this architecture.
Amazon MWAA is our main tool for data pipeline orchestration and is integrated with other tools for data migration. While searching for a tool to migrate data from a SaaS application like Salesforce to Amazon Redshift, we came across Amazon AppFlow. After some research, we found Amazon AppFlow to be well-suited for our requirement to pull data from Salesforce. Amazon AppFlow provides the ability to directly migrate data from Salesforce to Amazon Redshift. However, in our architecture, we chose to separate the data ingestion and storage processes for the following reasons:
We needed to store data in Amazon S3 (data lake) as an archive and a centralized location for our data infrastructure.
From a future perspective, there might be scenarios where we need to transform the data before storing it in Amazon Redshift. By storing the data in Amazon S3 as an intermediate step, we can integrate transformation logic as a separate module without impacting the overall data flow significantly.
Apache Airflow is the central point in our data infrastructure, and other pipelines are being built using various tools like AWS Glue. Amazon AppFlow is one part of our overall infrastructure, and we wanted to maintain a consistent approach across different data sources and targets.
To accommodate these requirements, we divided the pipeline into two parts:
Migrate data from Salesforce to Amazon S3 using Amazon AppFlow
Load data from Amazon S3 to Amazon Redshift using Amazon MWAA
This approach allows us to take advantage of the strengths of each service while maintaining flexibility and scalability in our data infrastructure. Amazon AppFlow can handle the first part of the pipeline without the need for any other tool, because Amazon AppFlow provides functionalities like creating a connection to source and target, scheduling the data flow, and creating filters, and we can choose the type of flow (incremental and full load). With this, we were able to migrate the data from Salesforce to an S3 bucket. Afterwards, we created a DAG in Amazon MWAA that runs an Amazon Redshift COPY command on the data stored in Amazon S3 and moves the data into Amazon Redshift.
We faced the following challenges with this approach:
To load data incrementally, we had to manually change the filter dates in the Amazon AppFlow flows, which isn’t elegant. We wanted to automate that date filter change.
The two parts of the pipeline were not in sync, because there was no way to know whether the first part of the pipeline was complete so that the second part could start. We wanted to automate these steps as well.
Implementing the solution
To automate and resolve the aforementioned challenges, we used Amazon MWAA. We created a DAG that acts as the control center for Amazon AppFlow. We developed an Airflow operator that can perform various Amazon AppFlow functions using Amazon AppFlow APIs, such as creating, updating, deleting, and starting flows, and this operator is used in the DAG. Amazon AppFlow stores the connection data in an AWS Secrets Manager managed secret with the prefix appflow. The cost of storing the secret is included with the charge for Amazon AppFlow. With this, we were able to run the complete data flow using a single DAG.
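The following is a minimal sketch of such an operator (not Kaplan’s actual implementation), using the boto3 Amazon AppFlow client to start a flow and poll its execution records until it finishes; the class name and parameters are illustrative.

import time

import boto3
from airflow.models.baseoperator import BaseOperator


class AppFlowStartAndWaitOperator(BaseOperator):
    """Hypothetical operator: starts an Amazon AppFlow flow and waits for it to finish."""

    def __init__(self, flow_name: str, poll_interval: int = 60, **kwargs):
        super().__init__(**kwargs)
        self.flow_name = flow_name
        self.poll_interval = poll_interval

    def execute(self, context):
        client = boto3.client("appflow")
        execution_id = client.start_flow(flowName=self.flow_name)["executionId"]
        while True:
            records = client.describe_flow_execution_records(flowName=self.flow_name)["flowExecutions"]
            record = next(r for r in records if r["executionId"] == execution_id)
            status = record["executionStatus"]
            if status == "Successful":
                return execution_id  # downstream tasks can now run the Redshift COPY
            if status in ("Error", "CancelStarted", "Canceled"):
                raise RuntimeError(f"AppFlow run {execution_id} ended with status {status}")
            time.sleep(self.poll_interval)  # flow still in progress; keep polling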
The complete data flow consists of the following steps:
Create the flow in the Amazon AppFlow using a DAG.
Update the flow with the new filter dates using the DAG.
After updating the flow, the DAG starts the flow.
The DAG waits for the flow to complete by repeatedly checking the flow’s status.
A success status indicates that the data has been migrated from Salesforce to Amazon S3.
After the data flow is complete, the DAG calls the COPY command to copy data from Amazon S3 to Amazon Redshift.
This approach helped us resolve the aforementioned issues, and the data pipelines have become more robust, simple to understand, straightforward to use with no manual intervention, and less prone to error because we are controlling everything from a single point (Amazon MWAA). Amazon AppFlow, Amazon S3, and Amazon Redshift are all configured to use encryption to protect the data. We also performed logging and monitoring, and implemented auditing mechanisms to track the data flow and access using AWS CloudTrail and Amazon CloudWatch. The following figure shows a high-level diagram of the final approach we took.
Conclusion
In this post, we shared how Kaplan’s data engineering team successfully implemented a robust and automated data integration pipeline from Salesforce to Amazon Redshift, using AWS services like Amazon AppFlow, Amazon S3, Amazon Redshift, and Amazon MWAA. By creating a custom Airflow operator to control Amazon AppFlow functionalities, we orchestrated the entire data flow seamlessly within a single DAG. This approach has not only resolved the challenges of incremental data loading and synchronization between different pipeline stages, but has also made the data pipelines more resilient, straightforward to maintain, and less error-prone. We reduced the time for creating a pipeline for a new object from an existing instance and a new pipeline for a new source by 50%. This also helped remove the complexity of using a delta column to get the incremental data, which also helped reduce the cost per table by 80–90% compared to a full load of objects every time.
With this modern data integration platform in place, Kaplan is well-positioned to provide its analysts, data scientists, and student-facing teams with timely and reliable data, empowering them to drive informed decisions and foster a culture of learning and growth.
Try out Airflow with Amazon MWAA and other enhancements to improve your data orchestration pipelines.
Hemant Aggarwal is a senior Data Engineer at Kaplan India Pvt Ltd, helping in developing and managing ETL pipelines leveraging AWS and process/strategy development for the team.
Naveen Kambhoji is a Senior Manager at Kaplan Inc. He works with Data Engineers at Kaplan for building data lakes using AWS Services. He is the facilitator for the entire migration process. His passion is building scalable distributed systems for efficiently managing data on the cloud. Outside of work, he enjoys traveling with his family and exploring new places.
Jimy Matthews is an AWS Solutions Architect, with expertise in AI/ML tech. Jimy is based out of Boston and works with enterprise customers as they transform their business by adopting the cloud, and helps them build efficient and sustainable solutions. He is passionate about his family, cars, and mixed martial arts.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that allows you to orchestrate data pipelines and workflows at scale. With Amazon MWAA, you can design Directed Acyclic Graphs (DAGs) that describe your workflows without managing the operational burden of scaling the infrastructure. In this post, we provide guidance on how you can optimize performance and save cost by following best practices.
Amazon MWAA environments include four Airflow components hosted on groups of AWS compute resources: the scheduler that schedules the work, the workers that implement the work, the web server that provides the UI, and the metadata database that keeps track of state. For intermittent or varying workloads, optimizing costs while maintaining performance is crucial. This post outlines best practices to achieve cost optimization and efficient performance in Amazon MWAA environments, with detailed explanations and examples. It may not be necessary to apply all of these best practices for a given Amazon MWAA workload; you can selectively choose and implement relevant and applicable principles for your specific workloads.
Right-sizing your Amazon MWAA environment
Right-sizing your Amazon MWAA environment makes sure you have an environment that is able to concurrently scale across your different workloads to provide the best price-performance. The environment class you choose for your Amazon MWAA environment determines the size and the number of concurrent tasks supported by the worker nodes. In Amazon MWAA, you can choose from five different environment classes. In this section, we discuss the steps you can follow to right-size your Amazon MWAA environment.
Monitor resource utilization
The first step in right-sizing your Amazon MWAA environment is to monitor the resource utilization of your existing setup. You can monitor the underlying components of your environments using Amazon CloudWatch, which collects raw data and processes it into readable, near real-time metrics. With these environment metrics, you have greater visibility into key performance indicators to help you appropriately size your environments and debug issues with your workflows. Based on the concurrent tasks needed for your workload, you can adjust the environment size as well as the maximum and minimum workers needed. CloudWatch provides CPU and memory utilization for all the underlying AWS services utilized by Amazon MWAA. Refer to Container, queue, and database metrics for Amazon MWAA for additional details on available metrics for Amazon MWAA. These metrics also include the number of base workers, additional workers, schedulers, and web servers.
Analyze your workload patterns
Next, take a deep dive into your workflow patterns. Examine DAG schedules, task concurrency, and task runtimes. Monitor CPU/memory usage during peak periods. Query CloudWatch metrics and Airflow logs. Identify long-running tasks, bottlenecks, and resource-intensive operations for optimal environment sizing. Understanding the resource demands of your workload will help you make informed decisions about the appropriate Amazon MWAA environment class to use.
Choose the right environment class
Match requirements to Amazon MWAA environment class specifications (mw1.small to mw1.2xlarge) that can handle your workload efficiently. You can vertically scale up or scale down an existing environment through an API, the AWS Command Line Interface (AWS CLI), or the AWS Management Console. Be aware that a change in the environment class requires a scheduled downtime.
Fine tune configuration parameters
Fine-tuning configuration parameters in Apache Airflow is crucial for optimizing workflow performance and reducing costs. These settings include auto scaling, parallelism, logging, and DAG code optimizations.
Auto scaling
Amazon MWAA supports worker auto scaling, which automatically adjusts the number of running worker and web server nodes based on your workload demands. You can specify the minimum and maximum number of Airflow workers that run in your environment. For worker node auto scaling, Amazon MWAA uses RunningTasks and QueuedTasks metrics, where (tasks running + tasks queued) / (tasks per worker) = (required workers). If the required number of workers is greater than the current number of running workers, Amazon MWAA will add additional worker instances using AWS Fargate, up to the maximum value specified by the maximum worker configuration.
Auto scaling in Amazon MWAA will gracefully downscale when there are more workers than required. For example, let’s assume a large Amazon MWAA environment with a minimum of 1 worker and a maximum of 10, where each large Amazon MWAA worker can support up to 20 tasks. Let’s say, each day at 8:00 AM, DAGs start up that use 190 concurrent tasks. Amazon MWAA will automatically scale to 10 workers, because the required workers = 190 requested tasks (some running, some queued) / 20 (tasks per worker) = 9.5 workers, rounded up to 10. At 10:00 AM, half of the tasks complete, leaving 95 running. Amazon MWAA will then downscale to 5 workers (95 tasks / 20 tasks per worker = 4.75 workers, rounded up to 5). Any workers that are still running tasks remain protected during downscaling until they’re complete, and no tasks will be interrupted. As the queued and running tasks decrease, Amazon MWAA will remove workers without affecting running tasks, down to the minimum specified worker count.
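As a quick illustration of the scaling formula, the following sketch mirrors the calculation from this example (the split between running and queued tasks is arbitrary).

import math

def required_workers(running_tasks: int, queued_tasks: int, tasks_per_worker: int = 20) -> int:
    """(tasks running + tasks queued) / (tasks per worker), rounded up."""
    return math.ceil((running_tasks + queued_tasks) / tasks_per_worker)

print(required_workers(150, 40))  # 190 concurrent tasks -> 10 workers
print(required_workers(95, 0))    # 95 remaining tasks -> 5 workers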
Web server auto scaling in Amazon MWAA allows you to automatically scale the number of web servers based on CPU utilization and active connection count. Amazon MWAA makes sure your Airflow environment can seamlessly accommodate increased demand, whether from REST API requests, AWS CLI usage, or more concurrent Airflow UI users. You can specify the maximum and minimum web server count while configuring your Amazon MWAA environment.
Logging and metrics
In this section, we discuss the steps to select and set the appropriate log configurations and CloudWatch metrics.
Choose the right log levels
If enabled, Amazon MWAA will send Airflow logs to CloudWatch. You can view the logs to determine Airflow task delays or workflow errors without the need for additional third-party tools. You need to enable logging to view Airflow DAG processing, tasks, scheduler, web server, and worker logs. You can enable Airflow logs at the INFO, WARNING, ERROR, or CRITICAL level. When you choose a log level, Amazon MWAA sends logs for that level and higher levels of severity. Standard CloudWatch logs charges apply, so reducing log levels where possible can reduce overall costs. Use the most appropriate log level based on environment, such as INFO for dev and UAT, and ERROR for production.
You can choose which Airflow metrics are sent to CloudWatch by using the Amazon MWAA configuration option metrics.statsd_allow_list. Refer to the complete list of available metrics. Some metrics such as schedule_delay and duration_success are published per DAG, whereas others such as ti.finish are published per task per DAG.
Therefore, the cumulative number of DAGs and tasks directly influence your CloudWatch metric ingestion costs. To control CloudWatch costs, choose to publish selective metrics. For example, the following will only publish metrics that start with scheduler and executor:
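One way to do this is with the configuration option named above, whose value is a comma-separated list of metric name prefixes:

metrics.statsd_allow_list = scheduler,executor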
An effective practice is to utilize regular expression (regex) pattern matching against the entire metric name instead of only matching the prefix at the beginning of the name.
Monitor CloudWatch dashboards and set up alarms
Create a custom dashboard in CloudWatch and add alarms for a particular metric to monitor the health status of your Amazon MWAA environment. Configuring alarms allows you to proactively monitor the health of the environment.
Optimize AWS Secrets Manager invocations
Airflow has a mechanism to store secrets such as variables and connection information. By default, these secrets are stored in the Airflow meta database. Airflow users can optionally configure a centrally managed location for secrets, such as AWS Secrets Manager. When specified, Airflow will first check this alternate secrets backend when a connection or variable is requested. If the alternate backend contains the needed value, it is returned; if not, Airflow will check the meta database for the value and return that instead. One of the factors affecting the cost to use Secrets Manager is the number of API calls made to it.
On the Amazon MWAA console, you can configure the backend Secrets Manager path for the connections and variables that will be used by Airflow. By default, Airflow searches for all connections and variables in the configured backend. To reduce the number of API calls Amazon MWAA makes to Secrets Manager on your behalf, configure it to use a lookup pattern. By specifying a pattern, you narrow the possible paths that Airflow will look at. This will help in lowering your costs when using Secrets Manager with Amazon MWAA.
To reduce Secrets Manager API calls further, you can enable the Airflow secrets cache (the secrets.use_cache configuration option) and set a time to live with secrets.cache_ttl_seconds.
For example, if you want to only look up a specific subset of connections, variables, or config in Secrets Manager, set the relevant *_lookup_pattern parameter. This parameter takes a regex as a string as value. To lookup connections starting with m in Secrets Manager, your configuration file should look like the following code:
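A minimal sketch of the corresponding configuration follows; the secret prefixes are illustrative, and connections_lookup_pattern assumes an Amazon provider version that supports lookup patterns.

secrets.backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs = {"connections_prefix": "airflow/connections", "connections_lookup_pattern": "^m", "variables_prefix": "airflow/variables"}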
Schedulers and workers are two components that are involved in parsing the DAG. After the scheduler parses the DAG and places it in a queue, the worker picks up the DAG from the queue. At that point, all the worker knows is the DAG ID and the Python file, along with some other information. The worker has to parse the Python file in order to run the task.
DAG parsing is run twice, once by the scheduler and then by the worker. Because the workers are also parsing the DAG, the amount of time it takes for the code to parse dictates the number of workers needed, which adds to the cost of running those workers.
For example, for a total of 200 DAGs with 10 tasks each, where each task takes 60 seconds to run and each DAG takes 20 seconds to parse, we can calculate the following:
Total tasks across all DAGs = 2,000
Time per task = 60 seconds (run) + 20 seconds (parse DAG) = 80 seconds
Total time = 2,000 * 80 = 160,000 seconds
Total time per worker = 72,000 seconds
Number of workers needed = Total time / Total time per worker = 160,000 / 72,000 = ~3
Now, let’s increase the time taken to parse the DAGs to 100 seconds:
Total tasks across all DAGs = 2,000
Time per task = 60 seconds (run) + 100 seconds (parse DAG) = 160 seconds
Total time = 2,000 * 160 = 320,000 seconds
Total time per worker = 72,000 seconds
Number of workers needed = Total time / Total time per worker = 320,000 / 72,000 = ~5
As you can see, when the DAG parsing time increased from 20 seconds to 100 seconds, the number of worker nodes needed increased from 3 to 5, thereby adding compute cost.
To reduce the time it takes for parsing the code, follow the best practices in the subsequent sections.
Remove top-level imports
Code imports will run every time the DAG is parsed. If you don’t need the libraries being imported to create the DAG objects, move the import to the task level instead of defining it at the top. After it’s defined in the task, the import will be called only when the task is run.
Avoid multiple calls to databases such as the meta database or an external system’s database. Variables used within the DAG are defined in the meta database or in a backend system like Secrets Manager. Use templating (Jinja) so that the calls to populate the variables are made only at task runtime and not at DAG parsing time.
For example, see the following code:
import pendulum
from airflow import DAG
from airflow.decorators import task
import numpy as np # <-- DON'T DO THAT!
with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_array():
        """Print Numpy array."""
        import numpy as np  # <-- INSTEAD DO THIS!

        a = np.arange(15).reshape(3, 5)
        print(a)
        return a

    print_array()
The following code is another example:
# Bad example
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.bash import BashOperator

foo_var = Variable.get("foo")  # DON'T DO THAT

bash_use_variable_bad_1 = BashOperator(
    task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
    task_id="bash_use_variable_bad_2",
    bash_command=f"echo variable foo=${Variable.get('foo')}",  # DON'T DO THAT
)

bash_use_variable_bad_3 = BashOperator(
    task_id="bash_use_variable_bad_3",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": Variable.get("foo")},  # DON'T DO THAT
)

# Good example
bash_use_variable_good = BashOperator(
    task_id="bash_use_variable_good",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": "{{ var.value.get('foo') }}"},
)


@task
def my_task():
    var = Variable.get("foo")  # this is fine: my_task() runs only at task runtime, not during DAG parsing
    print(var)
Writing DAGs
Complex DAGs with a large number of tasks and dependencies between them can impact performance of scheduling. One way to keep your Airflow instance performant and well utilized is to simplify and optimize your DAGs.
For example, a DAG that has a simple linear structure A → B → C will experience fewer delays in task scheduling than a DAG that has a deeply nested tree structure with an exponentially growing number of dependent tasks.
Dynamic DAGs
In the following example, a DAG is defined with hardcoded table names from a database. A developer has to define N number of DAGs for N number of tables in a database.
# Bad example
dag_params = getData()
no_of_dags = int(dag_params["no_of_dags"]['N'])
# build a dag for each number in no_of_dags
for n in range(no_of_dags):
    dag_id = 'dynperf_t1_{}'.format(str(n))
    default_args = {'owner': 'airflow', 'start_date': datetime(2022, 2, 2, 12, n)}
To reduce this verbose and error-prone work, use dynamic DAGs. The following DAG definition is created after querying a database catalog, and creates as many DAGs dynamically as there are tables in the database. This achieves the same objective with less code.
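A minimal sketch of this pattern follows; get_table_names() is a hypothetical stand-in for the catalog query, and the BashOperator is a placeholder task.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def get_table_names():
    # Hypothetical stand-in for a catalog query executed once at parse time
    return ["orders", "customers", "shipments"]


for table in get_table_names():
    with DAG(
        dag_id=f"dynperf_{table}",
        start_date=datetime(2022, 2, 2),
        schedule=None,
        catchup=False,
    ) as dag:
        BashOperator(
            task_id=f"process_{table}",
            bash_command=f"echo processing table {table}",
        )
    # Register each generated DAG in the module globals so the scheduler discovers it
    globals()[f"dynperf_{table}"] = dag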
Running all DAGs simultaneously or within a short interval in your environment can result in a higher number of worker nodes required to process the tasks, thereby increasing compute costs. For business scenarios where the workload is not time-sensitive, consider spreading the schedule of DAG runs in a way that maximizes the utilization of available worker resources.
DAG folder parsing
Simpler DAGs are usually only in a single Python file; more complex DAGs might be spread across multiple files and have dependencies that should be shipped with them. You can either do this all inside of the DAG_FOLDER, with a standard filesystem layout, or you can package the DAG and all of its Python files up as a single .zip file. Airflow looks into all the directories and files in the DAG_FOLDER. Using a .airflowignore file specifies which directories or files Airflow should intentionally ignore. This increases the efficiency of finding a DAG within a directory, improving parsing times.
Deferrable operators
You can run deferrable operators on Amazon MWAA. Deferrable operators have the ability to suspend themselves and free up the worker slot. A suspended task doesn’t occupy a worker slot, so fewer worker resources are required, which can lower the worker cost.
For example, let’s assume you’re using a large number of sensors that wait for something to occur and occupy worker node slots. By making the sensors deferrable and using worker auto scaling improvements to aggressively downscale workers, you will immediately see an impact where fewer worker nodes are needed, saving on worker node costs.
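A minimal sketch of a deferrable sensor follows, assuming an Amazon provider package version in which S3KeySensor supports deferrable mode; the bucket and key are illustrative.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    dag_id="deferrable_sensor_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    wait_for_file = S3KeySensor(
        task_id="wait_for_input_file",
        bucket_name="example-input-bucket",  # illustrative bucket
        bucket_key="incoming/data.csv",      # illustrative key
        deferrable=True,                     # suspend to the triggerer instead of holding a worker slot
        poke_interval=300,
    )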
Dynamic Task Mapping
Dynamic Task Mapping allows a workflow to create a number of tasks at runtime based on current data, rather than the DAG author having to know in advance how many tasks will be needed. This is similar to defining your tasks in a for loop, but instead of having the DAG file fetch the data and do the looping itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is run, the scheduler will create N copies of the task, one for each input.
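The following is a minimal sketch of Dynamic Task Mapping; the task names and input list are illustrative, and the number of process_record copies is decided at runtime from the output of get_records.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="dynamic_task_mapping_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def get_records():
        return [1, 2, 3, 4]  # in practice, fetched from an upstream system at runtime

    @task
    def process_record(record):
        print(f"processing record {record}")

    # One mapped copy of process_record is created per element returned by get_records
    process_record.expand(record=get_records())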
Stop and start the environment
You can stop and start your Amazon MWAA environment based on your workload requirements, which will result in cost savings. You can perform the action manually or automate stopping and starting Amazon MWAA environments. Refer to Automating stopping and starting Amazon MWAA environments to reduce cost to learn how to automate the stop and start of your Amazon MWAA environment retaining metadata.
Conclusion
In conclusion, implementing performance optimization best practices for Amazon MWAA can significantly reduce overall costs while maintaining optimal performance and reliability. Key strategies include right-sizing environment classes based on CloudWatch metrics, managing logging and monitoring costs, using lookup patterns with Secrets Manager, optimizing DAG code, and selectively stopping and starting environments based on workload demands. Continuously monitoring and adjusting these settings as workloads evolve can maximize your cost-efficiency.
About the Authors
Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.
Retina Satish is a Solutions Architect at AWS, bringing her expertise in data analytics and generative AI. She collaborates with customers to understand business challenges and architect innovative, data-driven solutions using cutting-edge technologies. She is dedicated to delivering secure, scalable, and cost-effective solutions that drive digital transformation.
Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the realms of AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions.
The Amazon Global Transportation Technology Services (GTTS) team owns a set of products called INSITE (Insights Into Transportation Everywhere). These products are user-facing applications that solve specific business problems across different transportation domains: network topology management, capacity management, and network monitoring. As of this writing, GTTS serves around 10,000 customers globally on a monthly basis, managing the outbound transportation network.
INSITE applications are in general data intensive. They ingest and transform large volumes of data in different formats and processing patterns (such as batch and near real time) from various sources internal and external to Amazon. Datasets are often shared between applications both within domains and across domains, and are consumed in complex data pipelines that run under tight SLAs. To enable and meet these requirements, GTTS built its own data platform.
A critical component of the data platform is the data pipeline orchestrator. GTTS built its own orchestrator named Langley in 2018, and used it to schedule and monitor extract, transform, and load (ETL) jobs on a variety of compute platforms, such as Amazon EMR, Amazon Redshift, and Amazon Relational Database Service (Amazon RDS).
As the Langley user base grew, GTTS engineers faced a couple of challenges on key dimensions, such as maintainability, scalability, multi-tenancy, observability, and interoperability.
Amazon GTTS partnered with AWS Professional Services to modernize their orchestration platform, relying as much as possible on managed services with auto scaling capabilities. After analyzing candidate solutions, the team decided to build a target solution relying on Amazon Managed Workflows for Apache Airflow (Amazon MWAA). This post elaborates on the drivers of the migration and its achieved benefits.
Legacy platform
Amazon GTTS works with diverse and distributed data stores, storing petabytes of data. Data engineers need a tool to define ETL jobs which run on various compute environments, as illustrated in the following diagram.
GTTS built Langley as their custom orchestrator in 2018, and have been operating it ever since. At a high level, the core of Langley’s architecture is based on a set of Amazon Simple Queue Service (Amazon SQS) queues and AWS Lambda functions, and a dedicated RDS database to store ETL job data and metadata. It also uses AWS Data Pipeline to run SQL-based workloads, Amazon Simple Storage Service (Amazon S3) to store configuration files, and Amazon CloudWatch for alarming on failures. Every day, Langley handles the lifecycle of more than 17,000 ETL jobs in Europe and 5,000 ETL jobs in North America.
The following diagram illustrates the Langley architecture.
Business challenges
Langley started as a simple solution to a team-internal problem, but its growth over the years surfaced key issues:
Maintaining this custom solution required considerable engineering time, and the effort grew over the years as new features increased the overall complexity.
The Langley user base grew continuously and eventually became a key orchestration platform for multiple teams and products across Amazon. However, it wasn't created with multi-tenancy in mind, so it didn't provide the robustness and the level of isolation needed to prevent one tenant from impacting others on the shared platform.
In 2023, AWS announced the upcoming deprecation of Data Pipeline, one of the core services used by Langley.
GTTS partnered with AWS to design and implement a solution to overcome those challenges. AWS used the following evaluation matrix to build a durable solution:
Maintainability
The level of effort required to maintain the orchestrating system in a functional state, encompassing updates, patches, bug fixes, and routine checks for optimal performance.
Costs
The overall expenditure associated with the orchestrator, including infrastructure costs, licensing fees, personnel expenses, and other relevant costs. This criterion particularly assesses the system’s ability to effectively control and reduce costs.
Scheduling
The capabilities related to running and scheduling jobs, including the ability to resume an ETL job from a failed step.
User experience
The overall satisfaction and usability of a system from the end-users’ perspective, considering factors such as responsiveness, accessibility, interoperability, and ease of use.
Security
Mechanisms in place to safeguard data and applications from unauthorized access at all times.
Monitoring and alerting
The continuous observation and analysis of system components and performance metrics to detect and address issues, optimize resource usage, and provide overall health and reliability.
Scalability
The orchestrator’s capacity to efficiently adapt its resources to handle increased workload or demand, providing sustained performance.
Among the explored solutions, Amazon MWAA was determined to be the best overall performer across this matrix.
The next section is a deep dive into the rationale that led GTTS and AWS Professional Services to choose Amazon MWAA as the best performer.
Benefits of migrating to Amazon MWAA
Amazon GTTS and AWS Professional Services worked together to release a Minimum Viable Product (MVP) of the solution described earlier, which showcases the benefits on the agreed decision criteria.
Maintainability
With their legacy system, Amazon GTTS had to manage the orchestrator database, web servers, activity queue, dispatch functions, and worker nodes.
Amazon MWAA eliminates the need for underlying infrastructure management. It takes care of provisioning and maintenance of the Apache Airflow web server, scheduler, worker nodes, and relational database, allowing GTTS teams to focus on building their ETL jobs.
Amazon MWAA offers one-click updates of the infrastructure for minor versions, like moving from Airflow version x.4.z to x.5.z. During the upgrade process, Amazon MWAA captures a snapshot of your environment metadata; upgrades the workers, schedulers, and web server to the new Airflow version; and finally restores the metadata database using the snapshot, backing it with an automated rollback mechanism.
Costs
Amazon MWAA contributes to a more cost-effective solution by automatically scaling workers depending on the workload. This dynamic scaling in and out avoids over-provisioning and allows the organization to pay for the compute they actually use, without the risk of downtime during activity spikes. Because this is an AWS-managed solution, it also reduced GTTS’s Total Cost of Ownership (TCO) by freeing up time from engineers that were managing the legacy system.
Scheduling
Amazon MWAA supports all the trigger mechanisms that the Amazon orchestrator needed:
Manual trigger – Users can invoke a Directed Acyclic Graph (DAG) using the Airflow API or, even more simply, through the user interface (UI).
Schedule – A schedule can be defined as code, together with the DAG definition, to make sure the DAG runs at specific rates (from hourly to yearly) or on specific cron schedules.
Partial runs on DAG failures – Another key feature for GTTS was the ability to recover from partial DAG failures without having to rerun the whole DAG. Airflow provides task-level controls that make this operation straightforward to implement.
User experience
In this section, we discuss three aspects of the user experience: the web UI, the interoperability, and the programming interface.
Web UI
Amazon MWAA comes with a managed web server that hosts the Airflow UI. As a result, and without any maintenance needed, you can use it to quickly run DAGs, check run history, visualize dependencies between DAGs, troubleshoot with a direct access to task logs, manage variables and database connections, and define granular permissions. The following screenshot shows an example of the UI.
Interoperability
One of the most important features evaluated was the ability for the new orchestrator to effortlessly integrate with GTTS multiple data storage services, compute components, and monitoring services.
Amazon MWAA comes with a wide variety of providers preinstalled, such as apache-airflow-providers-amazon, apache-airflow-providers-postgres, and apache-airflow-providers-common-sql. This allowed GTTS to connect with those services using multiple connection methodologies, including AWS IAM Identity Center or AWS Secrets Manager password-based authentications, without having to write a single custom Airflow operator.
Amazon MWAA also makes it straightforward to upgrade provider versions and install new providers. By providing a requirements.txt file, GTTS was able to change the major version of apache-airflow-providers-amazon and install the apache-airflow-providers-mysql provider.
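As an illustration, a requirements.txt along the following lines pins one provider to a specific version and adds another; the constraint URL placeholders and the version number shown here are examples, not the exact values GTTS used.

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-<AIRFLOW_VERSION>/constraints-<PYTHON_VERSION>.txt"
apache-airflow-providers-amazon==8.16.0
apache-airflow-providers-mysql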
Programming interface
Airflow is an orchestrator with a low barrier to entry, especially for those familiar with the Python programming language. Its workflow management is defined in Python scripts, with a well-documented set of native operators and external providers, making it straightforward for Python developers to get started with Airflow and create complex data pipelines.
The following are two key Airflow features:
TaskFlow API – The TaskFlow API removes much of the boilerplate code required by traditional operators by using Python decorators, simplifying DAG authoring with cleaner and more concise DAG files.
Dynamic DAG generation – The dynamic DAG generation capability allowed us to generate DAGs from the original legacy orchestrator’s configuration files. This enabled the platform team to build a centralized framework consumed by multiple teams to keep the code DRY (Don’t Repeat Yourself), providing a seamless migration journey from the legacy orchestrator.
The following screenshot shows an example of these features.
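To illustrate these two features together, the following is a minimal sketch that generates DAGs from a configuration dictionary using TaskFlow-style tasks; the pipeline names, schedules, and queries are placeholders, and in GTTS's case the configuration would come from the legacy orchestrator's configuration files rather than a hardcoded dictionary.

from datetime import datetime

from airflow.decorators import dag, task

# Placeholder configuration; in practice this would be loaded from the
# legacy orchestrator's configuration files (for example, stored in Amazon S3)
PIPELINES = {
    "sales_daily": {"schedule": "@daily", "query": "SELECT 1"},
    "inventory_hourly": {"schedule": "@hourly", "query": "SELECT 2"},
}

def create_dag(dag_id, schedule, query):
    @dag(dag_id=dag_id, start_date=datetime(2024, 1, 1), schedule=schedule, catchup=False)
    def generated_pipeline():
        @task
        def run_query():
            # Placeholder task body
            print(f"Running: {query}")

        run_query()

    return generated_pipeline()

# Dynamic DAG generation: one DAG per configuration entry
for name, config in PIPELINES.items():
    globals()[name] = create_dag(name, config["schedule"], config["query"])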
Security
The new Amazon MWAA-based architecture improves GTTS’s posture by introducing granular access control. Amazon MWAA integrates with AWS services such as AWS Key Management Service (AWS KMS), Secrets Manager, and IAM Identity Center to keep data safely encrypted at all times, both at rest and in transit using TLS-based communications. Airflow also includes a role-based access control (RBAC) model to determine what users can do on the platform and enforce the principle of least privilege. Amazon MWAA also natively integrates with AWS CloudTrail for auditing purposes.
The Airflow RBAC model enables administrators to define roles with specific privileges to access Airflow system settings and DAGs themselves. This granular access control reduces the risk of data breaches and malicious activities by limiting access to critical DAGs and sensitive Airflow environment variables. Airflow includes five default roles with different sets of permissions (as shown in the following screenshot), but it is possible to create new roles depending on your security requirements.
GTTS used the Airflow RBAC model to restrict permissions of certain teams and consumers of the application. They also used priority weights and Airflow pools to prioritize tasks and control run concurrency. However, if you want to run a multi-tenant orchestration platform, it’s recommended to use a separate environment for each team. You can assume that everything accessible by the Amazon MWAA role is also accessible to users who can write DAGs to the environment.
To ease authentication in Amazon MWAA, GTTS federated their identity provider (IdP) through Amazon Cognito and SAML. With this integration, users log in to the Amazon MWAA UI with the same identity they use in other internal systems, which removes the need for new credentials. The user's group membership is retrieved from the IdP through Amazon Cognito, and a Lambda function redirects the user to Amazon MWAA with the appropriate Airflow role. This process, illustrated in the following architecture, is abstracted from the user: a public Application Load Balancer handles the flow and, at the end of the process, redirects to the private Amazon MWAA environment, making the authentication workflow seamless and secure. Refer to Accessing a private Amazon MWAA environment using federated identities to implement it using your own IdP.
Monitoring and alerting
Amazon MWAA integrates with CloudWatch, which manages all infrastructure logs for you. When creating an Amazon MWAA environment, you can configure what level of logs should be saved. GTTS enabled CloudWatch logging for all five log types: Airflow task logs, Airflow web server logs, Airflow scheduler logs, Airflow worker logs, and Airflow DAG processing logs.
These logs are all accessible in CloudWatch for continuous monitoring, but Amazon MWAA users can also access task logs directly from the Airflow UI by looking at the DAG run history. The following screenshot shows an example of task-level logs in Airflow 2.5.1.
Each Amazon MWAA environment includes the schedulers, web server, and worker nodes. Scheduler nodes are responsible for the overall orchestration and for parsing DAG files, while tasks run on worker nodes that Amazon MWAA automatically scales up and down according to system load. When creating a new Amazon MWAA environment, you need to specify the type of worker nodes, the minimum and maximum number of worker nodes, and the scheduler count, as shown in the following screenshot.
There are notably two ways GTTS controlled how Amazon MWAA scales to handle the load:
Minimum and maximum worker count – Amazon MWAA automatically adds or deletes workers within the boundaries you set, depending on the number of tasks that are waiting to be processed. As indicated in the AWS documentation, it is possible to request a quota increase to run up to 50 workers in a single environment.
Size of the node – Larger worker nodes can run more concurrent tasks. For example, mw1.small instances run 5 concurrent tasks by default, whereas mw1.large instances run 20 concurrent tasks by default. The following figure shows the specification for each instance type.
With Amazon MWAA, GTTS can therefore run up to 4,000 concurrent tasks in a single Amazon MWAA environment (50 worker nodes x 80 tasks per node with mw1.2xlarge). This is an order-of-magnitude estimate of the load that can fit into the workers' vCPUs and RAM, and it is possible to edit the default configuration to run even more tasks per worker. For more information regarding Amazon MWAA automatic scaling, see Configuring Amazon MWAA automatic scaling.
The Amazon MWAA-based orchestration platform
After selecting Amazon MWAA as the core service for their orchestrating system, Amazon GTTS and AWS worked together to develop an end-to-end data platform with automation capabilities, access management, monitoring, and integration with downstream systems. The following diagram illustrates the solution architecture.
The following are notable components of the architecture:
DAG update – GTTS Developers manage the creation, update, and deletion of Amazon MWAA DAGs through a dedicated code repository. When a developer edits DAG definitions and commits changes to the code repository, a CI/CD pipeline automatically packages the DAG definition and stores it in Amazon S3, which automatically updates DAGs in Amazon MWAA.
Infrastructure as code – The entire stack is defined as IaC with the AWS CDK, which eases the process of updating components, and makes it repeatable if GTTS wants to extend the solution and redeploy the stack in multiple AWS Regions.
Authentication, authorization, and permissions – Permissions are centrally managed with AWS Identity and Access Management (IAM) together with Airflow roles. GTTS integrated their identity provider with Amazon Cognito and Amazon MWAA, so Amazon employees can connect to the Amazon MWAA UI with the same authentication tool they are used to, and see only the DAGs they are allowed to access.
UI and DAG runs – Amazon MWAA includes an AWS-managed web server that exposes the Airflow UI. Amazon employees can connect to this UI to list DAGs, run DAGs, and track their status. In addition, GTTS used the native Amazon MWAA scheduler to automatically invoke DAGs at a specific time.
Airflow workers – Users can use Airflow native providers to run custom shell or Python code directly on the worker nodes. For compute-intensive jobs, the Amazon MWAA worker can delegate the compute to a more suitable AWS service, such as Apache Spark running on Amazon EMR on Amazon EKS, which provides compute resources only for the duration of the job, helping optimize costs.
Data stores and external compute services – Amazon MWAA also comes with the AWS provider preinstalled, allowing seamless connectivity with more than 23 AWS compute and data services. GTTS can extend the connectivity to other AWS or external services by using Boto3 with the PythonOperator or by creating dedicated custom operators.
Logging and alerting – Amazon MWAA is seamlessly integrated with CloudWatch and CloudTrail to publish DAG logs, audit logs, and metrics. This enables GTTS to track completion, troubleshoot, and create an automated alerting and notification system so DAG owners can take remediation action as quickly as possible.
Conclusion
Amazon GTTS partnered with AWS Professional Services to overcome the challenges faced by their legacy custom orchestrator against various dimensions such as maintainability, cost efficiency, security, scalability, and observability.
The new Amazon MWAA-based architecture offers significant improvements in the context of the AWS Well-Architected Framework compared to their former system. In terms of operational excellence, the new orchestration platform is built with extensibility in mind and enables the GTTS team to use the most suitable ETL service to run their jobs. Regarding performance efficiency, GTTS observed up to 70% improvement in end-to-end runtime on their jobs running in Amazon MWAA. In terms of security, the new solution implements best practices such as deployment in private subnets, authentication of users through Amazon internal federation systems, and data encryption at rest and in transit. Reliability is achieved with Multi-AZ failover and built-in auto scaling to meet the workload demand at all times. Finally, cost is reduced because Amazon MWAA is an AWS-managed service, which decreases the human effort required from GTTS to maintain the orchestration platform.
Amazon GTTS is now bringing the MVP into production, where it is planned to handle petabytes of data and host more than 2,000 jobs migrated from the legacy system. Additionally, the migration to Amazon MWAA has empowered GTTS to enhance its operational scalability, paving the way for the integration of new jobs and further expansion with greater efficiency and confidence.
Béntor Bautista is a Senior Data Engineer at Amazon GTTS.
Louis Hourcade is a Solutions Architect at AWS.
Raphael Ducay is a Senior DataOps Architect at AWS.
Konstantin Zarudaev is a DevOps Consultant at AWS.
Dorra Elboukari is a DevOps Architect at AWS.
Marcin Zapal is an Engagement Manager at AWS.
Grigorios Pikoulas is a Strategic Program Lead at AWS.
Antonio Cennamo is a Senior Customer Practice Manager at AWS.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) provides a fully managed solution for orchestrating and automating complex workflows in the cloud. Amazon MWAA offers two network access modes for accessing the Apache Airflow web UI in your environments: public and private. Customers often deploy Amazon MWAA in private mode and want to use their existing login authentication mechanisms and single sign-on (SSO) features for seamless integration with the corporate Active Directory (AD). This also means end-users don't need to log in to the AWS Management Console to access the Airflow UI.
In this post, we illustrate how to configure an Amazon MWAA environment deployed in private network access mode with customer managed VPC endpoints and authenticate users using SAML federated identity using Microsoft Entra ID and Application Load Balancer (ALB). Users can seamlessly log in to the Airflow UI with their corporate credentials and access the DAGs. This solution can be modified for Amazon MWAA public network access mode as well.
Solution overview
The architectural components involved in authenticating the Amazon MWAA environment using SAML SSO are depicted in the following diagram. The infrastructure components include two public subnets and three private subnets. The public subnets are required for the internet-facing ALB. Two private subnets are used to set up the Amazon MWAA environment, and the third private subnet is used to host the AWS Lambda authorizer function. This subnet will have a NAT gateway attached to it, because the function needs to verify the signer to confirm the JWT header has the expected LoadBalancer ARN.
The workflow consists of the following steps:
For SAML configuration, Microsoft Entra ID serves as the identity provider (IdP).
ALB has built-in support for Amazon Cognito and authenticates requests.
Post-authentication, ALB forwards the requests to the Lambda authorizer function. The Lambda function decodes the user’s JWT token and validates whether the user’s AD group is mapped to the relevant AWS Identity and Access Management (IAM) role.
If valid, the function creates a web login token and redirects to the Amazon MWAA environment for successful login.
The following sections walk through the high-level steps to deploy the solution. Before you begin, make sure you have the following prerequisites:
Appropriate IAM permissions to deploy AWS CloudFormation stack resources
A Microsoft Azure account with a Microsoft Entra ID P2 license, which is required for creating the Microsoft Entra ID application (the IdP configuration).
A public certificate for the ALB in the AWS Region where the infrastructure is being deployed and a custom domain name relevant to the certificate.
Create an S3 bucket
In this step, we create an S3 bucket to store your Airflow DAGs, custom plugins in a plugins.zip file, and Python dependencies in a requirements.txt file. This bucket is used by the Amazon MWAA environment to fetch DAGs and dependency files.
On the Amazon S3 console, choose the Region where you want to create a bucket.
In the navigation pane, choose Buckets.
Choose Create bucket.
For Bucket type, select General purpose.
For Bucket name, enter a name for your bucket (for this post, mwaa-sso-blog-<your-aws-account-number>).
Choose Create bucket.
Navigate to the bucket and choose Create folder.
For Folder name, enter a name (for this post, we name the folder dags).
Choose Create folder.
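If you prefer to script this step, the following is a minimal boto3 sketch that creates the bucket and the dags folder; the bucket name and Region are placeholders.

import boto3

# Placeholder values; replace with your bucket name and Region
bucket_name = "mwaa-sso-blog-123456789012"
region = "us-east-1"

s3 = boto3.client("s3", region_name=region)

# us-east-1 does not accept a LocationConstraint; other Regions require one
if region == "us-east-1":
    s3.create_bucket(Bucket=bucket_name)
else:
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )

# Create an empty dags/ prefix for the environment's DAGs folder
s3.put_object(Bucket=bucket_name, Key="dags/")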
Import certificates into ACM
ACM is integrated with Elastic Load Balancing, including the Application Load Balancer (ALB). In this step, you can request a public certificate using ACM or import a certificate into ACM. To import organization certificates linked to a custom DNS name into ACM, you must provide the certificate and its private key. To import a certificate signed by a non-AWS Certificate Authority (CA), you must also include the certificate chain.
On the ACM console, choose Import certificate in the navigation pane.
For Certificate body, enter the contents of the cert.pem file.
For Certificate private key, enter the contents of the privatekey.pem file.
Choose Next.
Choose Review and import.
Review the metadata about your certificate and choose Import.
After the import is successful, the status of the imported certificate will show as Issued.
Create the Azure AD service, users, groups, and enterprise application
For the SSO integration with Azure, an enterprise application is required, which acts as the IdP for the SAML flow. We add relevant users and groups to the application and configure the SP (Amazon Cognito) details.
Airflow comes with five default roles: Public, Admin, Op, User, and Viewer. In this post, we focus on three: Admin, User, and Viewer. We create three corresponding groups and users, and assign memberships appropriately.
Log in to the Azure portal.
Navigate to Enterprise applications and choose New application.
Enter a name for your application (for example, mwaa-environment) and choose Create.
You can now view the details of your application. Next, you create the groups.
In the search bar, search for Microsoft Entra ID.
On the Add menu, choose Group.
For Group type, choose a type (for this post, Security).
Enter a group name (for example, airflow-admins) and description.
Choose Create.
Repeat these steps to create two more groups, named airflow-users and airflow-viewers.
Note the object IDs for each group (these are required in a later step).
Next, you create users.
On the Overview page, on the Add menu, choose User and Create new user.
Enter a name for your user (for example, mwaa-user), display name, and password.
Choose Review + create.
Repeat these steps to create a user called mwaa-admin.
In your airflow-users group details page, choose Members in the navigation pane.
Choose Add members.
Search for and select the users you created and choose Select.
Repeat these steps to add the users to each group.
Navigate to your application and choose Assign users and groups.
Choose Add user/group.
Search for and select the groups you created, then choose Select.
Deploy the Amazon MWAA environment stack
For this solution, we provide two CloudFormation templates that set up the services illustrated in the architecture. Deploying the CloudFormation stacks in your account incurs AWS usage charges.
The first CloudFormation stack creates the following resources:
A VPC with two public subnets and three private subnets and relevant route tables, NAT gateway, internet gateway, and security group
VPC endpoints required for the Amazon MWAA environment
An Amazon Cognito user pool and user pool domain
Application Load Balancer
Deploy the stack by completing the following steps:
Choose Launch Stack to launch the CloudFormation stack.
For Stack name, enter a name (for example, sso-blog-mwaa-infra-stack).
Enter the following parameters:
For MWAAEnvironmentName, enter the environment name.
For MwaaS3Bucket, enter the S3 artifacts bucket you created.
For VpcCIDR, enter the IP range (CIDR notation) for the VPC.
For PrivateSubnet1CIDR, enter the IP range (CIDR notation) for the private subnet in the first Availability Zone.
For PrivateSubnet2CIDR, enter the IP range (CIDR notation) for the private subnet in the second Availability Zone.
For PrivateSubnet3CIDR, enter the IP range (CIDR notation) for the private subnet in the third Availability Zone.
For PublicSubnet1CIDR, enter the IP range (CIDR notation) for the public subnet in the first Availability Zone.
For PublicSubnet2CIDR, enter the IP range (CIDR notation) for the public subnet in the second Availability Zone.
Choose Next.
Review the template and choose Create stack.
After the stack is deployed successfully, you can view the resources on the stack’s Outputs tab on the AWS CloudFormation console. Note the ALB URL, Amazon Cognito user pool ID, and domain.
Integrate the Amazon MWAA application with the Azure enterprise application
Next, you configure the SAML configuration in the enterprise application by adding the SP details and redirect URLs (in this case, the Amazon Cognito details and ALB URL).
In the Azure portal, navigate to your environment.
Choose Set up single sign on.
For Identifier, enter urn:amazon:cognito:sp:<your Cognito user pool ID>.
For Reply URL, enter https://<Your user pool domain>/saml2/idpresponse.
For Sign on URL, enter https://<Your application load balancer DNS>.
In the Attributes & Claims section, choose Add a group claim.
Select Security groups.
For Source attribute, choose Group ID.
Choose Save.
Note the values for App Federation Metadata Url and Login URL.
Deploy the ALB stack
When the SAML configuration is complete on the Azure end, the IdP details have to be configured in Amazon Cognito. When users access the ALB URL, they are authenticated against the corporate identity using SAML through Amazon Cognito. After they're authenticated, they're redirected to the Lambda function for authorization based on the group they belong to. The user's group is then validated against the matching IAM role. If it's valid, the Lambda function adds the web login token to the URL, and the user gains access to the Amazon MWAA environment.
This CloudFormation stack creates the following resources:
Two target groups: the Lambda target group and Amazon MWAA target group
Listener rules for the ALB to redirect URL requests to the relevant target groups
A user pool client and the SAML identity provider (Azure) configuration added to the Amazon Cognito user pool
IAM roles for Admin, User, and Viewer personas required for Airflow
The Lambda authorizer function to validate the JWT token and map Azure groups to IAM roles for appropriate Airflow UI access
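The core logic of that Lambda authorizer can be sketched as follows. This is a simplified illustration, not the exact function deployed by the stack: the group object IDs, role ARNs, and environment name are placeholders, and a production implementation must also verify the JWT signature against the ALB public key and confirm the expected signer.

import base64
import json

import boto3

# Placeholder mapping from Azure AD group object IDs to IAM role ARNs;
# in the real solution these come from the CloudFormation stack parameters
GROUP_TO_ROLE = {
    "<airflow-admins-object-id>": "arn:aws:iam::123456789012:role/mwaa-admin-role",
    "<airflow-users-object-id>": "arn:aws:iam::123456789012:role/mwaa-user-role",
    "<airflow-viewers-object-id>": "arn:aws:iam::123456789012:role/mwaa-viewer-role",
}
MWAA_ENV_NAME = "my-mwaa-environment"

def _decode_claims(oidc_data: str) -> dict:
    # NOTE: a production authorizer must verify the JWT signature and signer;
    # here we only decode the payload for illustration
    payload = oidc_data.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))

def lambda_handler(event, context):
    claims = _decode_claims(event["headers"]["x-amzn-oidc-data"])
    groups = claims.get("groups", [])

    role_arn = next((GROUP_TO_ROLE[g] for g in groups if g in GROUP_TO_ROLE), None)
    if role_arn is None:
        return {"statusCode": 403, "isBase64Encoded": False, "headers": {}, "body": "No matching Airflow role"}

    # Assume the IAM role mapped to the user's group, then create a web login token
    creds = boto3.client("sts").assume_role(RoleArn=role_arn, RoleSessionName="mwaa-sso")["Credentials"]
    mwaa = boto3.client(
        "mwaa",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    token = mwaa.create_web_login_token(Name=MWAA_ENV_NAME)

    # Redirect the browser to the Airflow UI using the web login token
    return {
        "statusCode": 302,
        "isBase64Encoded": False,
        "body": "",
        "headers": {
            "Location": f"https://{token['WebServerHostname']}/aws_mwaa/aws-console-sso?login=true#{token['WebToken']}"
        },
    }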
Deploy the stack by completing the following steps:
Choose Launch Stack to launch the CloudFormation stack:
For Stack name, enter a name (for example, sso-blog-mwaa-alb-stack).
Enter the following parameters:
For MWAAEnvironmentName, enter your environment name.
For ALBCertificateArn, enter the certificate ARN required for ALB.
For AzureAdminGroupID, enter the object ID of the Azure group for the Admin persona.
For AzureUserGroupID, enter the object ID of the Azure group for the User persona.
For AzureViewerGroupID, enter the object ID of the Azure group for the Viewer persona.
For EntraIDLoginURL, enter the Azure IdP URI.
For AppFederationMetadataURL, enter the URL of the metadata file for the SAML provider.
Choose Next.
Review the template and choose Create stack.
Test the solution
Now that the SAML configuration and relevant AWS services are created, it’s time to access the Amazon MWAA environment.
Open your web browser and enter the ALB DNS name. The SP initiates the sign-in request process and the browser redirects you to the Microsoft login page for credentials.
Enter the Admin user credentials.
The SAML request sign-in process completes and the SAML response is redirected to the Amazon Cognito user pool attached to the ALB.
The listener rules will validate the query URL and pass the requests to the Lambda authorizer to validate the JWT and assign the appropriate group (Azure) to role (AWS) mapping.
Repeat the steps to log in with User and Viewer credentials and observe the differences in access.
Clean up
When you’re done experimenting with this solution, it’s essential to clean up your resources to avoid incurring AWS charges.
On the AWS CloudFormation console, delete the stacks you created.
Remove the SSM parameters and the private web server and database VPC endpoints (created by the Lambda events function).
Delete the users, groups, and enterprise application in the Azure environment.
Conclusion
In this post, we demonstrated how to integrate Amazon MWAA with your organization's Azure AD service. We walked through a solution that addresses this need using infrastructure as code. This solution allows different end-user personas in your organization to access the Amazon MWAA Airflow UI using SAML SSO.
Satya Chikkala is a Solutions Architect at Amazon Web Services. Based in Melbourne, Australia, he works closely with enterprise customers to accelerate their cloud journey. Beyond work, he is very passionate about nature and photography.
Vijay Velpula is a Data Lake Architect with AWS Professional Services. He assists customers in building modern data platforms by implementing big data and analytics solutions. Outside of his professional responsibilities, Velpula enjoys spending quality time with his family, as well as indulging in travel, hiking, and biking activities.
AWS Data Pipeline has been a foundational service for getting customers off the ground with their extract, transform, and load (ETL) and infrastructure provisioning use cases. Some customers want a deeper level of control and specificity than is possible using Data Pipeline. With recent advancements in the data industry, customers are looking for a more feature-rich platform to modernize their data pipelines and get them ready for data and machine learning (ML) innovation. This post explains how to migrate from Data Pipeline to alternative AWS services to serve the growing needs of data practitioners. The option you choose depends on your current workload on Data Pipeline. You can migrate typical use cases of Data Pipeline to AWS Glue, AWS Step Functions, or Amazon MWAA.
Note that you will need to modify the configurations and code in the examples provided in this post based on your requirements. Before starting any production workloads after migration, you need to test your new workflows to ensure no disruption to production systems.
Migrating workloads to AWS Glue
AWS Glue is a serverless data integration service that helps analytics users to discover, prepare, move, and integrate data from multiple sources. It includes tooling for authoring, running jobs, and orchestrating workflows. With AWS Glue, you can discover and connect to hundreds of different data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor ETL pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.
We recommend migrating your Data Pipeline workload to AWS Glue when:
You’re looking for a serverless data integration service that supports various data sources, authoring interfaces including visual editors and notebooks, and advanced data management capabilities such as data quality and sensitive data detection.
Your workload can be migrated to AWS Glue workflows, jobs (in Python or Apache Spark) and crawlers (for example, your existing pipeline is built on top of Apache Spark).
You need a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
Example: Migrate EmrActivity on EmrCluster to export DynamoDB to S3
The template uses EmrActivity (named TableBackupActivity), which runs on EmrCluster (named EmrClusterForBackup) and backs up data from DynamoDBDataNode to S3DataNode.
You can migrate these pipelines to AWS Glue because it natively supports reading from DynamoDB.
To define an AWS Glue job for the preceding use case:
Open the AWS Glue console.
Choose ETL jobs.
Choose Visual ETL.
For Sources, select Amazon DynamoDB.
On the node Data source - DynamoDB, for DynamoDB source, select Choose the DynamoDB table directly, then select your source DynamoDB table from the menu.
For Connection options, enter dynamodb.s3.bucket and dynamodb.s3.prefix.
Choose + (plus) to add a new node.
For Targets, select Amazon S3.
On the node Data target - S3 bucket, for Format, select your preferred format, for example, Parquet.
For S3 Target location, enter your destination S3 path.
Your AWS Glue job has been successfully created and started.
You might notice that there is no property to manage the read I/O rate. That's because the default DynamoDB reader used in AWS Glue Studio doesn't scan the source DynamoDB table; instead, it uses DynamoDB export.
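For reference, an equivalent AWS Glue job script could look like the following sketch, which reads through the DynamoDB export connector and writes Parquet files to Amazon S3; the table ARN, bucket, prefix, and output path are placeholders.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the source table through the DynamoDB export connector (no table scan)
source = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/source-table",
        "dynamodb.s3.bucket": "my-export-bucket",
        "dynamodb.s3.prefix": "dynamodb-exports/",
        "dynamodb.unnestDDBJson": True,
    },
)

# Write the backup to the destination S3 path in Parquet format
glue_context.write_dynamic_frame.from_options(
    frame=source,
    connection_type="s3",
    connection_options={"path": "s3://my-backup-bucket/table-backup/"},
    format="parquet",
)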
Example: Migrate EmrActivity on EmrCluster to import DynamoDB from S3
The template uses EmrActivity (named TableLoadActivity) which runs on EmrCluster (named EmrClusterForLoad) and loads data from S3DataNode to DynamoDBDataNode.
You can migrate these pipelines to AWS Glue because it natively supports writing to DynamoDB.
The prerequisites are to create a destination DynamoDB table and catalog it in the AWS Glue Data Catalog using a Glue crawler, the Glue console, or the API.
Open the AWS Glue console.
Choose ETL jobs.
Choose Visual ETL.
For Sources, select Amazon S3.
On the node Data source - S3 bucket, for S3 URL, enter your S3 path.
Choose + (plus) to add a new node.
For Targets, select AWS Glue Data Catalog.
On the node Data target - Data Catalog, for Database, select your destination database on Data Catalog.
For Table, select your destination table on Data Catalog.
Your AWS Glue job has been successfully created and started.
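For reference, an equivalent AWS Glue job script for the import could look like the following sketch, which reads the exported files from Amazon S3 and writes to the destination DynamoDB table through its Data Catalog entry; the S3 path, database, and table names are placeholders.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the exported data from S3
source = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-backup-bucket/table-backup/"]},
    format="parquet",
)

# Write to the destination DynamoDB table cataloged in the Data Catalog
glue_context.write_dynamic_frame.from_catalog(
    frame=source,
    database="my_catalog_database",
    table_name="my_destination_dynamodb_table",
)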
Migrating workloads to Step Functions
AWS Step Functions is a serverless orchestration service that lets you build workflows for your business-critical applications. With Step Functions, you use a visual editor to build workflows and integrate directly with over 11,000 actions for over 250 AWS services, including AWS Lambda, Amazon EMR, DynamoDB, and more. You can use Step Functions for orchestrating data processing pipelines, handling errors, and working with the throttling limits on the underlying AWS services. You can create workflows that process and publish machine learning models, orchestrate micro-services, as well as control AWS services, such as AWS Glue, to create ETL workflows. You also can create long-running, automated workflows for applications that require human interaction.
We recommend migrating your Data Pipeline workload to Step Functions when:
You’re looking for a serverless, highly available workflow orchestration service.
You’re looking for a cost-effective solution that charges at single-task granularity.
Your workloads are orchestrating tasks for multiple AWS services, such as Amazon EMR, AWS Lambda, AWS Glue, or DynamoDB.
You’re looking for a low-code solution that comes with a drag-and-drop visual designer for workflow creation and doesn’t require learning new programming concepts.
You’re looking for a service that provides integrations with over 250 AWS services covering over 11,000 actions out-of-the-box, as well as allowing integrations with custom non-AWS services and activities.
Both Data Pipeline and Step Functions use JSON format to define workflows. This allows you to store your workflows in source control, manage versions, control access, and automate with continuous integration and development (CI/CD). Step Functions uses a syntax called Amazon States Language, which is fully based on JSON and allows a seamless transition between the textual and visual representations of the workflow.
Step Functions can also support workloads that require orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.
With Step Functions, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.
For migrating activities on Data Pipeline managed resources, you can use AWS SDK service integration on Step Functions to automate resource provisioning and cleaning up. For migrating activities on on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster, you can install an SSM agent to the instance. You can initiate the command through the AWS Systems Manager Run Command from Step Functions. You can also initiate the state machine from the schedule defined in Amazon EventBridge.
Example: Migrate HadoopActivity on EmrCluster
To migrate HadoopActivity on EmrCluster on Data Pipeline to Step Functions:
Open the AWS Step Functions console.
Choose State machines.
Choose Create state machine.
In the Choose a template wizard, search for emr, select Manage an EMR job, and choose Select.
For Choose how to use this template, select Build on it.
Choose Use template.
For the Create an EMR cluster state, configure the API Parameters (EMR release label, EMR capacity, IAM role, and so on) based on the existing EmrCluster node configuration on Data Pipeline.
For the Run first step state, configure the API Parameters (JAR file and arguments) based on the existing HadoopActivity node configuration on Data Pipeline.
If you have further activities configured on the existing HadoopActivity, repeat the previous step for each of them.
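The state machine produced by these steps is similar in shape to the following sketch, which defines it programmatically with boto3; the cluster settings, JAR location, arguments, and role ARN are placeholders rather than the exact values from the template.

import json

import boto3

# Illustrative state machine mirroring the EmrCluster and HadoopActivity nodes
definition = {
    "StartAt": "Create an EMR cluster",
    "States": {
        "Create an EMR cluster": {
            "Type": "Task",
            "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
            "Parameters": {
                "Name": "data-pipeline-migration-cluster",
                "ReleaseLabel": "emr-6.15.0",
                "ServiceRole": "EMR_DefaultRole",
                "JobFlowRole": "EMR_EC2_DefaultRole",
                "Instances": {
                    "KeepJobFlowAliveWhenNoSteps": True,
                    "InstanceGroups": [
                        {"Name": "Primary", "InstanceRole": "MASTER",
                         "InstanceType": "m5.xlarge", "InstanceCount": 1}
                    ],
                },
            },
            "ResultPath": "$.cluster",
            "Next": "Run first step",
        },
        "Run first step": {
            "Type": "Task",
            "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
            "Parameters": {
                "ClusterId.$": "$.cluster.ClusterId",
                "Step": {
                    "Name": "HadoopActivity",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "s3://your-bucket/your-job.jar",
                        "Args": ["arg1", "arg2"],
                    },
                },
            },
            "ResultPath": "$.step",
            "Next": "Terminate cluster",
        },
        "Terminate cluster": {
            "Type": "Task",
            "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
            "Parameters": {"ClusterId.$": "$.cluster.ClusterId"},
            "End": True,
        },
    },
}

sfn = boto3.client("stepfunctions")
sfn.create_state_machine(
    name="hadoop-activity-migration",
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsEmrExecutionRole",
)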
Migrating workloads to Amazon MWAA
Amazon MWAA is a managed orchestration service for Apache Airflow that lets you use the Apache Airflow platform to set up and operate end-to-end data pipelines in the cloud at scale. Apache Airflow is an open-source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. Apache Airflow brings in new concepts like executors, pools, and SLAs that provide you with superior data orchestration capabilities. With Amazon MWAA, you can use Airflow and the Python programming language to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. Amazon MWAA automatically scales its workflow runtime capacity to meet your needs and is integrated with AWS security services to help provide you with fast and secure access to your data.
We recommend migrating your Data Pipeline workloads to Amazon MWAA when:
You’re looking for a managed, highly available service to orchestrate workflows written in Python.
You want to transition to a fully managed, widely adopted open source technology—Apache Airflow—for maximum portability.
You require a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
You’re looking for a service designed for data pipeline orchestration with features such as rich UI for observability, restarts for failed workflows, backfills, retries for tasks, and lineage support with OpenLineage.
You’re looking for a service that comes with more than 1,000 pre-built operators and sensors, covering AWS as well as non-AWS services.
Your workload requires orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.
Amazon MWAA workflows are defined as directed acyclic graphs (DAGs) using Python, so you can also treat them as source code. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. It comes with a rich user interface for viewing and monitoring workflows and can be easily integrated with version control systems to automate the CI/CD process. With Amazon MWAA, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.
Example: Migrate HadoopActivity on EmrCluster
If you don't have an existing Amazon MWAA environment, complete the following steps to create one:
On the CloudFormation console, choose Stacks in the navigation pane.
Choose Create stack with the option With new resources (standard).
Choose Upload a template file and select the local template file.
Choose Next.
Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
On the last step, acknowledge that resources will be created and choose Submit.
The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE. The resource that will take the most time is the Airflow environment. While it’s being created, you can continue with the following steps, until you’re required to open the Airflow UI.
An Airflow workflow is based on a DAG, which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:
Create a local file named emr_dag.py using a text editor with the following snippet, and configure the EMR-related parameters based on the existing Data Pipeline definition:
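The following is a minimal sketch of such a DAG using the Amazon provider's EMR operators; the cluster configuration, JAR location, and arguments are placeholders to be mapped from your existing EmrCluster and HadoopActivity nodes.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Placeholder cluster configuration mapped from the EmrCluster node
JOB_FLOW_OVERRIDES = {
    "Name": "data-pipeline-migration-cluster",
    "ReleaseLabel": "emr-6.15.0",
    "Instances": {
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Placeholder step mapped from the HadoopActivity node (JAR file and arguments)
HADOOP_STEP = [
    {
        "Name": "hadoop-activity",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3://your-bucket/your-job.jar",
            "Args": ["arg1", "arg2"],
        },
    }
]

with DAG(
    dag_id="emr_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    add_step = EmrAddStepsOperator(
        task_id="add_hadoop_step",
        job_flow_id=create_cluster.output,
        steps=HADOOP_STEP,
    )
    watch_step = EmrStepSensor(
        task_id="watch_hadoop_step",
        job_flow_id=create_cluster.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_hadoop_step')[0] }}",
    )
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id=create_cluster.output,
        trigger_rule="all_done",
    )

    create_cluster >> add_step >> watch_step >> terminate_cluster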
Defining the schedule in Amazon MWAA is as simple as updating the schedule_interval parameter for the DAG. For example, to run the DAG daily, set schedule_interval='@daily'.
Now, you create a workflow that invokes the Amazon EMR step you just created:
On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack followed by -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file emr_dag.py that you created in the previous section.
On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.
If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes after you deployed the CloudFormation stack.
Choose the environment link on the table to see the environment details.
It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.
Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to sign you in.
If there are issues with the DAG file you created, it will display an error on top of the page indicating the lines affected. In that case, review the steps and upload again. After a few seconds, it will parse it and update or remove the error banner.
Clean up
After you migrate your existing Data Pipeline workload and verify that the migration was successful, delete your pipelines in Data Pipeline to stop further runs and billing.
Conclusion
In this blog post, we outlined a few alternate AWS services for migrating your existing Data Pipeline workloads. You can migrate to AWS Glue to run and orchestrate Apache Spark applications, AWS Step Functions to orchestrate workflows involving various other AWS services, or Amazon MWAA to help manage workflow orchestration using Apache Airflow. By migrating, you will be able to run your workloads with a broader range of data integration functionalities. If you have additional questions, post in the comments or read about migration examples in our documentation.
About the authors
Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team and AWS Data Pipeline team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.
Vaibhav Porwal is a Senior Software Development Engineer on the AWS Glue and AWS Data Pipeline team. He is working on solving problems in the orchestration space by building low-cost, repeatable, scalable workflow systems that enable customers to create their ETL pipelines seamlessly.
Sriram Ramarathnam is a Software Development Manager on the AWS Glue and AWS Data Pipeline team. His team works on solving challenging distributed systems problems for data integration across AWS serverless and serverful compute offerings.
Matt Su is a Senior Product Manager on the AWS Glue team and AWS Data Pipeline team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.
Today, we are announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.
In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.
With each new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let’s go through some of these new capabilities.
Logical operators and conditional expressions for DAG scheduling
Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow’s scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets were updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.
With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.
For example, in the financial services industry, a risk management process might need to be run whenever trading data from any regional market is refreshed, or when both trading and regulatory updates are available. The new scheduling capabilities available in Amazon MWAA allow you to express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.
The following DAG code contains the logical operations to implement these dependencies:
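The following is a minimal sketch of such a DAG; the dataset URIs, DAG ID, and task body are illustrative placeholders, and the schedule expression combines the AND and OR operators to approximate the dependency shown in the diagram.

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

# Placeholder dataset URIs for the regional trading feeds and regulatory updates
us_trading = Dataset("s3://path/to/us/trading/data")
eu_trading = Dataset("s3://path/to/eu/trading/data")
regulatory = Dataset("s3://path/to/regulatory/updates")

@dag(
    dag_id="risk_management",
    start_date=datetime(2024, 1, 1),
    # Run when both regional trading feeds are refreshed, or when either
    # regional feed and the regulatory updates are available
    schedule=(us_trading & eu_trading) | ((us_trading | eu_trading) & regulatory),
    catchup=False,
)
def risk_management_pipeline():
    @task
    def run_risk_models():
        # Placeholder for the risk management logic
        pass

    run_risk_models()

risk_management_pipeline()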
Combining dataset and time-based schedules
With Airflow 2.9.2 environments, Amazon MWAA now has a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.
Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it’s essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there’s a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.
Relying solely on time-based scheduling for this type of data pipeline could lead to potential issues such as outdated information and infrastructure resource wastage.
The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means that your workflow can be invoked not only at predefined intervals but also whenever there are updates to the specified datasets, with the specific dependency relationship among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.
See the following DAG code for an example implementation:
from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset,
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()
In the example, the DAG will be run under two conditions:
When the time-based schedule is met (daily at midnight UTC)
When the combined dataset condition is met, when there are updates to both orders and inventory data, or when there are updates to customer data, regardless of the other datasets
This flexibility enables you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.
For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.
Dataset event REST API endpoints
Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge—there was no option to mark a dataset as externally updated. With the new dataset event endpoints feature, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.
This capability enables external systems and applications to seamlessly integrate and interact with your Amazon MWAA environment. It significantly improves your ability to expand your data pipeline’s capacity for dynamic data management.
As an example, running the following code from an external system allows you to invoke a dataset event in the target Amazon MWAA environment. This event could then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.
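The following is a minimal sketch of such a call against the Airflow REST API's dataset events endpoint; the web server URL, dataset URI, and the authenticated session setup (for example, one established with an Amazon MWAA web login token) are placeholders.

import requests

# Placeholder values; replace with your Airflow web server URL and dataset URI
AIRFLOW_WEB_SERVER = "https://your-mwaa-web-server.example.com"
DATASET_URI = "s3://path/to/external/data"

# 'session' is assumed to already be authenticated against the Airflow REST API
session = requests.Session()

response = session.post(
    f"{AIRFLOW_WEB_SERVER}/api/v1/datasets/events",
    json={
        "dataset_uri": DATASET_URI,
        "extra": {"source": "external-system"},
    },
)
response.raise_for_status()
print(response.json())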
Airflow 2.9.2 also includes features to ease the operation and monitoring of your environments. Let’s explore some of these new capabilities.
DAG auto-pausing
Customers are using Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A common scenario is a critical pipeline that starts failing during the evening, and due to the failure, it continues to run and fails repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary tasks, consuming valuable compute resources and potentially causing data corruption or inconsistencies.
The DAG auto-pausing feature aims to address this challenge by introducing two new configuration parameters:
max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before the DAG is automatically paused.
max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the previous global configuration, allowing you to set a custom threshold for each DAG.
In the following code example, we define a DAG with a single PythonOperator. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set in the DAG object. By setting max_consecutive_failed_dag_runs=3, we’re instructing Airflow to automatically pause the DAG after it fails three consecutive times.
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()
With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.
To learn more, refer to DAG Auto-pausing in the Airflow documentation.
CLI support for bulk pause and resume of DAGs
As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or other operational activities, you may need to pause or resume multiple DAGs. This process can become a daunting cyclical endeavor because you need to navigate through the Airflow UI, manually pausing or resuming DAGs one at a time. These manual activities are time consuming and increase the risk of human error that can result in missteps and lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, making it inefficient.
Airflow 2.9.2 improves these CLI commands by adding the capability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This new feature eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error, providing reliability and consistency in your data pipelines.
As an example, to pause all DAGs generating daily liquidity reporting using Amazon Redshift as a data source, you can use the following CLI command with a regular expression:
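On Amazon MWAA, Airflow CLI commands are run through the CLI token endpoint, so a sketch of this operation could look like the following; the environment name and DAG ID regular expression are placeholders, and the pause command's flags follow the Airflow 2.9 CLI reference.

import base64

import boto3
import requests

# Placeholder environment name and DAG ID pattern
ENV_NAME = "my-mwaa-environment"
COMMAND = 'dags pause -y --treat-dag-id-as-regex ".*liquidity.*redshift.*"'

mwaa = boto3.client("mwaa")
token = mwaa.create_cli_token(Name=ENV_NAME)

response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        "Authorization": f"Bearer {token['CliToken']}",
        "Content-Type": "text/plain",
    },
    data=COMMAND,
)
result = response.json()
print(base64.b64decode(result["stdout"]).decode())
print(base64.b64decode(result["stderr"]).decode())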
Custom names in Dynamic Task Mapping
Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks were assigned numeric indexes as names. In complex workflows involving high numbers of mapped tasks, it becomes increasingly challenging to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.
Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, enhancing visibility and manageability within the Airflow UI.
See the following example:
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()
The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to use the first element of each task's op_args as the map index for that dynamically mapped task instance. In the Airflow UI, you will see three task instances with the indexes source_a, source_b, and source_c, making it straightforward to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.
The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater levels of flexibility and customization when naming dynamically mapped tasks.
Refer to Named mapping in the Airflow documentation to learn more about named mapping.
TaskFlow decorator for Bash commands
Writing complex Bash commands and scripts using the traditional Airflow BashOperator may bring challenges in areas such as code consistency, task dependencies definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow’s TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow’s scheduling and monitoring capabilities while maintaining a consistent coding style.
The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:
from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    @task.bash
    def clean_data():
        # Clean customer data: convert the delimiter and drop the first line
        customer_cleaning_commands = """
        echo '{}' | sed 's/,/;/g' | awk 'NR > 1' > cleaned_customers.csv
        """.format(customer_data)
        # Clean order data: convert the delimiter and drop the first line
        order_cleaning_commands = """
        echo '{}' | sed 's/,/;/g' | awk 'NR > 1' > cleaned_orders.csv
        """.format(order_data)
        return customer_cleaning_commands + "\n" + order_cleaning_commands

    @task.bash
    def transform_data(cleaned_customers, cleaned_orders):
        # Transform customer data
        customer_transform_commands = """
        cat {cleaned_customers} | awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' > transformed_customers.csv
        """.format(cleaned_customers=cleaned_customers)
        # Transform order data
        order_transform_commands = """
        cat {cleaned_orders} | awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' > transformed_orders.csv
        """.format(cleaned_orders=cleaned_orders)
        return customer_transform_commands + "\n" + order_transform_commands

    @task.bash
    def analyze_data(transformed_customers, transformed_orders):
        analysis_commands = """
        # Calculate total revenue
        total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {transformed_orders})
        echo "Total revenue: $total_revenue"
        # Find customers with multiple orders
        customers_with_multiple_orders=$(
            awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {transformed_orders}
        )
        echo "Customers with multiple orders: $customers_with_multiple_orders"
        # Find most popular product
        popular_product=$(
            awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}}} END {{print popular}}' {transformed_orders}
        )
        echo "Most popular product: $popular_product"
        """.format(transformed_customers=transformed_customers, transformed_orders=transformed_orders)
        return analysis_commands

    cleaned_data = clean_data()
    transformed_data = transform_data(cleaned_data, cleaned_data)
    analysis_results = analyze_data(transformed_data, transformed_data)

customer_order_analysis_dag()
Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.
Upgrade from older versions of Airflow to version 2.9.2
To use Apache Airflow v2.9.2, you can create a new Amazon MWAA environment with the new version or upgrade an existing environment using in-place version upgrades; refer to the Amazon MWAA User Guide for supported upgrade paths.
Conclusion
In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features added in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of endpoints in the REST API to programmatically create dataset events. We also provided some sample code to show the implementation in Amazon MWAA.
Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
About the authors
Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and supports customers in the adoption of serverless technologies.
Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.
Open table formats (OTFs) like Apache Iceberg are being increasingly adopted, for example, to improve transactional consistency of a data lake or to consolidate batch and streaming data pipelines on a single file format and reduce complexity. In practice, architects need to integrate the chosen format with the various layers of a modern data platform. However, the level of support for the different OTFs varies across common analytical services.
Commercial vendors and the open source community have recognized this situation and are working on interoperability between table formats. One approach is to make a single physical dataset readable in different formats by translating its metadata and avoiding reprocessing of actual data files. Apache XTable is an open source solution that follows this approach and provides abstractions and tools for the translation of open table format metadata.
In this post, we show you how to get started with Apache XTable on AWS and how you can use it in a batch pipeline orchestrated with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). To understand how XTable and similar solutions work, we start with a high-level background on metadata management in an OTF and then dive deeper into XTable and its usage.
Open table formats
Open table formats overcome the limitations of traditional data lake storage formats such as Apache Hive tables. They provide abstractions and capabilities known from relational databases, like transactional consistency and the ability to create, update, or delete single records. In addition, they help manage schema evolution.
In order to understand how the XTable metadata translation approach works, you must first understand how the metadata of an OTF is represented on the storage layer.
An OTF comprises a data layer and a metadata layer, which are both represented as files on storage. The data layer contains the data files. The metadata layer contains metadata files that keep track of the data files and the transactionally consistent sequence of changes to these. The following figure illustrates this configuration.
Inspecting the files of an Iceberg table on storage, we identify the metadata layer through the folder metadata. Adjacent to it are the data files—in this example, as snappy-compressed Parquet:
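The following listing is illustrative; the metadata and data file names are placeholders, because the actual names are generated by the Iceberg writer:

<table base folder>
├── metadata                          # contains metadata files
│   ├── 00000-<uuid>.metadata.json    # table metadata
│   ├── snap-<snapshot-id>.avro       # manifest list
│   └── <uuid>-m0.avro                # manifest file
└── <data file>.snappy.parquet        # data files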
Comparable to Iceberg, in Delta Lake, the metadata layer is represented through the folder _delta_log:
<table base folder>
├── _delta_log # contains metadata files
│ └── 00000000000000000000.json
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files
Although the metadata layer varies in structure and capabilities between OTFs, it’s eventually just files on storage. Typically, it resides in the table’s base folder adjacent to the data files.
Now, the question emerges: what if metadata files of multiple different formats are stored in parallel for the same table?
Current approaches to interoperability do exactly that, as we will see in the next section.
Apache XTable
XTable is currently provided as a standalone Java binary. It translates the metadata layer between Apache Hudi, Apache Iceberg, or Delta Lake without rewriting data files and integrates with Iceberg-compatible catalogs like the AWS Glue Data Catalog.
In practice, XTable reads the latest snapshot of an input table and creates additional metadata for configurable target formats. It adds this additional metadata to the table on the storage layer—in addition to existing metadata.
Through this, you can choose either format, source or target, read the respective metadata, and get the same consistent view on the table’s data.
The following diagram illustrates the metadata translation process.
Let’s assume you have an existing Delta Lake table that you want to make readable as an Iceberg table. To run XTable, you invoke its Java binary and provide a dataset config file that specifies source and target format, as well as source table paths:
---
sourceFormat: DELTA
targetFormats:
- ICEBERG
datasets:
- tableBasePath: s3://<URI to base folder of table>
tableName: <table name>
...
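With the dataset config in place, you run the translation by invoking the bundled JAR. The following is a sketch; the JAR and config file names are placeholders, and the --datasetConfig option name reflects XTable's run utility as of this writing:

java -jar ./xtable-utilities-bundled.jar --datasetConfig my_config.yaml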
As shown in the following listing, XTable adds the Iceberg-specific metadata folder to the table’s base path in addition to the existing _delta_log folder. Now, clients can read the table in either Delta Lake or Iceberg format.
<table base folder>
├── _delta_log # Previously existing Delta Lake metadata
│ └── ...
├── metadata # Added by XTable: Apache Iceberg metadata
│ └── ...
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files
To register the Iceberg table in Data Catalog, pass a further config file to XTable that is responsible for Iceberg catalogs:
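A sketch of the invocation with the additional catalog config follows; file names are placeholders as before, and the --icebergCatalogConfig option name is an assumption based on XTable's current CLI:

java -jar ./xtable-utilities-bundled.jar --datasetConfig my_config.yaml --icebergCatalogConfig glueDataCatalog.yaml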
The minimal contents of glueDataCatalog.yaml are as follows. It configures XTable to use the Data Catalog-specific IcebergCatalog implementation provided by the iceberg-aws module, which is part of the Apache Iceberg core project:
---
catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: glue
catalogOptions:
warehouse: s3://<URI to base folder of Iceberg tables>
catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
io-impl: org.apache.iceberg.aws.s3.S3FileIO
...
Run Apache XTable as an Airflow Operator
You can use XTable in batch data pipelines that write tables on the data lake and make sure these are readable in different table formats. For instance, operating in the Delta Lake ecosystem, a data pipeline might create Delta tables, which need to be accessible as Iceberg tables as well.
One tool to orchestrate data pipelines on AWS is Amazon MWAA, which is a managed service for Apache Airflow. In the following sections, we explore how XTable can run within a custom Airflow Operator on Amazon MWAA. We elaborate on the initial design of such an Operator and demonstrate its deployment on Amazon MWAA.
Why a custom Operator? Although XTable could also be invoked from a BashOperator directly, we chose to wrap this step in a custom operator to allow for configuration through Airflow's native programming language (Python) and operator parameters only. For a background on how to write custom operators, see Creating a custom operator.
The following diagram illustrates the dependency between the operator and XTable’s binary.
Input parameters of the Operator
XTable’s primary inputs are YAML-based configuration files:
Dataset config – Contains source format, target formats, and source tables
Iceberg catalog config (optional) – Contains the reference to an external Iceberg catalog into which to register the table in the target format
We choose to reflect the data structures of the YAML files in the Operator’s input parameters, as listed in the following table.
dataset_config (dict) – Contents of the dataset config, passed as a Python dict literal
iceberg_catalog_config (dict) – Contents of the Iceberg catalog config, passed as a Python dict literal
As the Operator runs, the YAML files are generated from the input parameters.
The following example shows the configuration to translate a table from Delta Lake to both Iceberg and Hudi. The attribute dataset_config reflects the structure of the dataset config file through a Python dict literal:
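The following is a minimal sketch. The import path and class name XtableOperator are assumptions based on the sample repository layout, and the S3 path and table name placeholders mirror the dataset config file shown earlier:

from xtable_operator.operator import XtableOperator  # assumed import path from the sample repository

translate_table = XtableOperator(
    task_id="translate_delta_table",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG", "HUDI"],
        "datasets": [
            {
                "tableBasePath": "s3://<URI to base folder of table>",
                "tableName": "<table name>",
            }
        ],
    },
)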
Sample code: The full source code of the sample XtableOperator and all other code used in this post is provided through this GitHub repository.
Solution overview
To deploy the custom operator to Amazon MWAA, we upload it together with DAGs into the configured DAG folder.
Besides the operator itself, we also need to upload XTable’s executable JAR. As of writing this post, the JAR needs to be compiled by the user from source code. To simplify this, we provide a container-based build script.
Prerequisites
We assume you have at least an environment consisting of Amazon MWAA itself, an S3 bucket, and an AWS Identity and Access Management (IAM) role for Amazon MWAA that has read access to the bucket and optionally write access to the AWS Glue Data Catalog.
In addition, you need one of the following container runtimes to run the provided build script for XTable:
Finch
Docker
Build and deploy the XTableOperator
To compile XTable, you can use the provided build script and complete the following steps:
Clone the sample code from GitHub:
git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
cd apache-xtable-on-aws-samples
Run the build script:
./build-airflow-operator.sh
Because the Airflow operator uses the library JPype to invoke XTable’s JAR, add a dependency to the Amazon MWAA requirements.txt file:
JPype1==1.5.0
For a background on installing additional Python libraries on Amazon MWAA, see Installing Python dependencies. Because XTable is Java-based, a Java 11 runtime environment (JRE) is required on Amazon MWAA. You can use the Amazon MWAA startup script to install a JRE.
Add the following lines to an existing startup script or create a new one as provided in the sample code base of this post:
if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]]
then
sudo yum install -y java-11-amazon-corretto-headless
fi
Upload xtable_operator/, requirements.txt, startup.sh, and .airflowignore to the S3 bucket and respective paths from which Amazon MWAA will read files. Make sure the IAM role for Amazon MWAA has appropriate read permissions. For the custom operator, make sure to upload the local folder xtable_operator/ and .airflowignore into the configured DAG folder.
Update the configuration of your Amazon MWAA environment as follows and start the update process:
Add or update the S3 URI to the requirements.txt file through the Requirements file configuration option.
Add or update the S3 URI to the startup.sh script through the Startup script configuration option.
Optionally, you can use the AWS Glue Data Catalog as an Iceberg catalog. In case you create Iceberg metadata and want to register it in the AWS Glue Data Catalog, the Amazon MWAA role needs permissions to create or modify tables in AWS Glue. The following listing shows a minimal policy for this. It constrains permissions to a defined database in AWS Glue:
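The following is a minimal policy sketch under the assumption that the Iceberg tables are registered in a single database; the Region, account ID, and database name are placeholders:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": [
        "arn:aws:glue:<region>:<account-id>:catalog",
        "arn:aws:glue:<region>:<account-id>:database/<database-name>",
        "arn:aws:glue:<region>:<account-id>:table/<database-name>/*"
      ]
    }
  ]
}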
Using the XTableOperator in practice: Delta Lake to Apache Iceberg
Let’s look into a practical example that uses the XTableOperator. We continue the scenario of a data pipeline in the Delta Lake ecosystem and assume it is implemented as a DAG on Amazon MWAA. The following figure shows our example batch pipeline.
The pipeline uses an Apache Spark job that is run by AWS Glue to write a Delta table into an S3 bucket. Additionally, the table is made accessible as an Iceberg table without data duplication. Finally, we want to load the Iceberg table into Amazon Redshift, which is a fully managed, petabyte-scale data warehouse service in the cloud.
As shown in the following screenshot of the graph visualization of the example DAG, we run the XTableOperator after creating the Delta table through a Spark job. Then we use the RedshiftDataOperator to refresh a materialized view, which is used in downstream transformations as a source table. Materialized views are a common construct to precompute complex queries on large tables. In this example, we use them to simplify data loading into Amazon Redshift because of the incremental update capabilities in combination with Iceberg.
The input parameters of the XTableOperator are as follows:
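A sketch of how this could look in the DAG, reusing the assumed XtableOperator class from earlier; bucket, table, and warehouse locations are placeholders, and the catalog options mirror the glueDataCatalog.yaml contents shown previously:

translate_to_iceberg = XtableOperator(
    task_id="translate_to_iceberg",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG"],
        "datasets": [
            {"tableBasePath": "s3://<data lake bucket>/<table folder>", "tableName": "<table name>"}
        ],
    },
    iceberg_catalog_config={
        "catalogImpl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "catalogName": "glue",
        "catalogOptions": {
            "warehouse": "s3://<URI to base folder of Iceberg tables>",
            "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        },
    },
)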
The XTableOperator creates Apache Iceberg metadata on Amazon S3 and registers a table accordingly in the Data Catalog. The following screenshot shows the created Iceberg table. AWS Glue stores a pointer to Iceberg’s most recent metadata file. As updates are applied to the table and new metadata files are created, XTable updates the pointer after each job.
Amazon Redshift is able to discover the Iceberg table through the Data Catalog and read it using Amazon Redshift Spectrum.
Summary
In this post, we showed how Apache XTable translates the metadata layer of open table formats without data duplication. This provides advantages from both a cost and data integrity perspective—especially in large-scale environments—and allows for the migration of an existing historical estate of datasets. We also discussed how you can implement a custom Airflow operator that embeds Apache XTable into data pipelines on Amazon MWAA.
Matthias Rudolph is an Associate Solutions Architect, digitalizing the German manufacturing industry.
Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.
Customers with data engineers and data scientists are using Amazon Managed Workflows for Apache Airflow (Amazon MWAA) as a central orchestration platform for running data pipelines and machine learning (ML) workloads. To support these pipelines, they often require additional Python packages, such as Apache Airflow Providers. For example, a pipeline may require the Snowflake provider package for interacting with a Snowflake warehouse, or the Kubernetes provider package for provisioning Kubernetes workloads. As a result, they need to manage these Python dependencies efficiently and reliably, providing compatibility with each other and the base Apache Airflow installation.
Python includes the tool pip to handle package installations. To install a set of packages, you add their names to a file named requirements.txt. The pip install command then reads the contents of your requirements file, resolves dependencies, and installs the packages. Amazon MWAA runs the pip install command using this requirements.txt file during initial environment startup and subsequent updates. For more information, see How it works.
Creating a reproducible and stable requirements file is key for reducing pip installation and DAG errors. Additionally, this defined set of requirements provides consistency across nodes in an Amazon MWAA environment. This is most important during worker auto scaling, where additional worker nodes are provisioned and having different dependencies could lead to inconsistencies and task failures. Additionally, this strategy promotes consistency across different Amazon MWAA environments, such as dev, qa, and prod.
This post describes best practices for managing your requirements file in your Amazon MWAA environment. It defines the steps needed to determine your required packages and package versions, create and verify your requirements.txt file with package versions, and package your dependencies.
Best practices
The following sections describe the best practices for managing Python dependencies.
Specify package versions in the requirements.txt file
When creating a Python requirements.txt file, you can specify just the package name, or the package name and a specific version. Adding a package without version information instructs the pip installer to download and install the latest available version, subject to compatibility with other installed packages and any constraints. The package versions selected during environment creation may be different than the version selected during an auto scaling event later on. This version change can create package conflicts leading to pip install errors. Even if the updated package installs properly, code changes in the package can affect task behavior, leading to inconsistencies in output. To avoid these risks, it’s best practice to add the version number to each package in your requirements.txt file.
Use the constraints file for your Apache Airflow version
A constraints file contains the packages, with versions, verified to be compatible with your Apache Airflow version. This file adds an additional validation layer to prevent package conflicts. Because the constraints file plays such an important role in preventing conflicts, beginning with Apache Airflow v2.7.2 on Amazon MWAA, your requirements file must include a --constraint statement. If a --constraint statement is not supplied, Amazon MWAA will specify a compatible constraints file for you.
Constraint files are available for each Airflow version and Python version combination. The URLs have the following form:
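For example, the constraints file for Apache Airflow v2.8.1 with Python 3.11 (the combination used later in this post) is published at the following location; substitute the version numbers for other combinations:

https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt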
The official Apache Airflow constraints are guidelines, and if your workflows require newer versions of a provider package, you may need to modify your constraints file and include it in your DAG folder. When doing so, the best practices outlined in this post become even more important to guard against package conflicts.
Create a .zip archive of all dependencies
Creating a .zip file containing the packages in your requirements file and specifying this as the package repository source makes sure the exact same wheel files are used during your initial environment setup and subsequent node configurations. The pip installer will use these local files for installation rather than connecting to the external PyPI repository.
Test the requirements.txt file and dependency .zip file
Testing your requirements file before release to production is key to avoiding installation and DAG errors. Testing both locally, with the MWAA local runner, and in a dev or staging Amazon MWAA environment, are best practices before deploying to production. You can use continuous integration and delivery (CI/CD) deployment strategies to perform the requirements and package installation testing, as described in Automating a DAG deployment with Amazon Managed Workflows for Apache Airflow.
Solution overview
This solution uses the MWAA local runner, an open source utility that replicates an Amazon MWAA environment locally. You use the local runner to build and validate your requirements file, and package the dependencies. In this example, you install the snowflake and dbt-cloud provider packages. You then use the MWAA local runner and a constraints file to determine the exact version of each package compatible with Apache Airflow. With this information, you then update the requirements file, pinning each package to a version, and retest the installation. When you have a successful installation, you package your dependencies and test in a non-production Amazon MWAA environment.
We use MWAA local runner v2.8.1 for this walkthrough and walk through the following steps:
Download and build the MWAA local runner.
Create and test a requirements file with package versions.
Package dependencies.
Deploy the requirements file and dependencies to a non-production Amazon MWAA environment.
Prerequisites
For this walkthrough, you should have the following prerequisites:
An AWS account in an AWS Region where Amazon MWAA is supported
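Download and build the MWAA local runner
Clone the MWAA local runner repository from GitHub; as of this writing, the repository provides a branch per supported Apache Airflow version:

git clone https://github.com/aws/aws-mwaa-local-runner.git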
With Docker running, build the container with the following command:
cd aws-mwaa-local-runner
./mwaa-local-env build-image
Create and test a requirements file with package versions
Building a versioned requirements file makes sure all Amazon MWAA components have the same package versions installed. To determine the compatible versions for each package, you start with a constraints file and an un-versioned requirements file, allowing pip to resolve the dependencies. Then you create your versioned requirements file from pip’s installation output.
The following diagram illustrates this workflow.
To build an initial requirements file, complete the following steps:
In your MWAA local runner directory, open requirements/requirements.txt in your preferred editor.
The default requirements file will look similar to the following:
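The default file contains a --constraint line; for this walkthrough, you add the two provider packages without version pins, so the file looks similar to the following sketch (the un-versioned entries are deliberate at this stage):

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
apache-airflow-providers-snowflake
apache-airflow-providers-dbt-cloud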
In a terminal, run the following command to generate the pip install output:
./mwaa-local-env test-requirements
test-requirements runs pip install, which handles resolving the compatible package versions. Using a constraints file makes sure the selected packages are compatible with your Airflow version. The output will look similar to the following:
The message beginning with Successfully installed is the output of interest. This shows which dependencies, and their specific version, pip installed. You use this list to create your final versioned requirements file.
Your output will also contain Requirement already satisfied messages for packages already available in the base Amazon MWAA environment. You do not add these packages to your requirements.txt file.
Update the requirements file with the list of versioned packages from the test-requirements command. The updated file will look similar to the following code:
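Based on the pip output, a pinned version of the file for this walkthrough might look like the following sketch; the provider versions match those referenced later in this post, and your resolved versions may differ:

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud==3.5.1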
Next, you test the updated requirements file to confirm no conflicts exist.
Rerun the test-requirements command:
./mwaa-local-env test-requirements
A successful test will not produce any errors. If you encounter dependency conflicts, return to the previous step and update the requirements file with additional packages, or package versions, based on pip’s output.
Package dependencies
If your Amazon MWAA environment has a private webserver, you must package your dependencies into a .zip file, upload the file to your S3 bucket, and specify the package location in your Amazon MWAA instance configuration. Because a private webserver can’t access the PyPI repository through the internet, pip will install the dependencies from the .zip file.
If you’re using a public webserver configuration, you also benefit from a static .zip file, which makes sure the package information remains unchanged until it is explicitly rebuilt.
This process uses the versioned requirements file created in the previous section and the package-requirements feature in the MWAA local runner.
To package your dependencies, complete the following steps:
In a terminal, navigate to the directory where you installed the local runner.
Download the constraints file for your Python version and your version of Apache Airflow and place it in the plugins directory. For this post, we use Python 3.11 and Apache Airflow v2.8.1:
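For example, using curl (the target file name within the plugins directory is a choice you can adapt):

curl -o plugins/constraints-3.11.txt "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"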
In your requirements file, update the constraints URL to the local downloaded file.
The --constraint statement instructs pip to compare the package versions in your requirements.txt file to the allowed versions in the constraints file. Downloading a specific constraints file to your plugins directory enables you to control the constraint file location and contents.
The updated requirements file will look like the following code:
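A sketch of the updated file, assuming the constraints file was saved to the plugins directory as shown above (Amazon MWAA makes the plugins content available at /usr/local/airflow/plugins):

--constraint "/usr/local/airflow/plugins/constraints-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud==3.5.1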
Run the following command to create the .zip file:
./mwaa-local-env package-requirements
package-requirements creates an updated requirements file named packaged_requirements.txt and zips all dependencies into plugins.zip. The updated requirements file looks like the following code:
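The following is an illustrative sketch of what the generated file contains; the exact contents depend on the local runner version:

--find-links /usr/local/airflow/plugins
--constraint "/usr/local/airflow/plugins/constraints-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud==3.5.1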
Note the reference to the local constraints file and the plugins directory. The --find-links statement instructs pip to install packages from /usr/local/airflow/plugins rather than the public PyPI repository.
Deploy the requirements file
After you achieve an error-free requirements installation and package your dependencies, you’re ready to deploy the assets to a non-production Amazon MWAA environment. Even when verifying and testing requirements with the MWAA local runner, it’s best practice to deploy and test the changes in a non-prod Amazon MWAA environment before deploying to production. For more information about creating a CI/CD pipeline to test changes, refer to Deploying to Amazon Managed Workflows for Apache Airflow.
To deploy your changes, complete the following steps:
Upload your requirements.txt file and plugins.zip file to your Amazon MWAA environment’s S3 bucket.
From the Apache Airflow menu bar, choose Admin, then Providers.
The list of providers, and their versions, is shown in a table. In this example, the page reflects the installation of apache-airflow-providers-dbt-cloud version 3.5.1 and apache-airflow-providers-snowflake version 5.2.1. This list only contains the provider packages installed, not all supporting Python packages. Provider packages that are part of the base Apache Airflow installation will also appear in the list. The following image is an example of the package list; note the apache-airflow-providers-dbt-cloud and apache-airflow-providers-snowflake packages and their versions.
To verify all package installations, view the results in Amazon CloudWatch Logs. Amazon MWAA creates a log stream for the requirements installation and the stream contains the pip install output. For instructions, refer to Viewing logs for your requirements.txt.
A successful installation results in the following message:
If you encounter any installation errors, you should determine the package conflict, update the requirements file, run the local runner test, re-package the plugins, and deploy the updated files.
Clean up
If you created an Amazon MWAA environment specifically for this post, delete the environment and S3 objects to avoid incurring additional charges.
Conclusion
In this post, we discussed several best practices for managing Python dependencies in Amazon MWAA and how to use the MWAA local runner to implement these practices. These best practices reduce DAG and pip installation errors in your Amazon MWAA environment. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.
Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
About the Author
Mike Ellis is a Technical Account Manager at AWS and an Amazon MWAA specialist. In addition to assisting customers with Amazon MWAA, he contributes to the Apache Airflow open source project.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed orchestration service that makes it straightforward to run data processing workflows at scale. Amazon MWAA takes care of operating and scaling Apache Airflow so you can focus on developing workflows. However, although Amazon MWAA provides high availability within an AWS Region through features like Multi-AZ deployment of Airflow components, recovering from a Regional outage requires a multi-Region deployment.
In Part 1 of this series, we highlighted challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. In particular, we discussed two key strategies: backup and restore and warm standby. In this post, we dive deep into the implementation for both strategies and provide a deployable solution to realize the architectures in your own AWS account.
The solution for this post is hosted on GitHub. The README in the repository offers tutorials as well as further workflow details for both backup and restore and warm standby strategies.
Backup and restore architecture
The backup and restore strategy involves periodically backing up Amazon MWAA metadata to Amazon Simple Storage Service (Amazon S3) buckets in the primary Region. The backups are replicated to an S3 bucket in the secondary Region. In case of a failure in the primary Region, a new Amazon MWAA environment is created in the secondary Region and hydrated with the backed-up metadata to restore the workflows.
The project uses the AWS Cloud Development Kit (AWS CDK) and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.
The following diagram shows the architecture of the backup and restore strategy and its key components:
Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows
Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
Secondary Amazon MWAA environment – This environment is created on-demand during recovery in the secondary Region
Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in the primary Region
Recovery workflow – This workflow monitors the primary Amazon MWAA environment and initiates failover when needed in the secondary Region
Figure 1: The backup restore architecture
There are essentially two workflows that work in conjunction to achieve the backup and restore functionality in this architecture. Let’s explore both workflows in detail and the steps as outlined in Figure 1.
Backup workflow
The backup workflow is responsible for periodically taking a backup of your Airflow metadata tables and storing them in the backup S3 bucket. The steps are as follows:
[1.a] You can deploy the provided solution from your continuous integration and delivery (CI/CD) pipeline. The pipeline includes a DAG deployed to the DAGs S3 bucket, which performs backup of your Airflow metadata. This is the bucket where you host all of your DAGs for your environment.
[1.b] The solution enables cross-Region replication of the DAGs bucket. Any new changes to the primary Region bucket, including DAG files, plugins, and requirements.txt files, are replicated to the secondary Region DAGs bucket. However, for existing objects, a one-time replication needs to be performed using S3 Batch Replication.
[1.c] The DAG deployed to take metadata backup runs periodically. The metadata backup doesn’t include some of the auto-generated tables and the list of tables to be backed up is configurable. By default, the solution backs up variable, connection, slot pool, log, job, DAG run, trigger, task instance, and task fail tables. The backup interval is also configurable and should be based on the Recovery Point Objective (RPO), which is the data loss time during a failure that can be sustained by your business.
[1.d] Similar to the DAGs bucket, the backup bucket is also synced using cross-Region replication, through which the metadata backup becomes available in the secondary Region.
Recovery workflow
The recovery workflow runs periodically in the secondary Region monitoring the primary Amazon MWAA environment. It has two functions:
Store the environment configuration of the primary Amazon MWAA environment in the secondary backup bucket, which is used to recreate an identical Amazon MWAA environment in the secondary Region during failure
Perform the failover when a failure is detected
The following are the steps for when the primary Amazon MWAA environment is healthy (see Figure 1):
[2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
[2.b] The workflow, using AWS Lambda, checks Amazon CloudWatch in the primary Region for the SchedulerHeartbeat metrics of the primary Amazon MWAA environment. The environment in the primary Region sends heartbeats to CloudWatch every 5 seconds by default. However, to not invoke a recovery workflow spuriously, we use a default aggregation period of 5 minutes to check the heartbeat metrics. Therefore, it can take up to 5 minutes to detect a primary environment failure.
[2.c] Assuming that the heartbeat was detected in 2.b, the workflow makes the cross-Region GetEnvironment call to the primary Amazon MWAA environment.
[2.d] The response from the GetEnvironment call is stored in the secondary backup S3 bucket to be used in case of a failure in the subsequent iterations of the workflow. This makes sure the latest configuration of your primary environment is used to recreate a new environment in the secondary Region. The workflow completes successfully after storing the configuration.
The following are the steps for the case when the primary environment is unhealthy (see Figure 1):
[2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
[2.b] The workflow, using Lambda, checks CloudWatch in the primary Region for the scheduler heartbeat metrics and detects failure. The scheduler heartbeat check using the CloudWatch API is the recommended approach to detect failure. However, you can implement a custom strategy for failure detection in the Lambda function, such as deploying a DAG to periodically send custom metrics to CloudWatch or other data stores as heartbeats and using the function to check those metrics. With the current CloudWatch-based strategy, the unavailability of the CloudWatch API may spuriously invoke the recovery flow.
[2.c] Because a failure was detected, the workflow skips the GetEnvironment call to the primary environment.
[2.d] The workflow reads the previously stored environment details from the backup S3 bucket.
[2.e] The environment details read in the previous step are used to recreate an identical environment in the secondary Region using the CreateEnvironment API call. The API also needs other secondary Region-specific configurations, such as VPC, subnets, and security groups, that are read from the user-supplied configuration file or environment variables during the solution deployment. The workflow waits in a polling loop until the environment becomes available and then invokes the DAG to restore metadata from the backup S3 bucket. This DAG is deployed to the DAGs S3 bucket as a part of the solution deployment.
[2.f] The DAG for restoring metadata completes hydrating the newly created environment and notifies the Step Functions workflow of completion using the task token integration. The new environment now starts running the active workflows and the recovery completes successfully.
Considerations
Consider the following when using the backup and restore method:
Recovery Time Objective – From failure detection to workflows running in the secondary Region, failover can take over 30 minutes. This includes new environment creation, Airflow startup, and metadata restore.
Cost – This strategy avoids the overhead of running a passive environment in the secondary Region. Costs are limited to periodic backup storage, cross-Region data transfer charges, and minimal compute for the recovery workflow.
Data loss – The RPO depends on the backup frequency. There is a design trade-off to consider here. Although shorter intervals between backups can minimize potential data loss, too frequent backups can adversely affect the performance of the metadata database and consequently the primary Airflow environment. Also, the solution can’t recover an actively running workflow midway. All active workflows are started fresh in the secondary Region based on the provided schedule.
Ongoing management – The Amazon MWAA environment and dependencies are automatically kept in sync across Regions in this architecture. As specified in the Step 1.b of the backup workflow, the DAGs S3 bucket will need a one-time deployment of the existing resources for the solution to work.
Warm standby architecture
The warm standby strategy involves deploying identical Amazon MWAA environments in two Regions. Periodic metadata backups from the primary Region are used to rehydrate the standby environment in case of failover.
The project uses the AWS CDK and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.
The following diagram shows the architecture of the warm standby strategy and its key components:
Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows during normal operation
Secondary Amazon MWAA environment – The environment in the secondary Region acts as a warm standby ready to take over at any time
Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
Replicated backup bucket – The bucket in the secondary Region syncs metadata backups through S3 Cross-Region Replication.
Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in both Regions
Recovery workflow – This workflow monitors the primary environment and initiates failover to the secondary environment when needed
Figure 2: The warm standby architecture
Similar to the backup and restore strategy, the backup workflow (Steps 1a–1d) periodically backs up critical Amazon MWAA metadata to S3 buckets in the primary Region, which are synced to the secondary Region.
The recovery workflow runs periodically in the secondary Region monitoring the primary environment. On failure detection, it initiates the failover procedure. The steps are as follows (see Figure 2):
[2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
[2.b] The workflow checks CloudWatch in the primary Region for the scheduler heartbeat metrics and detects failure. If the primary environment is healthy, the workflow completes without further actions.
[2.c] The workflow invokes the DAG to restore metadata from the backup S3 bucket.
[2.d] The DAG for restoring metadata completes hydrating the passive environment and notifies the Step Functions workflow of completion using the task token integration. The passive environment starts running the active workflows on the provided schedules.
Because the secondary environment is already warmed up, the failover is faster with recovery times in minutes.
Considerations
Consider the following when using the warm standby method:
Recovery Time Objective – With a warm standby ready, the RTO can be as low as 5 minutes. This includes just the metadata restore and reenabling DAGs in the secondary Region.
Cost – This strategy has an added cost of running similar environments in two Regions at all times. With auto scaling for workers, the warm instance can maintain a minimal footprint; however, the web server and scheduler components of Amazon MWAA will remain active in the secondary environment at all times. The trade-off is significantly lower RTO.
Data loss – Similar to the backup and restore model, the RPO depends on the backup frequency. Faster backup cycles minimize potential data loss but can adversely affect performance of the metadata database and consequently the primary Airflow environment.
Ongoing management – This approach comes with some management overhead. Unlike the backup and restore strategy, any changes to the primary environment configurations need to be manually reapplied to the secondary environment to keep the two environments in sync. Automated synchronization of the secondary environment configurations is a future work.
Shared considerations
Although the backup and restore and warm standby strategies differ in their implementation, they share some common considerations:
Periodically test failover to validate recovery procedures, RTO, and RPO.
Enable Amazon MWAA environment logging to help debug issues during failover.
Automate deployments of environment configurations and disaster recovery workflows through CI/CD pipelines.
Monitor key CloudWatch metrics like SchedulerHeartbeat to detect primary environment failures.
Conclusion
In this series, we discussed how backup and restore and warm standby strategies offer configurable data protection based on your RTO, RPO, and cost requirements. Both use periodic metadata replication and restoration to minimize the area of effect of Regional outages.
Which strategy resonates more with your use case? Feel free to try out our solution and share any feedback or questions in the comments section!
About the Authors
Chandan Rupakheti is a Senior Solutions Architect at AWS. His main focus at AWS lies in the intersection of Analytics, Serverless, and AdTech services. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud. Outside of his professional life, he loves spending time with his family and friends besides listening and playing music.
Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.
Apache Airflow is a popular platform for enterprises looking to orchestrate complex data pipelines and workflows. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that streamlines the setup and operation of secure and highly available Airflow environments in the cloud.
In this post, we’re excited to introduce two new features that address common customer challenges and unlock new possibilities for building robust, scalable, and flexible data orchestration solutions using Amazon MWAA. First, the Airflow REST API support enables programmatic interaction with Airflow resources like connections, Directed Acyclic Graphs (DAGs), DAGRuns, and Task instances. Second, the option to horizontally scale web server capacity helps you handle increased demand, whether from REST API requests, command line interface (CLI) usage, or more concurrent Airflow UI users. Both features are available for all actively supported Amazon MWAA versions, including version 2.4.3 and newer.
Airflow REST API support
A frequently requested feature from Amazon MWAA customers has been the ability to interact with their workflows programmatically using Airflow’s APIs. The introduction of REST API support in Amazon MWAA addresses this need, providing a standardized way to access and manage your Airflow environment. With the new REST API, you can now invoke DAG runs, manage datasets, or get the status of Airflow’s metadata database, trigger, and scheduler—all without relying on the Airflow web UI or CLI.
Other examples include building monitoring dashboards that aggregate the status of your DAGs across multiple Amazon MWAA environments, or invoking workflows in response to events from external systems, such as completed database jobs or new user signups.
This feature opens up a world of possibilities for integrating your Amazon MWAA environments with other systems and building custom solutions that use the power of your data orchestration pipelines.
To demonstrate this new capability, we use the REST API to invoke a new DAG run. Follow the process detailed in the following sections.
Authenticate with the Airflow REST API
For a user to authenticate with the REST API, they need the necessary permissions to create a web login token, similar to how it works with the Airflow UI. Refer to Creating an Apache Airflow web login token for more details. The user’s AWS Identity and Access Management (IAM) role or policy must include the CreateWebLoginToken permission to generate a token for authenticating. Furthermore, the user’s permissions for interacting with the REST API are determined by the Airflow role assigned to them within Amazon MWAA. The Airflow roles govern the user’s ability to perform various operations, such as invoking DAG runs, checking statuses, or modifying configurations, through the REST API endpoints.
The following is an example of the authentication process:
def get_session_info(region, env_name):
"""
Retrieves the web server hostname and session cookie for an MWAA environment.
Args:
region (str): The AWS region where the MWAA environment is located.
env_name (str): The name of the MWAA environment.
Returns:
tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
"""
logging.basicConfig(level=logging.INFO)
try:
# Initialize MWAA client and request a web login token
mwaa = boto3.client('mwaa', region_name=region)
response = mwaa.create_web_login_token(Name=env_name)
# Extract the web server hostname and login token
web_server_host_name = response["WebServerHostname"]
web_token = response["WebToken"]
# Construct the URL needed for authentication
login_url = f"https://{web_server_host_name}/aws_mwaa/login"
login_payload = {"token": web_token}
# Make a POST request to the MWAA login url using the login payload
response = requests.post(
login_url,
data=login_payload,
timeout=10
)
# Check if login was successful
if response.status_code == 200:
# Return the hostname and the session cookie
return (
web_server_host_name,
response.cookies["session"]
)
else:
# Log an error
logging.error("Failed to log in: HTTP %d", response.status_code)
return None, None
except requests.RequestException as e:
# Log any exceptions raised during the request to the MWAA login endpoint
logging.error("Request failed: %s", str(e))
return None, None
except Exception as e:
# Log any other unexpected exceptions
logging.error("An unexpected error occurred: %s", str(e))
return None, None
The get_session_info function uses the AWS SDK for Python (Boto3) and the Python requests library for the initial steps required for authentication, retrieving a web token and a session cookie, which is valid for 12 hours. These are used for subsequent REST API requests.
Invoke the Airflow REST API endpoint
When authentication is complete, you have the credentials to start sending requests to the API endpoints. In the following example, we use the endpoint /dags/{dag_id}/dagRuns to initiate a DAG run:
def trigger_dag(region, env_name, dag_name):
"""
Triggers a DAG in a specified MWAA environment using the Airflow REST API.
Args:
region (str): AWS region where the MWAA environment is hosted.
env_name (str): Name of the MWAA environment.
dag_name (str): Name of the DAG to trigger.
"""
logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
# Retrieve the web server hostname and session cookie for authentication
try:
web_server_host_name, session_cookie = get_session_info(region, env_name)
if not session_cookie:
logging.error("Authentication failed, no session cookie retrieved.")
return
except Exception as e:
logging.error(f"Error retrieving session info: {str(e)}")
return
# Prepare headers and payload for the request
cookies = {"session": session_cookie}
json_body = {"conf": {}}
# Construct the URL for triggering the DAG
url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
# Send the POST request to trigger the DAG
try:
response = requests.post(url, cookies=cookies, json=json_body)
# Check the response status code to determine if the DAG was triggered successfully
if response.status_code == 200:
logging.info("DAG triggered successfully.")
else:
logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
except requests.RequestException as e:
logging.error(f"Request to trigger DAG failed: {str(e)}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Check if the correct number of arguments is provided
if len(sys.argv) != 4:
logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
sys.exit(1)
region = sys.argv[1]
env_name = sys.argv[2]
dag_name = sys.argv[3]
# Trigger the DAG with the provided arguments
trigger_dag(region, env_name, dag_name)
The following is the complete code of trigger_dag.py:
import sys
import boto3
import requests
import logging
def get_session_info(region, env_name):
"""
Retrieves the web server hostname and session cookie for an MWAA environment.
Args:
region (str): The AWS region where the MWAA environment is located.
env_name (str): The name of the MWAA environment.
Returns:
tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
"""
logging.basicConfig(level=logging.INFO)
try:
# Initialize MWAA client and request a web login token
mwaa = boto3.client('mwaa', region_name=region)
response = mwaa.create_web_login_token(Name=env_name)
# Extract the web server hostname and login token
web_server_host_name = response["WebServerHostname"]
web_token = response["WebToken"]
# Construct the URL needed for authentication
login_url = f"https://{web_server_host_name}/aws_mwaa/login"
login_payload = {"token": web_token}
# Make a POST request to the MWAA login url using the login payload
response = requests.post(
login_url,
data=login_payload,
timeout=10
)
# Check if login was successful
if response.status_code == 200:
# Return the hostname and the session cookie
return (
web_server_host_name,
response.cookies["session"]
)
else:
# Log an error
logging.error("Failed to log in: HTTP %d", response.status_code)
return None, None
except requests.RequestException as e:
# Log any exceptions raised during the request to the MWAA login endpoint
logging.error("Request failed: %s", str(e))
return None, None
except Exception as e:
# Log any other unexpected exceptions
logging.error("An unexpected error occurred: %s", str(e))
return None, None
def trigger_dag(region, env_name, dag_name):
"""
Triggers a DAG in a specified MWAA environment using the Airflow REST API.
Args:
region (str): AWS region where the MWAA environment is hosted.
env_name (str): Name of the MWAA environment.
dag_name (str): Name of the DAG to trigger.
"""
logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
# Retrieve the web server hostname and session cookie for authentication
try:
web_server_host_name, session_cookie = get_session_info(region, env_name)
if not session_cookie:
logging.error("Authentication failed, no session cookie retrieved.")
return
except Exception as e:
logging.error(f"Error retrieving session info: {str(e)}")
return
# Prepare headers and payload for the request
cookies = {"session": session_cookie}
json_body = {"conf": {}}
# Construct the URL for triggering the DAG
url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"
# Send the POST request to trigger the DAG
try:
response = requests.post(url, cookies=cookies, json=json_body)
# Check the response status code to determine if the DAG was triggered successfully
if response.status_code == 200:
logging.info("DAG triggered successfully.")
else:
logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
except requests.RequestException as e:
logging.error(f"Request to trigger DAG failed: {str(e)}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Check if the correct number of arguments is provided
if len(sys.argv) != 4:
logging.error("Incorrect usage. Proper format: python script_name.py <region><env_name><dag_name>")
sys.exit(1)
region = sys.argv[1]
env_name = sys.argv[2]
dag_name = sys.argv[3]
# Trigger the DAG with the provided arguments
trigger_dag(region, env_name, dag_name)
Run the request script
Run the request script with the following code, providing your AWS Region, Amazon MWAA environment name, and DAG name:
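For example, with placeholder values for the Region, environment name, and DAG name:

python3 trigger_dag.py us-east-1 my-mwaa-environment my_example_dag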
The following screenshot shows the result in the CLI.
Check the DAG run in the Airflow UI
The following screenshot shows the DAG run status in the Airflow UI.
You can use any other endpoint in the REST API to enable programmatic control, automation, integration, and management of Airflow workflows and resources. To learn more about the Airflow REST API and its various endpoints, refer to the Airflow documentation.
Web server auto scaling
Another key request from Amazon MWAA customers has been the ability to dynamically scale their web servers to handle fluctuating workloads. Previously, you were constrained by two web servers provided with an Airflow environment on Amazon MWAA and had no way to horizontally scale web server capacity, which could lead to performance issues during peak loads. The new web server auto scaling feature in Amazon MWAA solves this problem. By automatically scaling the number of web servers based on CPU utilization and active connection count, Amazon MWAA makes sure your Airflow environment can seamlessly accommodate increased demand, whether from REST API requests, CLI usage, or more concurrent Airflow UI users.
Set up web server auto scaling
To set up auto scaling for your Amazon MWAA environment web servers, follow these steps:
On the Amazon MWAA console, navigate to the environment you want to configure auto scaling for.
Choose Edit.
Choose Next.
On the Configure advanced settings page, in the Environment class section, add the maximum and minimum web server count. For this example, we set the upper limit to 5 and lower limit to 2.
These settings allow Amazon MWAA to automatically scale up the Airflow web server when demand increases and scale down conservatively when demand decreases, optimizing resource usage and cost.
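If you prefer to set the same limits programmatically, the following is a minimal boto3 sketch that updates an existing environment; the environment name and values are assumptions for illustration:

import boto3

# Adjust the web server auto scaling boundaries for an existing environment
mwaa = boto3.client("mwaa", region_name="us-east-1")
mwaa.update_environment(
    Name="my-mwaa-environment",  # assumed environment name
    MinWebservers=2,
    MaxWebservers=5,
)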
Trigger auto scaling programmatically
After you configure auto scaling, you might want to test how it behaves under simulated conditions. Using the Python code structure we discussed earlier for invoking a DAG, you can also use the Airflow REST API to simulate a load test and see how well your auto scaling setup responds. For the purpose of load testing, we have configured our Amazon MWAA environment with an mw1.small instance class. The following is an example implementation using load_test.py:
import sys
import time
import boto3
import requests
import logging
import concurrent.futures


def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.

    Args:
    region (str): The AWS region where the MWAA environment is located.
    env_name (str): The name of the MWAA environment.

    Returns:
    tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """
    try:
        # Create an MWAA client in the specified region
        mwaa = boto3.client('mwaa', region_name=region)
        # Request a web login token for the specified environment
        response = mwaa.create_web_login_token(Name=env_name)
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]
        # Construct the login URL and payload for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}
        # Authenticate and obtain the session cookie
        response = requests.post(login_url, data=login_payload, timeout=10)
        if response.status_code == 200:
            return web_server_host_name, response.cookies["session"]
        else:
            logging.error(f"Failed to log in: HTTP {response.status_code}")
            return None, None
    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
        return None, None
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None, None


def call_rest_api(web_server_host_name, session_cookie):
    """
    Calls the Airflow web server API to fetch details of all DAGs and measures the time taken for the call.

    Args:
    web_server_host_name (str): The hostname of the MWAA web server.
    session_cookie (str): The session cookie for authentication.
    """
    # Define the endpoint for fetching all DAGs
    url = f"https://{web_server_host_name}/api/v1/dags"
    headers = {'Content-Type': 'application/json', 'Cookie': f'session={session_cookie}'}
    try:
        start_time = time.time()
        response = requests.get(url, headers=headers)
        elapsed_time = time.time() - start_time
        if response.status_code == 200:
            logging.info(f"API call successful, fetched {len(response.json()['dags'])} DAGs, took {elapsed_time:.2f} seconds")
        else:
            logging.error(f"API call failed with status code: {response.status_code}, took {elapsed_time:.2f} seconds")
    except requests.RequestException as e:
        logging.error(f"Error during API call: {e}")


def run_load_test(web_server_host_name, session_cookie, qps, duration):
    """
    Performs a load test by sending concurrent requests at a specified rate over a given duration.

    Args:
    web_server_host_name (str): The hostname of the MWAA web server.
    session_cookie (str): The session cookie for authentication.
    qps (int): Queries per second.
    duration (int): Duration of the test in seconds.
    """
    interval = 1.0 / qps
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=qps) as executor:
        while time.time() - start_time < duration:
            futures = [executor.submit(call_rest_api, web_server_host_name, session_cookie) for _ in range(qps)]
            concurrent.futures.wait(futures)
            time.sleep(interval)
    logging.info("Load test completed.")


def main(region, env_name, qps, duration):
    """
    Main function to execute the load testing script.

    Args:
    region (str): AWS region where the MWAA environment is hosted.
    env_name (str): Name of the MWAA environment.
    qps (int): Queries per second.
    duration (int): Duration in seconds.
    """
    web_server_host_name, session_cookie = get_session_info(region, env_name)
    if not web_server_host_name or not session_cookie:
        logging.error("Failed to retrieve session information")
        return
    run_load_test(web_server_host_name, session_cookie, qps, duration)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 5:
        logging.error("Incorrect usage. Proper format: python load_test.py <region> <env_name> <qps> <duration>")
        sys.exit(1)
    region = sys.argv[1]
    env_name = sys.argv[2]
    qps = int(sys.argv[3])
    duration = int(sys.argv[4])
    main(region, env_name, qps, duration)
The Python code uses thread pooling and concurrency concepts to help test the auto scaling performance of your web server by simulating traffic. This script automates the process of sending a specific number of requests per second to your web server, enabling you to trigger an auto scaling event.
You can use the following command to run the script. You have to provide the Region, the Amazon MWAA environment name, how many queries per second you want to run against the web server, and the duration for which you want the load test to run.
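For example, the following command (the environment name is a placeholder) passes the duration in seconds:

python load_test.py us-east-1 my-mwaa-environment 10 1080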
The preceding command will run 10 queries per second for 18 minutes.
When the script is running, you will start seeing rows that show how long (in seconds) it took for the web server to process the request.
This time will gradually increase. As the active connection count or CPU usage increases, Amazon MWAA will dynamically scale the web servers to accommodate the load.
As new web servers come online, your environment will be able to handle increased load, and the response time will drop. Amazon MWAA provides web server container metrics in the AWS/MWAA service namespace in Amazon CloudWatch, allowing you to monitor the web server performance. The following screenshots show an example of the auto scaling event.
Recommendation
Determining the appropriate minimum and maximum web server count involves carefully considering your typical workload patterns, performance requirements, and cost constraints. To set these values, consider metrics like the required REST API throughput at peak times and the maximum number of concurrent UI users you expect to have. It’s important to note that Amazon MWAA can support up to 10 queries per second (QPS) for the Airflow REST API at full scale for any environment size, provided you follow the recommended number of DAGs.
Amazon MWAA integration with CloudWatch provides granular metrics and monitoring capabilities to help you find the optimal configuration for your specific use case. If you anticipate periods of consistently high demand or increased workloads for an extended duration, you can configure your Amazon MWAA environment to maintain a higher minimum number of web servers. By setting the minimum web server setting to 2 or more, you can make sure your environment always has sufficient capacity to handle load peaks without needing to wait for auto scaling to provision additional resources. This comes at the cost of running more web server instances, which is a trade-off between cost-optimization and responsiveness.
Conclusion
Today, we are announcing the availability of the Airflow REST API and web server auto scaling in Amazon MWAA. The REST API provides a standardized way to programmatically interact with and manage resources in your Amazon MWAA environments. This enables seamless integration, automation, and extensibility of Amazon MWAA within your organization’s existing data and application landscape. With web server auto scaling, you can automatically increase the number of web server instances based on resource utilization, and Amazon MWAA makes sure your Airflow workflows can handle fluctuating workloads without manual intervention.
These features lay the foundation for you to build more robust, scalable, and flexible data orchestration pipelines. We encourage you to use them to streamline your data engineering operations and unlock new possibilities for your business.
Stay tuned for future updates and enhancements to Amazon MWAA that will continue to enhance the developer experience and unlock new opportunities for data-driven organizations.
About the Authors
Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.
Kartikay Khator is a Solutions Architect within the Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS Analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.
Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest MWAA and AWS Glue features and news!
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.
By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.
For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A to Account B, which is dedicated for data handling tasks. This makes sure the raw and processed data can be maintained securely separated across multiple accounts, if required, for enhanced data governance and security.
Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.
Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.
The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.
The workflow consists of the following components:
The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between S3 buckets in Account A with resources in Account B to be able to load and unload data.
In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.
Deploy resources in Account A using AWS CloudFormation
In Account A, launch the provided AWS CloudFormation stack to create the following resources:
The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
A sample dataset called products.csv, which we use in this post.
Upload the AWS Glue job to Amazon S3 in Account B
In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.
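For example, you could upload the script with the AWS CLI (the local file path assumes the script is saved in your working directory):

aws s3 cp sample_glue_job.py s3://aws-glue-assets-<account-id>-<region>/scripts/sample_glue_job.py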
Deploy resources in Account B using AWS CloudFormation
In Account B, launch the provided CloudFormation stack template to create the following resources:
The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
dags – The folder for DAG files.
plugins – The file for any custom or community Airflow plugins.
requirements – The requirements.txt file for any Python packages.
scripts – Any SQL scripts used in the DAG.
data – Any datasets used in the DAG.
A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
An AWS Glue environment, which contains the following:
An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
A database called products_db in the AWS Glue Data Catalog.
An ELT job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.
In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.
Configure Airflow permissions
After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.
The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.
Set up the environment
This section outlines the steps to configure the environment. The process involves the following high-level steps:
Update any necessary providers.
Set up cross-account access.
Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
Configure Secrets Manager to integrate with Amazon MWAA.
Define Airflow connections.
Update the providers
Follow the steps in this section if your Amazon MWAA environment is running an Apache Airflow version earlier than 2.8.1 (the latest version available as of writing this post).
Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.
Although the version of Apache Airflow supported in Amazon MWAA for this post is 2.6.3, which comes bundled with Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.
The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.
Navigate to the Amazon MWAA environment and choose Edit.
Under DAG code in Amazon S3, for Requirements file, choose the latest version.
Choose Save.
This will update the environment and new providers will be in effect.
To verify the provider version, go to Providers under the Admin menu.
The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.
Set up cross-account access
You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:
In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
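The following is a minimal sketch of such a bucket policy; the role names are placeholders, and you should scope the actions to what your jobs actually need:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowCrossAccountReadFromAccountB",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::<account-id-of-AcctB>:role/<Glue-role>",
                    "arn:aws:iam::<account-id-of-AcctB>:role/<MWAA-role>"
                ]
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::sample-inp-bucket-etl-<username>",
                "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            ]
        }
    ]
}

The policy for the output bucket in the next step follows the same pattern, with s3:PutObject granted to the Amazon MWAA role.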
Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
Create a new IAM role called RoleA with Account B as the trusted entity role and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
Add this policy to the AWS Glue role and Amazon MWAA role:
In Account B, create an IAM policy called policy_for_roleB that grants permission to assume RoleA in Account A:
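A minimal sketch of this policy follows; the account ID is a placeholder:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
        }
    ]
}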
Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.
Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs
Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.
Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group, and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.
If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
Configure the Amazon MWAA connection with Secrets Manager
When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
Complete the following steps:
Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).
This allows Amazon MWAA to access credentials stored in Secrets Manager.
To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.
This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:
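The key-value pairs are the standard Secrets Manager backend settings for Apache Airflow (shown here as a sketch; confirm the exact values against the Amazon MWAA documentation for your Airflow version):

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}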
To generate an Airflow connection URI string, go to AWS CloudShell and open a Python shell.
Run the following code to generate the connection URI string:
import urllib.parse
conn_type = 'redshift'
host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
port = '5439'
login = 'admin' #Specify the username to use for authentication with Amazon Redshift
password = '<password>' #Specify the password to use for authentication with Amazon Redshift
role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
database = 'dev'
region = 'us-east-1' #YOUR_REGION
conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
print(conn_string)
The connection string should be generated as follows:
redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>
Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).
This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.
aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1
Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.
You can also add secrets using the Secrets Manager console as key-value pairs.
Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
Create an Airflow connection through the metadata database
You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.
For Connection Id, enter a name for the connection.
For Connection Type, choose Amazon Redshift.
For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
For Database, enter dev.
For User, enter your admin user name.
For Password, enter your password.
For Port, use port 5439.
For Extra, set the region and timeout parameters.
Test the connection, then save your settings.
Create and run a DAG
In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.
Create a DAG
In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:
The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria is not met and it’s rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on a scheduler.
After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
You can connect to Amazon Redshift from Airflow using three different operators:
PythonOperator.
SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.
In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.
Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.
In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.
In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.
As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. The airflow/connections/redshift_conn_test connection from Secrets Manager is used for unloading the data.
TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule
dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10
@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]


@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc


with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:

    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)

    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )
    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )
    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )
    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete data from the Redshift staging table using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )
    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    # Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
    )
Verify the DAG run
After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.
In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.
Verify the results
On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as S3 files.
On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.
Clean up
Clean up the resources created as part of this post to avoid incurring ongoing charges:
Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.
Conclusion
With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.
About the Authors
Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.
Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that allows you to use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you create, schedule, and run tasks within a DAG based on data and configurations that may change over time.
There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One of the approaches is to use the DAG Factory YAML based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open-source, making it simple to create and customize new functionalities.
In this post, we explore the process of creating Dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:
Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs.
Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.
By harnessing the power of YAML files and the DAG Factory library, we unleash a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines.
Overview of solution
In this post, we will use an example DAG file that is designed to process a COVID-19 data set. The workflow process involves processing an open source data set offered by WHO-COVID-19-Global. After we install the DAG-Factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.
The following diagram illustrates the overall solution along with data flows within logical blocks.
Prerequisites
For this walkthrough, you should have the following prerequisites:
Make sure the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:
The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.
Create an Amazon S3 bucket with a unique (within an account) name while creating your Amazon MWAA environment, and create folders called dags and requirements.
Create and upload a requirements.txt file with the following content to the requirements folder. Replace {environment-version} with your environment’s version number, and {Python-version} with the version of Python that’s compatible with your environment:
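The file contents look like the following sketch; the pinned package versions are illustrative assumptions, so validate the versions you intend to use against your environment:

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{environment-version}/constraints-{Python-version}.txt"
dag-factory==0.19.0
pandas==2.1.4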
Pandas is needed just for the example use case described in this post, and dag-factory is the only required plug-in. It is recommended to check the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.
Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.
Make sure that you are pointing to the latest version of your requirements.txt file in the S3 bucket for the additional package installation to happen. This typically takes 15–20 minutes, depending on your environment configuration.
Validate the DAGs
When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.
Verify the existing DAGs by navigating to the DAGs tab.
Configure your DAGs
Complete the following steps:
Create empty files named dynamic_dags.yml, example_dag_factory.py and process_s3_data.py on your local machine.
Edit the process_s3_data.py file and save it with following code content, then upload the file back to the Amazon S3 bucket dags folder. We are doing some basic data processing in the code:
Read the file from an Amazon S3 location
Rename the Country_code column to country.
Filter data by the given country.
Write the processed data in CSV format and upload it back to the S3 prefix.
import boto3
import pandas as pd
import io


def process_s3_data(COUNTRY):
    ### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"

    ### get csv file ###
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response['ResponseMetadata']['HTTPStatusCode']

    if status == 200:
        ### read csv file and filter based on the country to write back ###
        df = pd.read_csv(response.get("Body"))
        df.rename(columns={"Country_code": "country"}, inplace=True)
        filtered_df = df[df['country'] == COUNTRY]
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
            )
            status = response['ResponseMetadata']['HTTPStatusCode']
            if status == 200:
                print(f"Successful S3 put_object response. Status - {status}")
            else:
                print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
Edit the dynamic_dags.yml file and save it with YAML content along the lines of the sketch shown after this list, then upload the file back to the dags folder. We are stitching together various DAGs based on the country as follows:
Define the default arguments that are passed to all DAGs.
Create a DAG definition for individual countries by passing op_args.
Map the process_s3_data function with python_callable_name.
Use the PythonOperator to process the CSV data stored in the Amazon S3 bucket.
We have set schedule_interval as 10 minutes, but feel free to adjust this value as needed.
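The following is a minimal sketch of dynamic_dags.yml using the dag-factory schema; the DAG names, country codes, and file path are illustrative assumptions that you should adapt to your setup:

default:
  default_args:
    owner: "airflow"
    start_date: 2024-01-01
    retries: 1
    retry_delay_sec: 300

covid_data_us:
  schedule_interval: "*/10 * * * *"
  tasks:
    process_data_us:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "US"

covid_data_in:
  schedule_interval: "*/10 * * * *"
  tasks:
    process_data_in:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "IN"

Each top-level key other than default becomes its own country-specific DAG, which is how a single YAML file produces the individual country-based DAGs described earlier.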
Edit the file example_dag_factory.py and save it with the following code content, then upload the file back to the dags folder. The code cleans up any existing DAGs using the clean_dags() method and creates new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())
After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find new DAGs.
You can enable DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.
Cleaning up
There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.
Conclusion
In this blog post we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:
Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
Situations where only a parameter changes between different DAGs, streamlining the workflow management process
Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
Creating data driven workflows that adapt and evolve based on the data inputs, enabling efficient automation
By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.
Jayesh Shinde is Sr. Application Architect with AWS ProServe India. He specializes in creating various solutions that are cloud centered using modern software development practices like serverless, DevOps, and analytics.
Harshd Yeola is Sr. Cloud Architect with AWS ProServe India helping customers to migrate and modernize their infrastructure into AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOPs, and AWS Developer Tools and services.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that streamlines the setup and operation of the infrastructure to orchestrate data pipelines in the cloud. Customers use Amazon MWAA to manage the scalability, availability, and security of their Apache Airflow environments. As they design more intensive, complex, and ever-growing data processing pipelines, customers have asked us for additional underlying resources to provide greater concurrency and capacity for their tasks and workflows.
To address this, today, we are announcing the availability of larger environment classes in Amazon MWAA. In this post, we dive into the capabilities of these new XL and 2XL environments, the scenarios they are well suited for, and how you can set up or upgrade your existing Amazon MWAA environment to take advantage of the increased resources.
Current challenges
When you create an Amazon MWAA environment, a set of managed Amazon Elastic Container Service (Amazon ECS) containers running on AWS Fargate is provisioned with defined virtual CPUs and RAM.
As you work with larger, complex, resource-intensive workloads, or run thousands of Directed Acyclic Graphs (DAGs) per day, you may start exhausting CPU availability on schedulers and workers, or reaching memory limits in workers. Running Apache Airflow at scale puts proportionally greater load on the Airflow metadata database, sometimes leading to CPU and memory issues on the underlying Amazon Relational Database Service (Amazon RDS) cluster. A resource-starved metadata database may lead to dropped connections from your workers, failing tasks prematurely.
To improve performance and resiliency of your tasks, consider following Apache Airflow best practices to author DAGs. As an alternative, you can create multiple Amazon MWAA environments to distribute workloads. However, this requires additional engineering and management effort.
New environment classes
With today’s release, you can now create XL and 2XL environments in Amazon MWAA in addition to the existing environment classes. They have two and four times the compute, and three and six times the memory, respectively, of the current large Amazon MWAA environment instance class. These instances add compute and RAM linearly to directly improve capacity and performance of all Apache Airflow components. The following table summarizes the environment capabilities.
Environment class | Scheduler and Worker CPU / RAM | Web Server CPU / RAM | Concurrent Tasks | DAG Capacity
mw1.xlarge | 8 vCPUs / 24 GB | 4 vCPUs / 12 GB | 40 tasks (default) | Up to 2000
mw1.2xlarge | 16 vCPUs / 48 GB | 8 vCPUs / 24 GB | 80 tasks (default) | Up to 4000
With the introduction of these larger environments, your Amazon Aurora metadata database will now use larger, memory-optimized instances powered by AWS Graviton2. With the Graviton2 family of processors, you get compute, storage, and networking improvements, and the reduction of your carbon footprint offered by the AWS family of processors.
Pricing
Amazon MWAA pricing dimensions remain unchanged, and you only pay for what you use:
The environment class
Additional worker instances
Additional scheduler instances
Metadata database storage consumed
You now get two additional options in the first three dimensions: XL and 2XL for the environment class, additional worker instances, and additional scheduler instances. Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.
Observe Amazon MWAA performance to plan scaling to larger environments
Before you start using the new environment classes, it’s important to understand if you are in a scenario that relates to capacity issues, such as metadata database out of memory, or workers or schedulers running at high CPU usage. Understanding the performance of your environment resources is key to troubleshooting issues related to capacity. We recommend following the guidance described in Introducing container, database, and queue utilization metrics for the Amazon MWAA environment to better understand the state of Amazon MWAA environments, and get insights to right-size your instances.
In the following test, we simulate a high load scenario, use the CloudWatch observability metrics to identify common problems, and make an informed decision to plan scaling to larger environments to mitigate the issues.
During our tests, we ran a complex DAG that dynamically creates over 500 tasks and uses external sensors to wait for a task completion in a different DAG. After running on an Amazon MWAA large environment class with auto scaling set up to a maximum of 10 worker nodes, we noticed the following metrics and values in the CloudWatch dashboard.
The worker nodes have reached maximum CPU capacity, causing the number of queued tasks to keep increasing. The metadata database CPU utilization has peaked at over 65% capacity, and the available database free memory has been reduced. In this situation, we could further increase the worker nodes to scale, but that would put additional load on the metadata database CPU. This might lead to a drop in the number of worker database connections and available free database memory.
With new environment classes, you can vertically scale to increase available resources by editing the environment and selecting a higher class of environment, as shown in the following screenshot.
From the list of environments, we select the one in use for this test. Choose Edit to navigate to the Configure advanced settings page, and select the appropriate xlarge or 2xlarge environment as required.
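If you prefer to script the change, the following is a minimal boto3 sketch that performs the same update; the environment name is an assumption for illustration:

import boto3

# Upgrade an existing environment to the XL class; use "mw1.2xlarge" for 2XL
mwaa = boto3.client("mwaa", region_name="us-east-1")
mwaa.update_environment(
    Name="my-mwaa-environment",  # assumed environment name
    EnvironmentClass="mw1.xlarge",
)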
After you save the change, the environment upgrade will take 20–30 minutes to complete. Any running DAG that got interrupted during the upgrade is scheduled for a retry, depending on the way you configured the retries for your DAGs. You can now choose to invoke them manually or wait for the next scheduled run.
After we upgraded the environment class, we tested the same DAG and observed that the metrics showed improved values because more resources were available. With this XL environment, you can run more tasks on fewer worker nodes, so the number of queued tasks kept decreasing. Alternatively, if you have tasks that require more memory or CPU, you can reduce the number of tasks per worker while still achieving a high total task count with a larger environment size. For example, if you have a large environment where worker node CPU is maxed out with celery.worker_autoscale (the Airflow configuration option that defines the number of tasks per worker) set to 20,20, you can move to an XL environment and keep celery.worker_autoscale at 20,20 instead of the XL default of 40 tasks per worker; the CPU load should then drop significantly.
Amazon MWAA XL and 2XL environment classes are available today in all Regions where Amazon MWAA is currently available.
Conclusion
Today, we are announcing the availability of two new environment classes in Amazon MWAA. With XL and 2XL environment classes, you can orchestrate larger volumes of complex or resource-intensive workflows. If you are running DAGs with a high number of dependencies, running thousands of DAGs across multiple environments, or in a scenario that requires you to heavily use workers for compute, you can now overcome the related capacity issues by increasing your environment resources in a few straightforward steps.
In this post, we discussed the capabilities of the two new environment classes, including pricing and some common resource constraint problems they solve. We provided guidance and an example of how to observe your existing environments to plan scaling to XL or 2XL, and we described how you can upgrade existing environments to use the increased resources.
Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
About the Authors
Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.
Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the realms of AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions.
Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla.
2023 was a rollercoaster year in tech, and we at the AWS Architecture Blog feel so fortunate to have shared in the excitement. As we move into 2024 and all of the new technologies we could see, we want to take a moment to highlight the brightest stars from 2023.
As always, thanks to our readers and to the many talented and hardworking Solutions Architects and other contributors to our blog.
I give you our 2023 cream of the crop!
#10: Build a serverless retail solution for endless aisle on AWS
In this post, Sandeep and Shashank help retailers and their customers alike in this guided approach to finding inventory that doesn’t live on shelves.
Figure 1. Building endless aisle architecture for order processing
#9: Optimizing data with automated intelligent document processing solutions
Who else dreads wading through large amounts of data in multiple formats? Just me? I didn’t think so. Using Amazon AI/ML and content-reading services, Deependra, Anirudha, Bhajandeep, and Senaka have created a solution that is scalable and cost-effective to help you extract the data you need and store it in a format that works for you.
#8: Disaster Recovery Solutions with AWS managed services, Part 3: Multi-Site Active/Passive
Disaster recovery posts are always popular, and this post by Brent and Dhruv is no exception. Their creative approach in part 3 of this series is most helpful for customers who have business-critical workloads with higher availability requirements.
#7: Simulating Kubernetes-workload AZ failures with AWS Fault Injection Simulator
Continuing with the theme of “when bad things happen,” we have Siva, Elamaran, and Re’s post about preparing for workload failures. If resiliency is a concern (and it really should be), the secret is test, test, TEST.
Figure 4. Architecture flow for Microservices to simulate a realistic failure scenario
Luca, Laura, Vittorio, and Zamira weren’t content with their four top-10 spots last year – they’re back with some things you definitely need to know about event-driven architectures.
#5: Use a reusable ETL framework in your AWS lake house architecture
As your lake house increases in size and complexity, you could find yourself facing maintenance challenges, and Ashutosh and Prantik have a solution: frameworks! The reusable ETL template with AWS Glue templates might just save you a headache or three.
#4: Invoking asynchronous external APIs with AWS Step Functions
It’s possible that AWS’ menagerie of services doesn’t have everything you need to run your organization. (Possible, but not likely; we have a lot of amazing services.) If you are using third-party APIs, then Jorge, Hossam, and Shirisha’s architecture can help you maintain a secure, reliable, and cost-effective relationship among all involved.
#3: Announcing updates to the AWS Well-Architected Framework
The Well-Architected Framework continues to help AWS customers evaluate their architectures against its six pillars. They are constantly striving for improvement, and Haleh’s diligence in keeping us up to date has not gone unnoticed. Thank you, Haleh!
#2: Let’s Architect! Designing architectures for multi-tenancy
The practically award-winning Let’s Architect! series strikes again! This time, Luca, Laura, Vittorio, and Zamira were joined by Federica to discuss multi-tenancy and why that concept is so crucial for SaaS providers.
#1: Understand resiliency patterns and trade-offs to architect efficiently in the cloud
Haresh, Lewis, and Bonnie revamped this 2022 post into a masterpiece that completely stole our readers’ hearts and is among the top posts we’ve ever made!
Organizations use Amazon MWAA to enhance their business workflows. For example, C2i Genomics uses Amazon MWAA in their data platform to orchestrate the validation of algorithms processing cancer genomics data in billions of records. Twitch, a live streaming platform, manages and orchestrates the training and deployment of its recommendation models for over 140 million active users. They use Amazon MWAA to scale, while significantly improving security and reducing infrastructure management overhead.
Today, we are announcing the availability of Apache Airflow version 2.8.1 environments on Amazon MWAA. In this post, we walk you through some of the new features and capabilities of Airflow now available in Amazon MWAA, and how you can set up or upgrade your Amazon MWAA environment to version 2.8.1.
Object storage
As data pipelines scale, engineers struggle to manage storage across multiple systems with unique APIs, authentication methods, and conventions for accessing data, requiring custom logic and storage-specific operators. Airflow now offers a unified object storage abstraction layer that handles these details, letting engineers focus on their data pipelines. Airflow object storage uses fsspec to enable consistent data access code across different object storage systems, thereby streamlining infrastructure complexity.
The following are some of the feature’s key benefits:
Portable workflows – You can switch storage services with minimal changes in your Directed Acyclic Graphs (DAGs)
Efficient data transfers – You can stream data instead of loading into memory
Reduced maintenance – You don’t need separate operators, making your pipelines straightforward to maintain
Familiar programming experience – You can use Python modules, like shutil, for file operations
In the sample code below, you can see how to move data directly from Google Cloud Storage to Amazon S3. Because Airflow’s object storage uses shutil.copyfileobj, the objects’ data is read in chunks from gcs_data_source and streamed to amazon_s3_data_target.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")
amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,
    tags=["2.8", "ObjectStorage"],
) as dag:

    @task
    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        # List every file under the source prefix
        return [f for f in path.iterdir() if f.is_file()]

    @task
    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):
        # Streams the object's data in chunks to the target store
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)
For more information on Airflow object storage, refer to Object Storage.
XCom UI
XCom (cross-communications) allows for the passing of data between tasks, facilitating communication and coordination between them. Previously, developers had to switch to a different view to see the XComs related to a task. With Airflow 2.8, XCom key-value pairs are rendered directly on a tab within the Airflow Grid view, as shown in the following screenshot.
The new XCom tab provides the following benefits:
Improved XCom visibility – A dedicated tab in the UI provides a convenient and user-friendly way to see all XComs associated with a DAG or task.
Improved debugging – Being able to see XCom values directly in the UI is helpful for debugging DAGs. You can quickly see the output of upstream tasks without needing to manually pull and inspect them using Python code.
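As an illustrative sketch, the following TaskFlow DAG (with arbitrary DAG and task names) passes a small dictionary between tasks through XCom; after a run, you can inspect the value on the new XCom tab in the Grid view.
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 2, 26), schedule=None, catchup=False, tags=["2.8", "xcom"])
def xcom_tab_demo():
    @task
    def produce() -> dict:
        # The returned value is stored as an XCom and shown on the XCom tab
        return {"record_count": 42}

    @task
    def consume(payload: dict):
        print(f"Upstream reported {payload['record_count']} records")

    consume(produce())


xcom_tab_demo()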
Task context logger
Managing task lifecycles is crucial for the smooth operation of data pipelines in Airflow. However, certain challenges have persisted, particularly in scenarios where tasks are unexpectedly stopped. This can occur due to various reasons, including scheduler timeouts, zombie tasks (tasks that remain in a running state without sending heartbeats), or instances where the worker runs out of memory.
Traditionally, such failures, particularly those triggered by core Airflow components like the scheduler or executor, weren’t recorded within the task logs. This limitation required users to troubleshoot outside the Airflow UI, complicating the process of pinpointing and resolving issues.
Airflow 2.8 introduced a significant improvement that addresses this problem. Airflow components, including the scheduler and executor, can now use the new TaskContextLogger to forward error messages directly to the task logs. This feature allows you to see all the relevant error messages related to a task’s run in one place. This simplifies the process of figuring out why a task failed, offering a complete perspective of what went wrong within a single log view.
The following screenshot shows how the task is detected as zombie, and the scheduler log is being included as part of the task log.
To enable the feature, set the environment configuration parameter enable_task_context_logger to True. Once it's enabled, Airflow can ship logs from the scheduler, executor, or callback run context to the task logs and make them available in the Airflow UI.
Listener hooks for datasets
Datasets were introduced in Airflow 2.4 as a logical grouping of data sources to create data-aware scheduling and dependencies between DAGs. For example, you can schedule a consumer DAG to run when a producer DAG updates a dataset. Listeners enable Airflow users to subscribe to certain events happening in the environment. In Airflow 2.8, listeners are added for two dataset events: on_dataset_created and on_dataset_changed, effectively allowing Airflow users to write custom code that reacts to dataset management operations. For example, you can trigger an external system or send a notification.
Using listener hooks for datasets is straightforward. Complete the following steps to create a listener for on_dataset_changed:
Create the listener (dataset_listener.py):
from airflow import Dataset
from airflow.listeners import hookimpl


@hookimpl
def on_dataset_changed(dataset: Dataset):
    """The following custom code is executed when a dataset is changed."""
    print("Invoking external endpoint")

    # React differently to a specific dataset
    if dataset.uri == "s3://bucket-prefix/object-key.ext":
        print("Execute specific/different action for this dataset")
Create a plugin to register the listener in your Airflow environment (dataset_listener_plugin.py):
from airflow.plugins_manager import AirflowPlugin

from plugins import dataset_listener


class DatasetListenerPlugin(AirflowPlugin):
    name = "dataset_listener_plugin"
    listeners = [dataset_listener]
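To try the listener out, a producer DAG only needs to declare the dataset as an outlet. The following sketch (the DAG name is arbitrary) updates the same dataset URI referenced in the listener above; when the task completes, Airflow marks the dataset as changed and invokes on_dataset_changed.
from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

example_dataset = Dataset("s3://bucket-prefix/object-key.ext")


@dag(start_date=datetime(2024, 2, 26), schedule=None, catchup=False, tags=["2.8", "datasets"])
def dataset_producer():
    @task(outlets=[example_dataset])
    def update_dataset():
        # Completing this task with the dataset as an outlet fires the listener
        print("Dataset updated")

    update_dataset()


dataset_producer()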
Upon successful creation of an Airflow version 2.8.1 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.
Upgrade from older versions of Airflow to version 2.8.1
In this post, we discussed some important features introduced in Airflow version 2.8, such as object storage, the new XCom tab added to the grid view, task context logging, listener hooks for datasets, and how you can start using them. We also provided some sample code to show implementations in Amazon MWAA. For the complete list of changes, refer to Airflow’s release notes.
Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
About the Authors
Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.
Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.
In the dynamic world of cloud computing, ensuring the resilience and availability of critical applications is paramount. Disaster recovery (DR) is the process by which an organization anticipates and addresses technology-related disasters. For organizations implementing critical workload orchestration using Amazon Managed Workflows for Apache Airflow (Amazon MWAA), it is crucial to have a DR plan in place to ensure business continuity.
In this series, we explore the need for Amazon MWAA disaster recovery and prescribe solutions that will sustain Amazon MWAA environments against unintended disruptions. This lets you define, avoid, and handle disruption risks as part of your business continuity plan. This post focuses on designing the overall DR architecture. A future post in this series will focus on implementing the individual components using AWS services.
The need for Amazon MWAA disaster recovery
Amazon MWAA, a fully managed service for Apache Airflow, brings immense value to organizations by automating workflow orchestration for extract, transform, and load (ETL), DevOps, and machine learning (ML) workloads. Amazon MWAA has a distributed architecture with multiple components such as scheduler, worker, web server, queue, and database. This makes it difficult to implement a comprehensive DR strategy.
An active Amazon MWAA environment continuously parses Airflow Directed Acyclic Graphs (DAGs), reading them from a configured Amazon Simple Storage Service (Amazon S3) bucket. DAG source unavailability due to network unreachability, unintended corruption, or deletes leads to extended downtime and service disruption.
Within Airflow, the metadata database is a core component storing configuration variables, roles, permissions, and DAG run histories. A healthy metadata database is therefore critical for your Airflow environment. As with any core Airflow component, having a backup and disaster recovery plan in place for the metadata database is essential.
Amazon MWAA deploys Airflow components to multiple Availability Zones within your VPC in your preferred AWS Region. This provides fault tolerance and automatic recovery against a single Availability Zone failure. For mission-critical workloads, being resilient to the impairments of a unitary Region through multi-Region deployments is additionally important to ensure high availability and business continuity.
Balancing between costs to maintain redundant infrastructures, complexity, and recovery time is essential for Amazon MWAA environments. Organizations aim for cost-effective solutions that minimize their Recovery Time Objective (RTO) and Recovery Point Objective (RPO) to meet their service level agreements, be economically viable, and meet their customers’ demands.
Detect disasters in the primary environment: Proactive monitoring through metrics and alarms
Prompt detection of disasters in the primary environment is crucial for timely disaster recovery. Monitoring the Amazon CloudWatch SchedulerHeartbeat metric provides insight into the Airflow health of an active Amazon MWAA environment. You can add other health check metrics to the evaluation criteria, such as checking the availability of upstream or downstream systems and network reachability. Combined with CloudWatch alarms, you can send notifications when these thresholds are not met over a number of time periods. You can add alarms to dashboards to monitor and receive alerts about your AWS resources and applications across multiple Regions.
AWS publishes our most up-to-the-minute information on service availability on the Service Health Dashboard. You can check at any time to get current status information, or subscribe to an RSS feed to be notified of interruptions to each individual service in your operating Region. The AWS Health Dashboard provides information about AWS Health events that can affect your account.
By combining metric monitoring, available dashboards, and automatic alarming, you can promptly detect unavailability of your primary environment, enabling proactive measures to transition to your DR plan. It is critical to factor in incident detection, notification, escalation, discovery, and declaration into your DR planning and implementation to provide realistic and achievable objectives that provide business value.
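The following is a minimal sketch of such an alarm using boto3; the alarm name, environment name, evaluation settings, and SNS topic ARN are placeholders to adapt to your own thresholds and notification targets.
import boto3

cloudwatch = boto3.client("cloudwatch")

# Alarm when no scheduler heartbeats are observed for three consecutive
# 5-minute periods; missing data is treated as breaching.
cloudwatch.put_metric_alarm(
    AlarmName="mwaa-primary-scheduler-heartbeat",  # placeholder
    Namespace="AmazonMWAA",
    MetricName="SchedulerHeartbeat",
    Dimensions=[
        {"Name": "Environment", "Value": "my-primary-environment"},  # placeholder
        {"Name": "Function", "Value": "Scheduler"},
    ],
    Statistic="Sum",
    Period=300,
    EvaluationPeriods=3,
    Threshold=1,
    ComparisonOperator="LessThanThreshold",
    TreatMissingData="breaching",
    AlarmActions=["arn:aws:sns:us-east-1:111122223333:mwaa-dr-alerts"],  # placeholder topic ARN
)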
In the following sections, we discuss two Amazon MWAA DR strategy solutions and their architecture.
DR strategy solution 1: Backup and restore
The backup and restore strategy involves generating Airflow component backups in the same or different Region as your primary Amazon MWAA environment. To ensure continuity, you can asynchronously replicate these to your DR Region, with minimal performance impact on your primary Amazon MWAA environment. In the event of a rare primary Regional impairment or service disruption, this strategy will create a new Amazon MWAA environment and recover historical data to it from existing backups. However, it’s important to note that during the recovery process, there will be a period where no Airflow environments are operational to process workflows until the new environment is fully provisioned and marked as available.
This strategy provides a low-cost and low-complexity solution that is also suitable for mitigating against data loss or corruption within your primary Region. The amount of data being backed up and the time to create a new Amazon MWAA environment (typically 20–30 minutes) affects how quickly restoration can happen. To enable infrastructure to be redeployed quickly without errors, deploy using infrastructure as code (IaC). Without IaC, it may be complex to restore an analogous DR environment, which will lead to increased recovery times and possibly exceed your RTO.
Let’s explore the setup required when your primary Amazon MWAA environment is actively running, as shown in the following figure.
The solution comprises three key components. The first component is the primary environment, where the Airflow workflows are initially deployed and actively running. The second component is the disaster monitoring component, composed of CloudWatch and a combination of an AWS Step Functions state machine and an AWS Lambda function. The third component creates and stores backups of all configurations and metadata required to restore. This can be in the same Region as your primary or replicated to your DR Region using S3 Cross-Region Replication (CRR). For CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region.
The first three steps in the workflow are as follows:
As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
Your existing primary Amazon MWAA environment automatically emits the status of its scheduler’s health to the CloudWatch SchedulerHeartbeat metric.
A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor the scheduler's health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.
The following figure illustrates the additional steps in the solution workflow.
When the heartbeat count deviates from the normal count for a period of time, a series of actions are initiated to recover to a new Amazon MWAA environment in the DR Region. These actions include starting creation of a new Amazon MWAA environment, replicating the primary environment configurations, and then waiting for the new environment to become available.
When the environment is available, an import DAG utility is run to restore the metadata contents from the backups. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.
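This post doesn't prescribe a specific export DAG utility, but the following is a minimal sketch of what one could look like, assuming a placeholder backup bucket and an example 4-hour schedule aligned with your RPO. It serializes Airflow variables and connection IDs to JSON in Amazon S3; a production utility would also export DAG run history and any other metadata tables you need to restore.
import json
from datetime import datetime

import boto3
from airflow.decorators import dag, task
from airflow.models import Connection, Variable
from airflow.utils.session import create_session

BACKUP_BUCKET = "my-mwaa-metadata-backups"  # placeholder bucket name


@dag(start_date=datetime(2024, 1, 1), schedule="0 */4 * * *", catchup=False, tags=["dr"])
def export_metadata_backup():
    @task
    def export_to_s3():
        # Read a subset of the metadata database and write it to the backup bucket
        with create_session() as session:
            payload = {
                "variables": {v.key: v.val for v in session.query(Variable).all()},
                "connections": [c.conn_id for c in session.query(Connection).all()],
            }
        boto3.client("s3").put_object(
            Bucket=BACKUP_BUCKET,
            Key=f"backups/metadata-{datetime.utcnow():%Y%m%dT%H%M%S}.json",
            Body=json.dumps(payload),
        )

    export_to_s3()


export_metadata_backup()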
DR strategy solution 2: Active-passive environments with periodic data synchronization
The active-passive environments with periodic data synchronization strategy focuses on maintaining recurrent data synchronization between an active primary and a passive Amazon MWAA DR environment. By periodically updating and synchronizing DAG stores and metadata databases, this strategy ensures that the DR environment remains current or nearly current with the primary. The DR Region can be the same or a different Region than your primary Amazon MWAA environment. In the event of a disaster, backups are available to revert to a previous known good state to minimize data loss or corruption.
This strategy provides a low RTO and RPO with frequent synchronization, allowing quick recovery with minimal data loss. However, infrastructure costs and code deployment effort are doubled because you maintain both the primary and DR Amazon MWAA environments. Your DR environment is available immediately to run DAGs on.
The following figure illustrates the setup required when your primary Amazon MWAA environment is actively running.
The solution comprises four key components. Similar to the backup and restore solution, the first component is the primary environment, where the workflow is initially deployed and is actively running. The second component is the disaster monitoring component, consisting of CloudWatch and a combination of a Step Functions state machine and Lambda function. The third component creates and stores backups for all configurations and metadata required for the database synchronization. This can be in the same Region as your primary or replicated to your DR Region using Amazon S3 Cross-Region Replication. As mentioned earlier, for CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region. The last component is a passive Amazon MWAA environment that has the same Airflow code and environment configurations as the primary. The DAGs are deployed in the DR environment using the same continuous integration and continuous delivery (CI/CD) pipeline as the primary. Unlike the primary, DAGs are kept in a paused state to not cause duplicate runs.
The first steps of the workflow are similar to the backup and restore strategy:
As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
Your existing primary Amazon MWAA environment automatically emits the status of its scheduler's health to the CloudWatch SchedulerHeartbeat metric.
A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor scheduler health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.
The following figure illustrates the final steps of the workflow.
When the heartbeat count deviates from the normal count for a period of time, DR actions are initiated.
As a first step, a Lambda function triggers an import DAG utility to restore the metadata contents from the backups to the passive Amazon MWAA DR environment. When the imports are complete, the same DAG can un-pause the other Airflow DAGs, making them active for future runs. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.
Best practices to improve resiliency of Amazon MWAA
To enhance the resiliency of your Amazon MWAA environment and ensure smooth disaster recovery, consider implementing the following best practices:
Robust backup and restore mechanisms – Implementing comprehensive backup and restore mechanisms for Amazon MWAA data is essential. Regularly deleting existing metadata based on your organization’s retention policies reduces backup times and makes your Amazon MWAA environment more performant.
Automation using IaC – Using automation and orchestration tools such as AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform can streamline the deployment and configuration management of Amazon MWAA environments. This ensures consistency, reproducibility, and faster recovery during DR scenarios.
Idempotent DAGs and tasks – In Airflow, a DAG is considered idempotent if rerunning the same DAG with the same inputs multiple times has the same effect as running it only once. Designing idempotent DAGs and keeping tasks atomic decreases recovery time from failures when you have to manually rerun an interrupted DAG in your recovered environment (see the sketch after this list).
Regular testing and validation – A robust Amazon MWAA DR strategy should include regular testing and validation exercises. By simulating disaster scenarios, you can identify any gaps in your DR plans, fine-tune processes, and ensure your Amazon MWAA environments are fully recoverable.
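The following is a small sketch of the idempotency practice above: the task derives its output location from the logical data interval, so rerunning the same DAG run overwrites the same partition instead of duplicating data (the bucket and prefix are placeholders).
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def idempotent_example():
    @task
    def write_partition(data_interval_start=None):
        # The output path depends only on the logical date, so a rerun of the
        # same interval overwrites the same partition rather than appending.
        partition = data_interval_start.strftime("%Y-%m-%d")
        output_path = f"s3://my-bucket/output/dt={partition}/"  # placeholder location
        print(f"Writing results to {output_path}")

    write_partition()


idempotent_example()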
Conclusion
In this post, we explored the challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. We examined two DR strategy solutions: backup and restore and active-passive environments with periodic data synchronization. By implementing these solutions and following best practices, you can protect your Amazon MWAA environments, minimize downtime, and mitigate the impact of disasters. Regular testing, validation, and adaptation to evolving requirements are crucial for an effective Amazon MWAA DR strategy. By continuously evaluating and refining your disaster recovery plans, you can ensure the resilience and uninterrupted operation of your Amazon MWAA environments, even in the face of unforeseen events.
Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.
Chandan Rupakheti is a Solutions Architect and a Serverless Specialist at AWS. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud and bringing stakeholders together in their cloud journey. Outside his professional life, he loves spending time with his family and friends besides listening and playing music.
Vinod Jayendra is an Enterprise Support Lead in ISV accounts at Amazon Web Services, where he helps customers solve their architectural, operational, and cost optimization challenges. With a particular focus on serverless technologies, he draws from his extensive background in application development to deliver top-tier solutions. Beyond work, he finds joy in quality family time, embarking on biking adventures, and coaching youth sports teams.
Rupesh Tiwari is a Senior Solutions Architect at AWS in New York City, with a focus on Financial Services. He has over 18 years of IT experience in the finance, insurance, and education domains, and specializes in architecting large-scale applications and cloud-native big data workloads. In his spare time, Rupesh enjoys singing karaoke, watching comedy TV series, and creating joyful moments with his family.
As data engineering becomes increasingly complex, organizations are looking for new ways to streamline their data processing workflows. Many data engineers today use Apache Airflow to build, schedule, and monitor their data pipelines.
However, as the volume of data grows, managing and scaling these pipelines can become a daunting task. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can help simplify the process of building, running, and managing data pipelines. By providing Apache Airflow as a fully managed platform, Amazon MWAA allows data engineers to focus on building data workflows instead of worrying about infrastructure.
Today, businesses and organizations require cost-effective and efficient ways to process large amounts of data. Amazon EMR Serverless is a cost-effective and scalable solution for big data processing that can handle large volumes of data. The Amazon Provider in Apache Airflow comes with EMR Serverless operators and is already included in Amazon MWAA, making it easy for data engineers to build scalable and reliable data processing pipelines. You can use EMR Serverless to run Spark jobs on the data, and use Amazon MWAA to manage the workflows and dependencies between these jobs. This integration can also help reduce costs by automatically scaling the resources needed to process data.
Amazon Athena is a serverless, interactive analytics service built on open source frameworks that supports open-table and file formats. With Athena, you can use standard SQL to interact with your data without needing to manage complex infrastructure.
In this post, we use Amazon MWAA, EMR Serverless, and Athena to build a complete end-to-end data processing pipeline.
Solution overview
The following diagram illustrates the solution architecture.
The workflow includes the following steps:
Create an Amazon MWAA workflow that retrieves data from your input Amazon Simple Storage Service (Amazon S3) bucket.
Use EMR Serverless to process the data stored in Amazon S3. EMR Serverless automatically scales up or down based on the workload, so you don’t need to worry about provisioning or managing any infrastructure.
Use EMR Serverless to transform the data using PySpark code and then store the transformed data back in your S3 bucket.
Use Athena to create an external table based on the S3 dataset and run queries to analyze the transformed data. Athena uses the AWS Glue Data Catalog to store the table metadata.
Prerequisites
A basic understanding of Amazon S3, Athena for running SQL queries, Amazon MWAA to create an environment, and EMR Serverless.
A VPC with two private subnets.
An AWS Identity and Access Management (IAM) role with permissions to create an Amazon MWAA cluster, create the AWS Glue Data Catalog, and run SQL queries using Athena.
To illustrate using EMR Serverless jobs with Apache Spark via Amazon MWAA and data validation using Athena, we use the publicly available NYC taxi dataset. Download the following datasets to your local machine:
Green taxi and Yellow taxi trip records – Trip records for yellow and green taxis, which include information such as pick-up and drop-off dates and times, locations, trip distances, and payment types. In our example, we use the latest Parquet files for 2022.
In later steps, we upload these datasets to Amazon S3.
Create solution resources
This section outlines the steps for setting up data processing and transformation.
Create an EMR Serverless application
You can create one or more EMR Serverless applications that use open source analytics frameworks like Apache Spark or Apache Hive. Unlike Amazon EMR on EC2, you don't need to delete or terminate EMR Serverless applications. An EMR Serverless application is only a definition; after it's created, it can be reused as long as needed. This makes the Amazon MWAA pipeline simpler, because you just submit jobs to a pre-created EMR Serverless application.
By default, an EMR Serverless application auto-starts on job submission and auto-stops when idle for 15 minutes to ensure cost efficiency. You can modify the amount of idle time or turn the feature off.
To create an application using the EMR Serverless console, follow the instructions in Create an EMR Serverless application. Note down the application ID, because we use it in the following steps.
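If you prefer to script this step instead of using the console, the following boto3 sketch creates a Spark application; the application name and release label are examples, and you would choose a release available in your Region.
import boto3

emr_serverless = boto3.client("emr-serverless")

response = emr_serverless.create_application(
    name="mwaa-ny-taxi-app",     # example name
    type="SPARK",
    releaseLabel="emr-6.10.0",   # example release label
)

# Save this ID; the Amazon MWAA DAG submits jobs to it in later steps.
print(response["applicationId"])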
Create an S3 bucket and folders
Complete the following steps to set up your S3 bucket and folders:
Note the name of the S3 bucket to use in later steps.
Create an input_data folder for storing input data.
Within that folder, create three separate folders, one for each dataset: green, yellow, and zone_lookup.
You can download and work with the latest datasets available. For our testing, we use the following files (a scripted upload sketch follows this list):
The green/ folder has the file green_tripdata_2022-06.parquet
The yellow/ folder has the file yellow_tripdata_2022-06.parquet
The zone_lookup/ folder has the file taxi_zone_lookup.csv
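If you prefer to script the uploads, the following boto3 sketch copies the three files into the folder layout described above; the bucket name is a placeholder, and it assumes the files were downloaded to your current working directory.
import boto3

s3 = boto3.client("s3")
bucket = "<your-s3-bucket>"  # placeholder

uploads = {
    "green_tripdata_2022-06.parquet": "input_data/green/green_tripdata_2022-06.parquet",
    "yellow_tripdata_2022-06.parquet": "input_data/yellow/yellow_tripdata_2022-06.parquet",
    "taxi_zone_lookup.csv": "input_data/zone_lookup/taxi_zone_lookup.csv",
}

for local_file, key in uploads.items():
    s3.upload_file(local_file, bucket, key)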
Set up the Amazon MWAA DAG scripts
Complete the following steps to set up your DAG scripts:
Download the following scripts to your local machine:
requirements.txt – A Python dependency is any package or distribution that is not included in the Apache Airflow base install for your Apache Airflow version on your Amazon MWAA environment. For this post, we use Boto3 version >=1.23.9.
blog_dag_mwaa_emrs_ny_taxi.py – This script is part of the Amazon MWAA DAG and consists of the following tasks: yellow_taxi_zone_lookup, green_taxi_zone_lookup, and ny_taxi_summary. These tasks run Spark jobs to look up taxi zones and generate a data summary (a rough sketch of one such task follows this list).
green_zone.py – This PySpark script reads data files for green taxi rides and zone lookup, performs a join operation to combine them, and generates an output file containing green taxi rides with zone information. It utilizes temporary views for the df_green and df_zone data frames, performs column-based joins, and aggregates data like passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_green_zone, as Parquet files.
yellow_zone.py – This PySpark script processes yellow taxi ride and zone lookup data files by joining them to generate an output file containing yellow taxi rides with zone information. The script accepts a user-provided S3 bucket name and initiates a Spark session with the application name yellow_zone. It reads the yellow taxi files and zone lookup file from the specified S3 bucket, creates temporary views, performs a join based on location ID, and calculates statistics such as passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_yellow_zone, as Parquet files.
ny_taxi_summary.py – This PySpark script processes the green_zone and yellow_zone files to aggregate statistics on taxi rides, grouping data by service zones and location IDs. It requires an S3 bucket name as a command line argument, creates a SparkSession named ny_taxi_summary, reads the files from S3, performs a join, and generates a new data frame named ny_taxi_summary. It creates an output_data folder in the specified S3 bucket to write the resulting data frame to new Parquet files.
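The downloadable DAG script is the source of truth, but as a rough sketch of the pattern it uses, the following shows one task built with the Amazon provider's EmrServerlessStartJobOperator submitting green_zone.py to the pre-created application; the bucket, role ARN, and application ID placeholders mirror the values you update in the next step.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

S3_BUCKET = "<<your_s3_bucket here>>"                                      # placeholder
JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"                # placeholder
EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"   # placeholder

with DAG(dag_id="emr_serverless_task_sketch", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    green_taxi_zone_lookup = EmrServerlessStartJobOperator(
        task_id="green_taxi_zone_lookup",
        application_id=EMR_SERVERLESS_APPLICATION_ID,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": f"s3://{S3_BUCKET}/scripts/green_zone.py",
                "entryPointArguments": [S3_BUCKET],
            }
        },
    )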
On your local machine, update the blog_dag_mwaa_emrs_ny_taxi.py script with the following information:
Update your S3 bucket name where it is referenced in the script.
Update the EMR Serverless execution role ARN:
JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"
e.g. arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>
Update the EMR Serverless application ID with the application ID created earlier:
EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"
Upload the requirements.txt file to the S3 bucket created earlier
In the S3 bucket, create a folder named dags and upload the updated blog_dag_mwaa_emrs_ny_taxi.py file from your local machine.
On the Amazon S3 console, create a new folder named scripts inside the S3 bucket and upload the scripts to this folder from your local machine.
Create an Amazon MWAA environment
To create an Airflow environment, complete the following steps:
On the Amazon MWAA console, choose Create environment.
For Name, enter mwaa_emrs_athena_pipeline.
For Airflow version, choose the latest version (for this post, 2.5.1).
For S3 Bucket, enter the path to your S3 bucket.
For DAGs folder, enter the path to your dags folder.
For Requirements file, enter the path to the requirements.txt file.
Choose Next.
For Virtual private cloud (VPC), choose a VPC that has a minimum of two private subnets.
This will populate two of the private subnets in your VPC.
Under Web server access, select Public network.
This allows the Apache Airflow UI to be accessed over the internet by users granted access to the IAM policy for your environment.
For Security group(s), select Create new security group.
For Environment class, select mw1.small.
For Execution role, choose Create a new role.
For Role name, enter a name.
Leave the other configurations as default and choose Next.
On the next page, choose Create environment.
It may take about 20–30 minutes to create your Amazon MWAA environment.
When the Amazon MWAA environment status changes to Available, navigate to the IAM console and update the Amazon MWAA execution role to add pass role privileges for emr_serverless_execution_role.
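The exact policy depends on your role names, but a minimal sketch of adding that permission with boto3 could look like the following; the account ID and both role names are placeholders.
import json

import boto3

iam = boto3.client("iam")

# Allow the Amazon MWAA execution role to pass the EMR Serverless job role
pass_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::<ACCOUNT_ID>:role/emr_serverless_execution_role",  # placeholder
        }
    ],
}

iam.put_role_policy(
    RoleName="<your-mwaa-execution-role>",  # placeholder
    PolicyName="AllowPassEmrServerlessExecutionRole",
    PolicyDocument=json.dumps(pass_role_policy),
)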
Trigger the Amazon MWAA DAG
To trigger the DAG, complete the following steps:
On the Amazon MWAA console, choose Environments in the navigation pane.
Open your environment and choose Open Airflow UI.
Select blog_dag_mwaa_emrs_ny_taxi, choose the play icon, and choose Trigger DAG.
When the DAG is running, choose the DAG blog_dag_mwaa_emrs_ny_taxi and choose Graph to locate your DAG run workflow.
The DAG takes approximately 4–6 minutes to run all the scripts. You will see all the completed tasks, and the overall status of the DAG will show as success.
To rerun the DAG, remove s3://<<your_s3_bucket here >>/output_data/.
Optionally, to understand how Amazon MWAA runs these tasks, choose the task you want to inspect.
Choose Run to view the task run details.
The following screenshot shows an example of the task logs.
If you would like to dive deeper into the run logs, navigate to Applications on the EMR Serverless console. The Apache Spark driver logs indicate the initiation of your job along with the details of the executors, stages, and tasks that EMR Serverless created. These logs can help you monitor your job progress and troubleshoot failures.
By default, EMR Serverless will store application logs securely in Amazon EMR managed storage for a period of 30 days. However, you can also specify Amazon S3 or Amazon CloudWatch as your log delivery options during job submission.
Validate the final result set with Athena
Let’s validate the data loaded by the process using Athena SQL queries.
On the Athena console, choose Query editor in the navigation pane.
If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (<S3_BUCKET_NAME>/athena), then choose Save.
In the query editor, enter the following query to create an external table:
CREATE EXTERNAL TABLE default.ny_taxi_summary(
pu_service_zone string,
pulocationid bigint,
do_service_zone string,
dolocationid bigint,
passenger_count bigint,
trip_distance double,
fare_amount double,
extra double,
mta_tax double,
tip_amount double,
tolls_amount double,
improvement_surcharge double,
total_amount double,
congestion_surcharge double,
airport_fee double)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<<YOUR-S3-BUCKET Here>>/output_data/ny_taxi_summary/' -- *** Change bucket name to your bucket***
TBLPROPERTIES (
'classification'='parquet',
'compressionType'='none');
Run the following query on the recently created ny_taxi_summary table to retrieve the first 10 rows to validate the data:
select * from default.ny_taxi_summary limit 10;
Clean up
To prevent future charges, complete the following steps:
On the Amazon S3 console, delete the S3 bucket you created to store the Amazon MWAA DAG, scripts, and logs.
On the Athena console, drop the table you created:
drop table default.ny_taxi_summary;
On the Amazon MWAA console, navigate to the environment that you created and choose Delete.
On the EMR Studio console, delete the application.
To delete the application, navigate to the List applications page. Select the application that you created and choose Actions → Stop to stop the application. After the application is in the STOPPED state, select the same application and choose Actions → Delete.
Conclusion
Data engineering is a critical component of many organizations, and as data volumes continue to grow, it’s essential to find ways to streamline data processing workflows. The combination of Amazon MWAA, EMR Serverless, and Athena provides a powerful solution to build, run, and manage data pipelines efficiently. With this end-to-end data processing pipeline, data engineers can easily process and analyze large amounts of data quickly and cost-effectively without the need to manage complex infrastructure. The integration of these AWS services provides a robust and scalable solution for data processing, helping organizations make informed decisions based on their data insights.
Now that you’ve seen how to submit Spark jobs on EMR Serverless via Amazon MWAA, we encourage you to use Amazon MWAA to create a workflow that will run PySpark jobs via EMR Serverless.
We welcome your feedback and inquiries. Please feel free to reach out to us if you have any questions or comments.
About the authors
Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.
Gaurav Parekh is a Solutions Architect helping AWS customers build large scale modern architecture. He specializes in data analytics and networking. Outside of work, Gaurav enjoys playing cricket, soccer and volleyball.
Audit History
December 2023: This post was reviewed for technical accuracy by Santosh Gantaram, Sr. Technical Account Manager.
In this post, we demonstrate automating deployment of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) using customer-managed endpoints in a VPC, providing compatibility with shared, or otherwise restricted, VPCs.
Data scientists and engineers have made Apache Airflow a leading open source tool to create data pipelines due to its active open source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and extensive library of pre-built integrations. Amazon MWAA is a managed service for Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. For each Airflow environment, Amazon MWAA creates a single-tenant service VPC, which hosts the metadatabase that stores states and the web server that provides the user interface. Amazon MWAA further manages Airflow scheduler and worker instances in a customer-owned and managed VPC, in order to schedule and run tasks that interact with customer resources. Those Airflow containers in the customer VPC access resources in the service VPC via a VPC endpoint.
Many organizations choose to centrally manage their VPC using AWS Organizations, allowing a VPC in an owner account to be shared with resources in a different participant account. However, because creating a new route outside of a VPC is considered a privileged operation, participant accounts can’t create endpoints in owner VPCs. Furthermore, many customers don’t want to extend the security privileges required to create VPC endpoints to all users provisioning Amazon MWAA environments. In addition to VPC endpoints, customers also wish to restrict data egress via Amazon Simple Queue Service (Amazon SQS) queues, and Amazon SQS access is a requirement in the Amazon MWAA architecture.
Shared VPC support for Amazon MWAA adds the ability for you to manage your own endpoints within your VPCs, adding compatibility to shared and otherwise restricted VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by your Amazon MWAA environments. This post demonstrates how customer-managed endpoints work with Amazon MWAA and provides examples of how to automate the provisioning of those endpoints.
Solution overview
Shared VPC support for Amazon MWAA allows multiple AWS accounts to create their Airflow environments into shared, centrally managed VPCs. The account that owns the VPC (owner) shares the two private subnets required by Amazon MWAA with other accounts (participants) that belong to the same organization from AWS Organizations. After the subnets are shared, the participants can view, create, modify, and delete Amazon MWAA environments in the subnets shared with them.
When users specify the need for a shared, or otherwise policy-restricted, VPC during environment creation, Amazon MWAA will first create the service VPC resources, then enter a pending state for up to 72 hours, with an Amazon EventBridge notification of the change in state. This allows owners to create the required endpoints on behalf of participants based on endpoint service information from the Amazon MWAA console or API, or programmatically via an AWS Lambda function and EventBridge rule, as in the example in this post.
After those endpoints are created on the owner account, the endpoint service in the single-tenant Amazon MWAA VPC will detect the endpoint connection event and resume environment creation. Should there be an issue, you can cancel environment creation by deleting the environment during this pending state.
This feature also allows you to remove the create, modify, and delete VPCE privileges from the AWS Identity and Access Management (IAM) principal creating Amazon MWAA environments, even when not using a shared VPC, because that permission will instead be imposed on the IAM principal creating the endpoint (the Lambda function in our example). Furthermore, the Amazon MWAA environment will provide the SQS queue Amazon Resource Name (ARN) used by the Airflow Celery Executor to queue tasks (the Celery Executor Queue), allowing you to explicitly enter those resources into your network policy rather than having to provide a more open and generalized permission.
In this example, we create the VPC and Amazon MWAA environment in the same account. For shared VPCs across accounts, the EventBridge rule and Lambda function would exist in the owner account, and the Amazon MWAA environment would be created in the participant account. See Sending and receiving Amazon EventBridge events between AWS accounts for more information.
Prerequisites
You should have the following prerequisites:
An AWS account
An AWS user in that account, with permissions to create VPCs, VPC endpoints, and Amazon MWAA environments
We begin by creating a restrictive VPC using an AWS CloudFormation template, in order to simulate creating the necessary VPC endpoint and modifying the SQS endpoint policy. If you want to use an existing VPC, you can proceed to the next section.
On the AWS CloudFormation console, choose Create stack and choose With new resources (standard).
Under Specify template, choose Upload a template file.
Now we edit our CloudFormation template to restrict access to Amazon SQS. In cfn-vpc-private-bjs.yml, edit the SqsVpcEndpoint section to appear as follows:
This additional policy document entry prevents Amazon SQS egress to any resource not explicitly listed.
Now we can create our CloudFormation stack.
On the AWS CloudFormation console, choose Create stack.
Select Upload a template file.
Choose Choose file.
Browse to the file you modified.
Choose Next.
For Stack name, enter MWAA-Environment-VPC.
Choose Next until you reach the review page.
Choose Submit.
Create the Lambda function
We have two options for self-managing our endpoints: manual and automated. In this example, we create a Lambda function that responds to the Amazon MWAA EventBridge notification. You could also use the EventBridge notification to send an Amazon Simple Notification Service (Amazon SNS) message, such as an email, to someone with permission to create the VPC endpoint manually.
First, we create a Lambda function to respond to the EventBridge event that Amazon MWAA will emit.
On the Lambda console, choose Create function.
For Name, enter mwaa-create-lambda.
For Runtime, choose Python 3.11.
Choose Create function.
For Code, in the Code source section, for lambda_function, enter the following code:
import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
if event['detail']['status']=="PENDING":
detail=event['detail']
name=detail['name']
celeryExecutorQueue=detail['celeryExecutorQueue']
subnetIds=detail['networkConfiguration']['subnetIds']
securityGroupIds=detail['networkConfiguration']['securityGroupIds']
databaseVpcEndpointService=detail['databaseVpcEndpointService']
# MWAA does not need to store the VPC ID, but we can get it from the subnets
client = boto3.client('ec2')
response = client.describe_subnets(SubnetIds=subnetIds)
logger.info(response['Subnets'][0]['VpcId'])
vpcId=response['Subnets'][0]['VpcId']
logger.info("vpcId: " + vpcId)
webserverVpcEndpointService=None
if detail['webserverAccessMode']=="PRIVATE_ONLY":
webserverVpcEndpointService=event['detail']['webserverVpcEndpointService']
response = client.describe_vpc_endpoints(
VpcEndpointIds=[],
Filters=[
{"Name": "vpc-id", "Values": [vpcId]},
{"Name": "service-name", "Values": ["*.sqs"]},
],
MaxResults=1000
)
sqsVpcEndpoint=None
for r in response['VpcEndpoints']:
if subnetIds[0] in r['SubnetIds'] or subnetIds[1] in r['SubnetIds']:
# We are filtering describe by service name, so this must be SQS
sqsVpcEndpoint=r
break
if sqsVpcEndpoint:
logger.info("Found SQS endpoint: " + sqsVpcEndpoint['VpcEndpointId'])
logger.info(sqsVpcEndpoint)
pd = json.loads(sqsVpcEndpoint['PolicyDocument'])
for s in pd['Statement']:
if s['Effect']=='Allow':
resource = s['Resource']
logger.info(resource)
if '*' in resource:
logger.info("'*' already allowed")
elif celeryExecutorQueue in resource:
logger.info("'"+celeryExecutorQueue+"' already allowed")
else:
s['Resource'].append(celeryExecutorQueue)
logger.info("Updating SQS policy to " + str(pd))
client.modify_vpc_endpoint(
VpcEndpointId=sqsVpcEndpoint['VpcEndpointId'],
PolicyDocument=json.dumps(pd)
)
break
# create MWAA database endpoint
logger.info("creating endpoint to " + databaseVpcEndpointService)
endpointName=name+"-database"
response = client.create_vpc_endpoint(
VpcEndpointType='Interface',
VpcId=vpcId,
ServiceName=databaseVpcEndpointService,
SubnetIds=subnetIds,
SecurityGroupIds=securityGroupIds,
TagSpecifications=[
{
"ResourceType": "vpc-endpoint",
"Tags": [
{
"Key": "Name",
"Value": endpointName
},
]
},
],
)
logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
# create MWAA web server endpoint (if private)
if webserverVpcEndpointService:
endpointName=name+"-webserver"
logger.info("creating endpoint to " + webserverVpcEndpointService)
response = client.create_vpc_endpoint(
VpcEndpointType='Interface',
VpcId=vpcId,
ServiceName=webserverVpcEndpointService,
SubnetIds=subnetIds,
SecurityGroupIds=securityGroupIds,
TagSpecifications=[
{
"ResourceType": "vpc-endpoint",
"Tags": [
{
"Key": "Name",
"Value": endpointName
},
]
},
],
)
logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
return {
'statusCode': 200,
'body': json.dumps(event['detail']['status'])
}
Choose Deploy.
On the Configuration tab of the Lambda function, in the General configuration section, choose Edit.
For Timeout, increase to 5 minutes, 0 seconds.
Choose Save.
In the Permissions section, under Execution role, choose the role name to edit the permissions of this function.
For Permission policies, choose the link under Policy name.
Choose Edit and add a comma and the following statement:
Next, we configure EventBridge to send the Amazon MWAA notifications to our Lambda function.
On the EventBridge console, choose Create rule.
For Name, enter mwaa-create.
Select Rule with an event pattern.
Choose Next.
For Creation method, choose Use pattern form.
Choose Edit pattern.
For Event pattern, enter the following:
{
"source": ["aws.airflow"],
"detail-type": ["MWAA Environment Status Change"]
}
Choose Next.
For Select a target, choose Lambda function.
You may also specify an SNS notification in order to receive a message when the environment state changes.
For Function, choose mwaa-create-lambda.
Choose Next until you reach the final section, then choose Create rule.
Create an Amazon MWAA environment
Finally, we create an Amazon MWAA environment with customer-managed endpoints.
On the Amazon MWAA console, choose Create environment.
For Name, enter a unique name for your environment.
For Airflow version, choose the latest Airflow version.
For S3 bucket, choose Browse S3 and choose your S3 bucket, or enter the Amazon S3 URI.
For DAGs folder, choose Browse S3 and choose the dags/ folder in your S3 bucket, or enter the Amazon S3 URI.
Choose Next.
For Virtual Private Cloud, choose the VPC you created earlier.
For Web server access, choose Public network (Internet accessible).
For Security groups, deselect Create new security group.
Choose the shared VPC security group created by the CloudFormation template.
Because the security groups of the AWS PrivateLink endpoints from the earlier step are self-referencing, you must choose the same security group for your Amazon MWAA environment.
For Endpoint management, choose Customer managed endpoints.
Keep the remaining settings as default and choose Next.
Choose Create environment.
When your environment is available, you can access it via the Open Airflow UI link on the Amazon MWAA console.
Clean up
Cleaning up resources that are not actively being used reduces costs and is a best practice. If you don’t delete your resources, you can incur additional charges. To clean up your resources, complete the following steps:
After the preceding resources have been deleted, delete the CloudFormation stack to ensure that you have removed all of the remaining resources.
Summary
This post described how to automate environment creation with shared VPC support in Amazon MWAA. This gives you the ability to manage your own endpoints within your VPC, adding compatibility to shared, or otherwise restricted, VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by their Amazon MWAA environments. To learn more about Amazon MWAA, refer to the Amazon MWAA User Guide. For more posts about Amazon MWAA, visit the Amazon MWAA resources page.
About the author
John Jackson has over 25 years of software experience as a developer, systems architect, and product manager in both startups and large corporations and is the AWS Principal Product Manager responsible for Amazon MWAA.