Tag Archives: Amazon Managed Workflows for Apache Airflow (Amazon MWAA)

How Amazon GTTS runs large-scale ETL jobs on AWS using Amazon MWAA

Post Syndicated from Louis Hourcade original https://aws.amazon.com/blogs/big-data/how-amazon-gtts-runs-large-scale-etl-jobs-on-aws-using-amazon-mwaa/

The Amazon Global Transportation Technology Services (GTTS) team owns a set of products called INSITE (Insights Into Transportation Everywhere). These products are user-facing applications that solve specific business problems across different transportation domains: network topology management, capacity management, and network monitoring. As of this writing, GTTS serves around 10,000 customers globally on a monthly basis, managing the outbound transportation network.

INSITE applications are in general data intensive. They ingest and transform large volumes of data in different formats and processing patterns (such as batch and near real time) from various sources internal and external to Amazon. Datasets are often shared between applications both within domains and across domains, and are consumed in complex data pipelines that run under tight SLAs. To enable and meet these requirements, GTTS built its own data platform.

A critical component of the data platform is the data pipeline orchestrator. GTTS built its own orchestrator named Langley in 2018, and used it to schedule and monitor extract, transform, and load (ETL) jobs on a variety of compute platforms, such as Amazon EMR, Amazon Redshift, and Amazon Relational Database Service (Amazon RDS).

As the Langley user base grew, GTTS engineers faced challenges on several key dimensions, such as maintainability, scalability, multi-tenancy, observability, and interoperability.

Amazon GTTS partnered with AWS Professional Services to modernize their orchestration platform, relying as much as possible on managed services with auto scaling capabilities. After analyzing candidate solutions, the team decided to build a target solution relying on Amazon Managed Workflows for Apache Airflow (Amazon MWAA). This post elaborates on the drivers of the migration and its achieved benefits.

Legacy platform

Amazon GTTS works with diverse and distributed data stores, storing petabytes of data. Data engineers need a tool to define ETL jobs which run on various compute environments, as illustrated in the following diagram.

Amazon GTTS orchestration platform - high-level diagram

GTTS built Langley as their custom orchestrator in 2018, and have been operating it ever since. At a high level, the core of Langley’s architecture is based on a set of Amazon Simple Queue Service (Amazon SQS) queues and AWS Lambda functions, and a dedicated RDS database to store ETL job data and metadata. It also uses AWS Data Pipeline to run SQL-based workloads, Amazon Simple Storage Service (Amazon S3) to store configuration files, and Amazon CloudWatch for alarming on failures. Every day, Langley handles the lifecycle of more than 17,000 ETL jobs in Europe and 5,000 ETL jobs in North America.

The following diagram illustrates the Langley architecture.

Langley architecture diagram

Business challenges

Langley started as a simple solution to a team-internal problem, but its growth over the years surfaced key issues:

  • Maintaining this custom solution requires considerable engineering time, and the effort has increased over the years as new features added to the overall complexity.
  • Langley’s user base grew continuously, and Langley eventually became a key orchestration platform for multiple teams and products across Amazon. However, it wasn’t created with multi-tenancy in mind, so it didn’t provide the robustness and isolation needed to keep one tenant from impacting others on the shared platform.
  • In 2023, AWS announced the upcoming deprecation of Data Pipeline, one of the core services used by Langley.

GTTS partnered with AWS to design and implement a solution to overcome those challenges. AWS used the following evaluation matrix to build a durable solution:

  • Maintainability – The level of effort required to maintain the orchestrating system in a functional state, encompassing updates, patches, bug fixes, and routine checks for optimal performance.
  • Costs – The overall expenditure associated with the orchestrator, including infrastructure costs, licensing fees, personnel expenses, and other relevant costs. This criterion particularly assesses the system’s ability to effectively control and reduce costs.
  • Scheduling – The capabilities related to running and scheduling jobs, including the ability to resume an ETL job from a failed step.
  • User experience – The overall satisfaction and usability of a system from the end-users’ perspective, considering factors such as responsiveness, accessibility, interoperability, and ease of use.
  • Security – Mechanisms in place to safeguard data and applications from unauthorized access at all times.
  • Monitoring and alerting – The continuous observation and analysis of system components and performance metrics to detect and address issues, optimize resource usage, and provide overall health and reliability.
  • Scalability – The orchestrator’s capacity to efficiently adapt its resources to handle increased workload or demand, providing sustained performance.

Among the explored solutions, Amazon MWAA emerged as the best overall performer across this matrix.

The next section dives deep into the rationale that led GTTS and AWS Professional Services to choose Amazon MWAA.

Benefits of migrating to Amazon MWAA

Amazon GTTS and AWS Professional Services worked together to release a Minimum Viable Product (MVP) of the solution described earlier, which showcases the benefits on the agreed decision criteria.

Maintainability

With their legacy system, Amazon GTTS had to manage the orchestrator database, web servers, activity queue, dispatch functions, and worker nodes.

Amazon MWAA eliminates the need for underlying infrastructure management. It takes care of provisioning and maintenance of the Apache Airflow web server, scheduler, worker nodes, and relational database, allowing GTTS teams to focus on building their ETL jobs.

Amazon MWAA offers one-click updates of the infrastructure for minor versions, like moving from Airflow version x.4.z to x.5.z. During the upgrade process, Amazon MWAA captures a snapshot of your environment metadata; upgrades the workers, schedulers, and web server to the new Airflow version; and finally restores the metadata database using the snapshot, backing it with an automated rollback mechanism.

Costs

Amazon MWAA contributes to a more cost-effective solution by automatically scaling workers depending on the workload. This dynamic scaling in and out avoids over-provisioning and allows the organization to pay for the compute they actually use, without the risk of downtime during activity spikes. Because this is an AWS-managed solution, it also reduced GTTS’s Total Cost of Ownership (TCO) by freeing up time from engineers that were managing the legacy system.

Scheduling

Amazon MWAA supports all the trigger mechanisms that the Amazon orchestrator needed:

  • Manual trigger – Users can invoke a Directed Acyclic Graph (DAG) using the Airflow API or, even more simply, through the user interface (UI).
  • Scheduler – A schedule can be defined as code, together with the DAG definition, so that the DAG runs at specific rates (from hourly to yearly) or on specific cron schedules.
  • Event-driven trigger – Airflow provides native operators that enable invoking a downstream DAG from another DAG or from a dataset update (push approach). It also includes sensors that listen for the completion of a task external to the DAG (pull approach).
  • Partial runs on DAG failures – Another key feature for GTTS was the ability to recover from partial DAG failures without having to rerun the whole DAG. Airflow provides task-level controls that make this operation straightforward to implement.
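
To illustrate the partial-run idea, the following is a minimal conceptual sketch in plain Python, not Airflow code. It assumes a hypothetical four-task pipeline and computes which tasks would need to run again when one task fails: the failed task and everything downstream of it, while upstream results are kept. The function name and pipeline are illustrative assumptions.

```python
from collections import deque


def tasks_to_rerun(downstream, failed_task):
    """Return the failed task plus every task downstream of it (breadth-first)."""
    selected = [failed_task]
    queue = deque([failed_task])
    while queue:
        current = queue.popleft()
        for child in downstream.get(current, []):
            if child not in selected:
                selected.append(child)
                queue.append(child)
    return selected


# Hypothetical pipeline: each key maps to its direct downstream tasks.
PIPELINE = {
    "extract": ["transform"],
    "transform": ["load", "report"],
    "load": [],
    "report": [],
}

# If "transform" fails, "extract" does not need to run again.
print(tasks_to_rerun(PIPELINE, "transform"))
```

In this sketch, a failure in the transform step leaves the extract step untouched, which is the behavior GTTS wanted to avoid rerunning whole DAGs.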

User experience

In this section, we discuss three aspects of the user experience: the web UI, the interoperability, and the programming interface.

Web UI

Amazon MWAA comes with a managed web server that hosts the Airflow UI. As a result, and without any maintenance needed, you can use it to quickly run DAGs, check run history, visualize dependencies between DAGs, troubleshoot with a direct access to task logs, manage variables and database connections, and define granular permissions. The following screenshot shows an example of the UI.

Amazon MWAA User Interface - console screenshot

Interoperability

One of the most important features evaluated was the ability of the new orchestrator to integrate effortlessly with GTTS’s multiple data storage services, compute components, and monitoring services.

Amazon MWAA comes with a wide variety of providers preinstalled, such as apache-airflow-providers-amazon, apache-airflow-providers-postgres, and apache-airflow-providers-common-sql. This allowed GTTS to connect with those services using multiple connection methodologies, including AWS IAM Identity Center or AWS Secrets Manager password-based authentications, without having to write a single custom Airflow operator.

Amazon MWAA also makes it straightforward to upgrade provider versions and install new providers. By providing a requirements.txt file, GTTS was able to change the major version of apache-airflow-providers-amazon and install the apache-airflow-providers-mysql provider.

Programming interface

Airflow is an orchestrator with a low barrier to entry, especially for those familiar with the Python programming language. Its workflow management is defined in Python scripts, with a well-documented set of native operators and external providers, making it straightforward for Python developers to get started with Airflow and create complex data pipelines.

The following are two key Airflow features:

  • TaskFlow API – The TaskFlow API uses Python decorators to remove a lot of the boilerplate code required by traditional operators, which simplifies DAG editing and results in cleaner, more concise DAG files.
  • Dynamic DAG generation – The dynamic DAG generation capability allowed us to generate DAGs from the original legacy orchestrator’s configuration files. This enabled the platform team to build a centralized framework consumed by multiple teams to keep the code DRY (Don’t Repeat Yourself), providing a seamless migration journey from the legacy orchestrator.

The following screenshot shows an example of these features.

Airflow dynamic DAG definition - code sample
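
The configuration-to-pipeline translation behind dynamic DAG generation can be sketched in plain Python. This is not the GTTS framework and it does not use the Airflow API: the configuration format, job names, and the PipelineSpec type are hypothetical. It only shows how one loop over legacy-style configuration entries can produce one pipeline definition per job, with steps ordered by their dependencies. In an Airflow DAG file, each resulting spec would typically be turned into a DAG object.

```python
import json
from dataclasses import dataclass
from graphlib import TopologicalSorter

# Hypothetical legacy-style configuration: each step lists the steps it depends on.
LEGACY_CONFIG = """
[
  {"job": "load_orders", "schedule": "@daily",
   "steps": {"extract": [], "transform": ["extract"], "load": ["transform"]}},
  {"job": "refresh_capacity", "schedule": "@hourly",
   "steps": {"pull": [], "publish": ["pull"]}}
]
"""


@dataclass
class PipelineSpec:
    dag_id: str
    schedule: str
    ordered_steps: list


def build_specs(raw_config):
    """Create one pipeline spec per configured job, with steps in dependency order."""
    specs = {}
    for entry in json.loads(raw_config):
        order = list(TopologicalSorter(entry["steps"]).static_order())
        specs[entry["job"]] = PipelineSpec(entry["job"], entry["schedule"], order)
    return specs


SPECS = build_specs(LEGACY_CONFIG)
for spec in SPECS.values():
    print(spec.dag_id, spec.schedule, spec.ordered_steps)
```

Keeping the generation logic in one shared function is what lets several teams add jobs by editing configuration only, in line with the DRY goal described above.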

Security

The new Amazon MWAA-based architecture improves GTTS’s security posture by introducing granular access control. Amazon MWAA integrates with AWS services such as AWS Key Management Service (AWS KMS), Secrets Manager, and IAM Identity Center to keep data safely encrypted at all times, both at rest and in transit using TLS-based communications. Airflow also includes a role-based access control (RBAC) model to determine what users can do on the platform and enforce the principle of least privilege. Amazon MWAA also natively integrates with AWS CloudTrail for auditing purposes.

The Airflow RBAC model enables administrators to define roles with specific privileges to access Airflow system settings and DAGs themselves. This granular access control reduces the risk of data breaches and malicious activities by limiting access to critical DAGs and sensitive Airflow environment variables. Airflow includes five default roles with different sets of permissions (as shown in the following screenshot), but it is possible to create new roles depending on your security requirements.

Airflow roles - console screenshot

GTTS used the Airflow RBAC model to restrict permissions of certain teams and consumers of the application. They also used priority weights and Airflow pools to prioritize tasks and control run concurrency. However, if you want to run a multi-tenant orchestration platform, it’s recommended to use a separate environment for each team. You can assume that everything accessible by the Amazon MWAA role is also accessible to users who can write DAGs to the environment.
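
As a rough mental model of how pools and priority weights interact, consider the following simplified sketch in plain Python. It is not the Airflow scheduler, which takes more factors into account; the function and task names are hypothetical. The assumption illustrated is that a pool caps the number of tasks running at once, and that among queued tasks, those with a higher priority weight are picked first.

```python
def pick_next_tasks(queued, running_slots, pool_slots):
    """Pick queued tasks for the free pool slots, highest priority weight first.

    queued is a list of (task_id, priority_weight) tuples.
    """
    free_slots = max(pool_slots - running_slots, 0)
    ranked = sorted(queued, key=lambda task: task[1], reverse=True)
    return [task_id for task_id, _ in ranked[:free_slots]]


# Hypothetical queue sharing a pool of 3 slots, 1 of which is already in use.
QUEUED = [("nightly_report", 1), ("sla_critical_load", 10), ("capacity_refresh", 5)]
print(pick_next_tasks(QUEUED, running_slots=1, pool_slots=3))
```

With two free slots, the two highest-weighted tasks are selected and the low-priority report waits for the next free slot.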

To ease authentication in Amazon MWAA, GTTS federated their identity provider (IdP) through Amazon Cognito and SAML. With this integration, users log in to the Amazon MWAA UI using the same identity as in other internal systems, which removes the need for new credentials. The user’s group membership is retrieved from the IdP through Amazon Cognito, and a Lambda function redirects the user to Amazon MWAA with the appropriate Airflow role. This process is illustrated in the following architecture, and is abstracted from the user and attached to a public Application Load Balancer that redirects at the end of the process to an Amazon MWAA private cluster, making the authentication workflow seamless and secure. Refer to Accessing a private Amazon MWAA environment using federated identities to implement it using your own IdP.

Amazon MWAA federation - architecture diagram

Monitoring and alerting

Amazon MWAA integrates with CloudWatch, which manages all infrastructure logs for you. When creating an Amazon MWAA environment, you can configure what level of logs should be saved. GTTS enabled CloudWatch logging for all of the five types of components: Airflow task logs, Airflow web server logs, Airflow scheduler logs, Airflow worker logs, and Airflow DAG processing logs.

Amazon MWAA logging configuration - console screenshot
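
The same logging choice can be expressed programmatically. The following sketch builds a dictionary shaped like the LoggingConfiguration parameter of the Amazon MWAA environment API, enabling all five log types at a single level. The helper name is hypothetical, the INFO default is an assumption rather than the level GTTS chose, and passing the result to an AWS SDK call is left out so that the example stays self-contained.

```python
# The five log types an Amazon MWAA environment can publish to CloudWatch.
LOG_COMPONENTS = (
    "DagProcessingLogs",
    "SchedulerLogs",
    "TaskLogs",
    "WebserverLogs",
    "WorkerLogs",
)
VALID_LEVELS = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}


def build_logging_configuration(level="INFO"):
    """Enable every log component at the given level."""
    if level not in VALID_LEVELS:
        raise ValueError(f"Unsupported log level: {level}")
    return {name: {"Enabled": True, "LogLevel": level} for name in LOG_COMPONENTS}


print(build_logging_configuration("INFO"))
```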

These logs are all accessible in CloudWatch for continuous monitoring, but Amazon MWAA users can also access task logs directly from the Airflow UI by looking at the DAG run history. The following screenshot shows an example of task-level logs in Airflow 2.5.1.

Amazon MWAA task-level logs - console screenshot

You can also build CloudWatch monitoring dashboards to keep an eye on the state of your environment and alert administrators when required. Amazon MWAA natively provides Airflow environment metrics and Amazon MWAA infrastructure-related metrics.

Scalability

Each Amazon MWAA environment includes the schedulers, web server, and worker nodes. Scheduler nodes are responsible for the overall orchestration and parsing of DAG files. The tasks themselves run on worker nodes, which Amazon MWAA automatically scales up and down according to system load. When creating a new Amazon MWAA environment, you need to specify the type of worker nodes, the minimum and maximum number of worker nodes, and the scheduler count, as shown in the following screenshot.

Amazon MWAA environment classes - console screenshot

There are notably two ways GTTS controlled how Amazon MWAA scales to handle the load:

  • Minimum and maximum worker count – Amazon MWAA automatically adds or deletes workers within the boundaries you set, depending on the number of tasks that are waiting to be processed. As indicated in the AWS documentation, it is possible to request a quota increase to run up to 50 workers in a single environment.
  • Size of the node – Larger worker nodes can run more concurrent tasks. For example, mw1.small instances run 5 concurrent tasks by default, whereas mw1.large instances run 20 concurrent tasks by default. The following figure shows the specification for each instance type.

Amazon MWAA environment sizes - console screenshot

With Amazon MWAA, GTTS can therefore run up to 4,000 concurrent tasks in a single Amazon MWAA environment (50 worker nodes x 80 tasks per node with mw1.2xlarge). This figure is only an order of magnitude, because the actual load is bounded by what fits into the workers’ vCPUs and RAM, but it is possible to edit the default configuration to add even more tasks per worker. For more information regarding Amazon MWAA automatic scaling, see Configuring Amazon MWAA automatic scaling.
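
The capacity arithmetic above can be written as a small helper. This is a sketch that uses only the default per-worker values quoted in this post (5, 20, and 80 concurrent tasks); the function name is hypothetical, and the optional override stands in for a customized per-worker setting.

```python
# Default concurrent tasks per worker, as quoted in this post.
DEFAULT_TASKS_PER_WORKER = {
    "mw1.small": 5,
    "mw1.large": 20,
    "mw1.2xlarge": 80,
}


def max_concurrent_tasks(environment_class, max_workers, tasks_per_worker=None):
    """Upper bound on concurrent tasks: workers multiplied by tasks per worker."""
    if tasks_per_worker is None:
        tasks_per_worker = DEFAULT_TASKS_PER_WORKER[environment_class]
    return max_workers * tasks_per_worker


# 50 workers x 80 tasks per worker
print(max_concurrent_tasks("mw1.2xlarge", max_workers=50))
```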

The Amazon MWAA based orchestration platform

After selecting Amazon MWAA as the core service for their orchestrating system, Amazon GTTS and AWS worked together to develop an end-to-end data platform with automation capabilities, access management, monitoring, and integration with downstream systems. The following diagram illustrates the solution architecture.

MWAA-based platform - architecture diagram

The following are notable components of the architecture:

  1. DAG update – GTTS developers manage the creation, update, and deletion of Amazon MWAA DAGs through a dedicated code repository. When a developer edits DAG definitions and commits changes to the code repository, a CI/CD pipeline automatically packages the DAG definition and stores it in Amazon S3, which automatically updates DAGs in Amazon MWAA.
  2. Infrastructure as code – The entire stack is defined as IaC with the AWS CDK, which eases the process of updating components, and makes it repeatable if GTTS wants to extend the solution and redeploy the stack in multiple AWS Regions.
  3. Authentication, authorization, and permissions – Permissions are centrally managed with AWS Identity and Access Management (IAM) together with Airflow roles. GTTS integrated their identity provider with Amazon Cognito and Amazon MWAA, so Amazon employees can connect to the Amazon MWAA UI with the same authentication tool they are used to, and see only the DAGs they are allowed to access.
  4. UI and DAG runs – Amazon MWAA includes an AWS-managed web server that exposes the Airflow UI. Amazon employees can connect to this UI to list DAGs, run DAGs, and track their status. In addition, GTTS used the native Amazon MWAA scheduler to automatically invoke DAGs at a specific time.
  5. Airflow workers – Users can use Airflow native providers to run custom Shell or Python code directly on the worker nodes. For compute-intensive jobs, the Amazon MWAA worker can delegate the compute to a more suitable AWS service, such as Apache Spark running on Amazon EMR on Amazon EKS, which provides compute resources only for the duration of the job, helping optimize costs.
  6. Data stores and external compute services – Amazon MWAA also comes with the AWS provider preloaded, allowing seamless connectivity with more than 23 AWS compute and data services. GTTS can extend the connectivity to other AWS or external services by using Boto3 with the PythonOperator or creating dedicated custom operators.
  7. Logging and alerting – Amazon MWAA is seamlessly integrated with CloudWatch and CloudTrail to publish DAG logs, audit logs, and metrics. This enables GTTS to track completion, troubleshoot, and create an automated alerting and notifications system so DAG owners can take remediation actions as fast as possible.

Conclusion

Amazon GTTS partnered with AWS Professional Services to overcome the challenges faced by their legacy custom orchestrator against various dimensions such as maintainability, cost efficiency, security, scalability, and observability.

The new Amazon MWAA-based architecture offers significant improvements in the context of the AWS Well-Architected Framework compared to their former system. In terms of operational excellence, the new orchestration platform is built with evolvability in mind and enables the GTTS team to use the most suitable ETL service to run their jobs. Regarding performance efficiency, GTTS observed up to 70% improvement in end-to-end runtime on their jobs running in Amazon MWAA. In terms of security, the new solution implements best practices such as the deployment in private subnets, authentication of users through Amazon internal federation systems, and data encryption at rest and in transit. Reliability is achieved with Multi-AZ failover and built-in auto scaling to meet the workload demand at all times. Finally, cost is reduced because Amazon MWAA is an AWS-managed service, which decreases the human effort from GTTS to maintain the orchestration platform.

Amazon GTTS is now bringing the MVP into production, where it is planned to handle petabytes of data and host more than 2,000 jobs migrated from the legacy system. Additionally, the migration to Amazon MWAA has empowered GTTS to enhance its operational scalability, paving the way for the integration of new jobs and further expansion with greater efficiency and confidence.

About the Authors

Béntor Bautista is a Senior Data Engineer at Amazon GTTS
Louis Hourcade is a Solutions Architect at AWS
Raphael Ducay is a Senior DataOps Architect at AWS
Konstantin Zarudaev is a DevOps Consultant at AWS
Dorra Elboukari is a DevOps Architect at AWS
Marcin Zapal is an Engagement Manager at AWS
Grigorios Pikoulas is a Strategic Program Lead at AWS
Antonio Cennamo is a Senior Customer Practice Manager at AWS

Integrate Amazon MWAA with Microsoft Entra ID using SAML authentication

Post Syndicated from Satya Chikkala original https://aws.amazon.com/blogs/big-data/integrate-amazon-mwaa-with-microsoft-entra-id-using-saml-authentication/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) provides a fully managed solution for orchestrating and automating complex workflows in the cloud. Amazon MWAA offers two network access modes for accessing the Apache Airflow web UI in your environments: public and private. Customers often deploy Amazon MWAA in private mode and want to use existing login authentication mechanisms and single sign-on (SSO) features to have seamless integration with the corporate Active Directory (AD). With this setup, end-users don’t need to log in to the AWS Management Console to access the Airflow UI.

In this post, we illustrate how to configure an Amazon MWAA environment deployed in private network access mode with customer managed VPC endpoints and authenticate users using SAML federated identity using Microsoft Entra ID and Application Load Balancer (ALB). Users can seamlessly log in to the Airflow UI with their corporate credentials and access the DAGs. This solution can be modified for Amazon MWAA public network access mode as well.

Solution overview

The architectural components involved in authenticating the Amazon MWAA environment using SAML SSO are depicted in the following diagram. The infrastructure components include two public subnets and three private subnets. The public subnets are required for the internet-facing ALB. Two private subnets are used to set up the Amazon MWAA environment, and the third private subnet is used to host the AWS Lambda authorizer function. This subnet will have a NAT gateway attached to it, because the function needs to verify the signer to confirm the JWT header has the expected LoadBalancer ARN.

The workflow consists of the following steps:

  1. For SAML configuration, Microsoft Entra ID serves as the identity provider (IdP).
  2. Amazon Cognito serves as the service provider (SP).
  3. ALB has built-in support for Amazon Cognito and authenticates requests.
  4. Post-authentication, ALB forwards the requests to the Lambda authorizer function. The Lambda function decodes the user’s JWT token and validates whether the user’s AD group is mapped to the relevant AWS Identity and Access Management (IAM) role.
  5. If valid, the function creates a web login token and redirects to the Amazon MWAA environment for successful login.
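
The authorization steps above can be sketched in plain Python. This is not the Lambda function shipped with the CloudFormation templates. It assumes that the ALB forwards the user claims as a JWT in the x-amzn-oidc-data header, that group object IDs arrive as a list under a hypothetical claim named custom:groups, and that the login URL format shown is the one accepted by the environment. Signature verification, which the real function must perform, is deliberately omitted, and the web token and host name would come from the Amazon MWAA API rather than being hard-coded.

```python
import base64
import json

GROUP_CLAIM = "custom:groups"  # hypothetical claim name; depends on your attribute mapping


def decode_jwt_payload(token):
    """Decode the payload segment of a JWT WITHOUT verifying its signature."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def resolve_role(claims, group_to_role):
    """Return the first role whose group ID appears in the user's claims, else None."""
    user_groups = claims.get(GROUP_CLAIM, [])
    for group_id, role in group_to_role.items():
        if group_id in user_groups:
            return role
    return None


def build_login_url(web_server_hostname, web_token):
    """Assumed redirect URL format for an Amazon MWAA web login token."""
    return f"https://{web_server_hostname}/aws_mwaa/aws-console-sso?login=true#{web_token}"


# Hypothetical mapping from Entra ID group object IDs to Airflow personas.
GROUP_TO_ROLE = {
    "11111111-aaaa": "Admin",
    "22222222-bbbb": "User",
    "33333333-cccc": "Viewer",
}
```

A handler would call decode_jwt_payload on the header value, pass the claims to resolve_role, and redirect with build_login_url only when a role is found.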

The following are the high-level steps to deploy the solution:

  1. Create an Amazon Simple Storage Service (Amazon S3) bucket for artifacts.
  2. Create an SSL certificate and upload it to AWS Certificate Manager (ACM).
  3. Deploy the Amazon MWAA infrastructure stack using AWS CloudFormation.
  4. Configure Microsoft Entra ID services and integrate the Amazon Cognito user pool.
  5. Deploy the ALB CloudFormation stack.
  6. Log in to Amazon MWAA using Microsoft Entra ID user credentials.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account
  • Appropriate IAM permissions to deploy AWS CloudFormation stack resources
  • A Microsoft Azure account with Microsoft Entra ID P2, required to create the Microsoft Entra ID app (IdP configuration)
  • A public certificate for the ALB in the AWS Region where the infrastructure is being deployed and a custom domain name relevant to the certificate.

Create an S3 bucket

In this step, we create an S3 bucket to store your Airflow DAGs, custom plugins in a plugins.zip file, and Python dependencies in a requirements.txt file. This bucket is used by the Amazon MWAA environment to fetch DAGs and dependency files.

  1. On the Amazon S3 console, choose the Region where you want to create a bucket.
  2. In the navigation pane, choose Buckets.
  3. Choose Create bucket.
  4. For Bucket type, select General purpose.
  5. For Bucket name, enter a name for your bucket (for this post, mwaa-sso-blog-<your-aws-account-number>).
  6. Choose Create bucket. 

  7. Navigate to the bucket and choose Create folder.
  8. For Folder name, enter a name (for this post, we name the folder dags).
  9. Choose Create folder.


Import certificates into ACM

ACM is integrated with Elastic Load Balancing, which includes ALB. In this step, you can request a public certificate using ACM or import a certificate into ACM. To import organization certificates linked to a custom DNS into ACM, you must provide the certificate and its private key. To import a certificate signed by a non-AWS Certificate Authority (CA), you must also include the private and public keys of the certificate.

  1. On the ACM console, choose Import certificate in the navigation pane.
  2. For Certificate body, enter the contents of the cert.pem file.
  3. For Certificate private key, enter the contents of the privatekey.pem file.
  4. Choose Next.


  5. Choose Review and import.
  6. Review the metadata about your certificate and choose Import.

After the import is successful, the status of the imported certificate will show as Issued.

Create the Azure AD service, users, groups, and enterprise application

For the SSO integration with Azure, an enterprise application is required, which acts as the IdP for the SAML flow. We add relevant users and groups to the application and configure the SP (Amazon Cognito) details.

Airflow comes with five default roles: Public, Admin, Op, User, and Viewer. In this post, we focus on three: Admin, User, and Viewer. We create three groups and corresponding users, and assign memberships appropriately.

  1. Log in to the Azure portal.
  2. Navigate to Enterprise applications and choose New application.

  3. Enter a name for your application (for example, mwaa-environment) and choose Create.



    You can now view the details of your application.


    Now you create the groups.

  4. In the search bar, search for Microsoft Entra ID.

  5. On the Add menu, choose Group.

  6. For Group type, choose a type (for this post, Security).
  7. Enter a group name (for example, airflow-admins) and description.
  8. Choose Create.


  9. Repeat these steps to create two more groups, named airflow-users and airflow-viewers.
  10. Note the object IDs for each group (these are required in a later step).


    Next, you create users.
  11. On the Overview page, on the Add menu, choose User and Create new user.
  12. Enter a name for your user (for example, mwaa-user), display name, and password.
  13. Choose Review + create.


  14. Repeat these steps to create a user called mwaa-admin.
  15. In your airflow-users group details page, choose Members in the navigation pane.
  16. Choose Add members.
  17. Search for and select the users you created and choose Select.


  18. Repeat these steps to add the users to each group.

  19. Navigate to your application and choose Assign users and groups.

  20. Choose Add user/group.

  21. Search for and select the groups you created, then choose Select.

 

Deploy the Amazon MWAA environment stack

For this solution, we provide two CloudFormation templates that set up the services illustrated in the architecture. Deploying the CloudFormation stacks in your account incurs AWS usage charges.

The first CloudFormation stack creates the following resources:

  • A VPC with two public subnets and three private subnets and relevant route tables, NAT gateway, internet gateway, and security group
  • VPC endpoints required for the Amazon MWAA environment
  • An Amazon Cognito user pool and user pool domain
  • An Application Load Balancer

Deploy the stack by completing the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.

  2. For Stack name, enter a name (for example, sso-blog-mwaa-infra-stack).

  3.  Enter the following parameters:

    1. For MWAAEnvironmentName, enter the environment name.

    2. For MwaaS3Bucket, enter the S3 artifacts bucket you created.

    3. For VpcCIDR, enter the IP range (CIDR notation) for this VPC.

    4. For PrivateSubnet1CIDR, enter the IP range (CIDR notation) for the private subnet in the first Availability Zone.

    5.  For PrivateSubnet2CIDR, enter the IP range (CIDR notation) for the private subnet in the second Availability Zone.

    6. For PrivateSubnet3CIDR, enter the IP range (CIDR notation) for the private subnet in the third Availability Zone.

    7. For PublicSubnet1CIDR, enter the IP range (CIDR notation) for the public subnet in the first Availability Zone.

    8. For PublicSubnet2CIDR, enter the IP range (CIDR notation) for the public subnet in the second Availability Zone.

  4. Choose Next.

  5. Review the template and choose Create stack.
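
If you need to derive the five subnet ranges from the VPC range, the following sketch uses the Python standard library to carve consecutive, non-overlapping blocks out of a VPC CIDR and assign them to the subnet parameters listed above. The example VPC range and the /24 subnet size are assumptions for illustration only, and the helper name is hypothetical.

```python
import ipaddress
from itertools import islice

# Subnet parameters of the stack, in the order blocks are assigned.
SUBNET_PARAMETERS = [
    "PublicSubnet1CIDR",
    "PublicSubnet2CIDR",
    "PrivateSubnet1CIDR",
    "PrivateSubnet2CIDR",
    "PrivateSubnet3CIDR",
]


def plan_subnets(vpc_cidr, new_prefix=24):
    """Assign consecutive non-overlapping blocks of the VPC range to each parameter."""
    vpc = ipaddress.ip_network(vpc_cidr)
    blocks = list(islice(vpc.subnets(new_prefix=new_prefix), len(SUBNET_PARAMETERS)))
    if len(blocks) < len(SUBNET_PARAMETERS):
        raise ValueError("VPC range is too small for five subnets of this size")
    return {name: str(block) for name, block in zip(SUBNET_PARAMETERS, blocks)}


# Example VPC range (an assumption, not a value from the template).
for parameter, cidr in plan_subnets("10.192.0.0/16").items():
    print(parameter, cidr)
```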

After the stack is deployed successfully, you can view the resources on the stack’s Outputs tab on the AWS CloudFormation console. Note the ALB URL, Amazon Cognito user pool ID, and domain.

 

Integrate the Amazon MWAA application with the Azure enterprise application

Next, you configure the SAML configuration in the enterprise application by adding the SP details and redirect URLs (in this case, the Amazon Cognito details and ALB URL).

  1. In the Azure portal, navigate to your enterprise application.
  2. Choose Set up single sign on.
  3. For Identifier, enter urn:amazon:cognito:sp:<your Cognito user pool ID>.
  4. For Reply URL, enter https://<Your user pool domain>/saml2/idpresponse.
  5. For Sign on URL, enter https://<Your application load balancer DNS>.
  6. In the Attributes & Claims section, choose Add a group claim.
  7. Select Security groups.
  8. For Source attribute, choose Group ID.
  9. Choose Save.
  10. Note the values for App Federation Metadata Url and Login URL.
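
To avoid typos when filling in the SAML fields, you can derive the three values from the stack outputs. The following sketch only performs string formatting based on the patterns in the preceding steps; the function name and the sample pool ID, domain, and ALB DNS name are hypothetical placeholders.

```python
def saml_settings(user_pool_id, user_pool_domain, alb_dns_name):
    """Build the values entered in the enterprise application's SAML configuration."""
    return {
        "Identifier": f"urn:amazon:cognito:sp:{user_pool_id}",
        "Reply URL": f"https://{user_pool_domain}/saml2/idpresponse",
        "Sign on URL": f"https://{alb_dns_name}",
    }


# Placeholder values; replace them with the outputs of your own stack.
settings = saml_settings(
    user_pool_id="us-east-1_EXAMPLE",
    user_pool_domain="example-domain.auth.us-east-1.amazoncognito.com",
    alb_dns_name="airflow.example.com",
)
for field, value in settings.items():
    print(f"{field}: {value}")
```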


Deploy the ALB stack

When the SAML configuration is complete on the Azure end, the IdP details have to be configured in Amazon Cognito. When users access the ALB URL, they will be authenticated against the corporate identity using SAML through Amazon Cognito. After they’re authenticated, they’re redirected to the Lambda function for authorization against the group they belong to. The user’s group is then validated against the matching IAM role. If it’s valid, the Lambda function adds the web login token to the URL, and the user will gain access to the Amazon MWAA environment.

This CloudFormation stack creates the following resources:

  • Two target groups: the Lambda target group and Amazon MWAA target group
  • Listener rules for the ALB to redirect URL requests to the relevant target groups
  • A user pool client and the SAML provider (Azure) details, added to the Amazon Cognito user pool
  • IAM roles for Admin, User, and Viewer personas required for Airflow
  • The Lambda authorizer function to validate the JWT token and map Azure groups to IAM roles for appropriate Airflow UI access

Deploy the stack by completing the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack:
  2. For Stack name, enter a name (for example, sso-blog-mwaa-alb-stack).

  3. Enter the following parameters:

    1. For MWAAEnvironmentName, enter your environment name.

    2. For ALBCertificateArn, enter the certificate ARN required for ALB. 

    3. For AzureAdminGroupID, enter the object ID of the Azure group for the Admin persona.

    4. For AzureUserGroupID, enter the object ID of the Azure group for the User persona.

    5. For AzureViewerGroupID, enter the object ID of the Azure group for the Viewer persona.

    6. For EntraIDLoginURL, enter the Login URL you noted earlier.

    7. For AppFederationMetadataURL, enter the App Federation Metadata Url you noted earlier (the URL of the metadata file for the SAML provider).

  4. Choose Next.

  5. Review the template and choose Create stack.

Test the solution

Now that the SAML configuration and relevant AWS services are created, it’s time to access the Amazon MWAA environment.

  1. Open your web browser and enter the ALB DNS name.
    The SP initiates the sign-in request process and the browser redirects you to the Microsoft login page for credentials.
  2. Enter the Admin user credentials.

    The SAML request sign-in process completes and the SAML response is redirected to the Amazon Cognito user pool attached to the ALB.

    The listener rules will validate the query URL and pass the requests to the Lambda authorizer to validate the JWT and assign the appropriate group (Azure) to role (AWS) mapping.


  3. Repeat the steps to log in with User and Viewer credentials and observe the differences in access.

Clean up

When you’re done experimenting with this solution, it’s essential to clean up your resources to avoid incurring AWS charges.

  1. On the AWS CloudFormation console, delete the stacks you created.
  2. Remove the SSM parameters and private webserver and database VPC endpoints (created by the Lambda events function):
    aws ssm delete-parameters --names "MyFirstParameter" "MySecondParameter"
    aws ec2 delete-vpc-endpoints --vpc-endpoint-ids "Endpoint1" "Endpoint2"

  3. Delete the users, groups, and enterprise application in the Azure environment.

Conclusion

In this post, we demonstrated how to integrate Amazon MWAA with your organization’s Azure AD services. We walked through a solution that implements this integration using infrastructure as code. This solution allows different end-user personas in your organization to access the Amazon MWAA Airflow UI using SAML SSO.

For additional details and code examples for Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.


About the Authors

Satya Chikkala is a Solutions Architect at Amazon Web Services. Based in Melbourne, Australia, he works closely with enterprise customers to accelerate their cloud journey. Beyond work, he is very passionate about nature and photography.

Vijay Velpula is a Data Lake Architect with AWS Professional Services. He assists customers in building modern data platforms by implementing big data and analytics solutions. Outside of his professional responsibilities, Velpula enjoys spending quality time with his family, as well as indulging in travel, hiking, and biking activities.

Migrate workloads from AWS Data Pipeline

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/

AWS Data Pipeline helps customers automate the movement and transformation of data. With Data Pipeline, customers can define data-driven workflows, so that tasks can be dependent on the successful completion of previous tasks. Launched in 2012, Data Pipeline predates several popular Amazon Web Services (AWS) offerings for orchestrating data pipelines such as AWS Glue, AWS Step Functions, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Data Pipeline has been a foundational service for getting customers off the ground with their extract, transform, load (ETL) and infrastructure provisioning use cases. Some customers want a deeper level of control and specificity than is possible using Data Pipeline. With the recent advancements in the data industry, customers are looking for a more feature-rich platform to modernize their data pipelines and get them ready for data and machine learning (ML) innovation. This post explains how to migrate from Data Pipeline to alternate AWS services to serve the growing needs of data practitioners. The option you choose depends on your current workload on Data Pipeline. You can migrate typical use cases of Data Pipeline to AWS Glue, Step Functions, or Amazon MWAA.

Note that you will need to modify the configurations and code in the examples provided in this post based on your requirements. Before starting any production workloads after migration, you need to test your new workflows to ensure no disruption to production systems.

Migrating workloads to AWS Glue

AWS Glue is a serverless data integration service that helps analytics users to discover, prepare, move, and integrate data from multiple sources. It includes tooling for authoring, running jobs, and orchestrating workflows. With AWS Glue, you can discover and connect to hundreds of different data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor ETL pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.

We recommend migrating your Data Pipeline workload to AWS Glue when:

  • You’re looking for a serverless data integration service that supports various data sources, authoring interfaces including visual editors and notebooks, and advanced data management capabilities such as data quality and sensitive data detection.
  • Your workload can be migrated to AWS Glue workflows, jobs (in Python or Apache Spark) and crawlers (for example, your existing pipeline is built on top of Apache Spark).
  • You need a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
  • Your existing pipeline was created from a pre-defined template on the AWS Management Console for Data Pipeline, such as exporting a DynamoDB table to Amazon S3, or importing DynamoDB backup data from S3, and you’re looking for the same template.
  • Your workload doesn’t depend on a specific Hadoop ecosystem application such as Apache Hive.
  • Your workload doesn’t require orchestrating on-premises servers, user-managed Amazon Elastic Compute Cloud (Amazon EC2) instances, or a user-managed Amazon EMR cluster.

Example: Migrate EmrActivity on EmrCluster to export DynamoDB tables to S3

One of the most common workloads on Data Pipeline is to back up Amazon DynamoDB tables to Amazon Simple Storage Service (Amazon S3). Data Pipeline has a pre-defined template named Export DynamoDB table to S3 to export DynamoDB table data to a given S3 bucket.

The template uses EmrActivity (named TableBackupActivity) which runs on EmrCluster (named EmrClusterForBackup) and backs up data on DynamoDBDataNode to S3DataNode.

You can migrate these pipelines to AWS Glue because it natively supports reading from DynamoDB.

To define an AWS Glue job for the preceding use case:

  1. Open the AWS Glue console.
  2. Choose ETL jobs.
  3. Choose Visual ETL.
  4. For Sources, select Amazon DynamoDB.
  5. On the node Data source - DynamoDB, for DynamoDB source, select Choose the DynamoDB table directly, then select your source DynamoDB table from the menu.
  6. For Connection options, enter s3.bucket and dynamodb.s3.prefix.
  7. Choose + (plus) to add a new node.
  8. For Targets, select Amazon S3.
  9. On the node Data target - S3 bucket, for Format, select your preferred format, for example, Parquet.
  10. For S3 Target location, enter your destination S3 path.
  11. On the Job details tab, select an IAM role. If you don’t have an IAM role, follow Configuring IAM permissions for AWS Glue.
  12. Choose Save and Run.

Your AWS Glue job has been successfully created and started.

You might notice that there is no property to manage the read I/O rate. This is because the default DynamoDB reader used in AWS Glue Studio doesn’t scan the source DynamoDB table; instead, it uses the DynamoDB export feature.
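
If you prefer a script over the visual editor, the export-based read can be sketched as a set of connection options. The helper below is hypothetical, and the option keys are the ones we recall from the AWS Glue documentation for the DynamoDB export connector; verify them against the current documentation before relying on them.

```python
from typing import Any, Dict


def build_dynamodb_export_options(table_arn: str, bucket: str, prefix: str) -> Dict[str, Any]:
    """Build connection options for reading a DynamoDB table through its export feature.

    Hypothetical helper: the key names are assumed from the AWS Glue
    documentation and are not taken from this post.
    """
    return {
        "dynamodb.export": "ddb",          # read via DynamoDB export instead of a table scan
        "dynamodb.tableArn": table_arn,    # ARN of the source table
        "dynamodb.s3.bucket": bucket,      # bucket that receives the intermediate export
        "dynamodb.s3.prefix": prefix,      # key prefix for the exported files
        "dynamodb.unnestDDBJson": True,    # flatten the DynamoDB JSON structure
    }


# Inside an AWS Glue job (where the awsglue library is available), the options
# would be passed to the reader roughly as follows:
#   frame = glueContext.create_dynamic_frame.from_options(
#       connection_type="dynamodb",
#       connection_options=build_dynamodb_export_options(arn, bucket, prefix),
#   )
```

Because the export path doesn’t consume read capacity on the table, there is no throughput percentage to tune in these options.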

Example: Migrate EmrActivity on EmrCluster to import DynamoDB from S3

Another common workload on Data Pipeline is to restore DynamoDB tables using backup data on Amazon S3. Data Pipeline has a pre-defined template named Import DynamoDB backup data from S3 to import DynamoDB table data from a given S3 bucket.

The template uses EmrActivity (named TableLoadActivity) which runs on EmrCluster (named EmrClusterForLoad) and loads data from S3DataNode to DynamoDBDataNode.

You can migrate these pipelines to AWS Glue because it natively supports writing to DynamoDB.

As a prerequisite, create a destination DynamoDB table and catalog it in the AWS Glue Data Catalog using an AWS Glue crawler, the AWS Glue console, or the API.

  1. Open the AWS Glue console.
  2. Choose ETL jobs.
  3. Choose Visual ETL.
  4. For Sources, select Amazon S3.
  5. On the node Data source - S3 bucket, for S3 URL, enter your S3 path.
  6. Choose + (plus) to add a new node.
  7. For Targets, select AWS Glue Data Catalog.
  8. On the node Data target - Data Catalog, for Database, select your destination database on Data Catalog.
  9. For Table, select your destination table on Data Catalog.
  10. On the Job details tab, select an IAM role. If you don’t have an IAM role, follow Configuring IAM permissions for AWS Glue.
  11. Choose Save and Run.

Your AWS Glue job has been successfully created and started.

Migrating workloads to Step Functions

AWS Step Functions is a serverless orchestration service that lets you build workflows for your business-critical applications. With Step Functions, you use a visual editor to build workflows and integrate directly with over 11,000 actions for over 250 AWS services, including AWS Lambda, Amazon EMR, DynamoDB, and more. You can use Step Functions for orchestrating data processing pipelines, handling errors, and working with the throttling limits on the underlying AWS services. You can create workflows that process and publish machine learning models, orchestrate micro-services, as well as control AWS services, such as AWS Glue, to create ETL workflows. You also can create long-running, automated workflows for applications that require human interaction.

We recommend migrating your Data Pipeline workload to Step Functions when:

  • You’re looking for a serverless, highly available workflow orchestration service.
  • You’re looking for a cost-effective solution that charges at single-task granularity.
  • Your workloads are orchestrating tasks for multiple AWS services, such as Amazon EMR, AWS Lambda, AWS Glue, or DynamoDB.
  • You’re looking for a low-code solution that comes with a drag-and-drop visual designer for workflow creation and doesn’t require learning new programming concepts.
  • You’re looking for a service that provides integrations with over 250 AWS services covering over 11,000 actions out-of-the-box, as well as allowing integrations with custom non-AWS services and activities.
  • You want to keep defining workflows in JSON, as you do in Data Pipeline. This allows you to store your workflows in source control, manage versions, control access, and automate with continuous integration and delivery (CI/CD). Step Functions uses a syntax called Amazon States Language, which is fully based on JSON and allows a seamless transition between the textual and visual representations of the workflow.
  • Your workload requires orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.

With Step Functions, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.

To migrate activities that run on Data Pipeline managed resources, you can use the AWS SDK service integration in Step Functions to automate resource provisioning and cleanup. To migrate activities that run on on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster, you can install SSM Agent on the instance and initiate the command through AWS Systems Manager Run Command from Step Functions. You can also initiate the state machine from a schedule defined in Amazon EventBridge.

Example: Migrate HadoopActivity on EmrCluster

To migrate HadoopActivity on EmrCluster on Data Pipeline to Step Functions:

  1. Open the AWS Step Functions console.
  2. Choose State machines.
  3. Choose Create state machine.
  4. In the Choose a template wizard, search for emr, select Manage an EMR job, and choose Select.
  5. For Choose how to use this template, select Build on it.
  6. Choose Use template.
  7. For Create an EMR cluster state, configure API Parameters (EMR release label, EMR capacity, IAM role, and so on) based on the existing EmrCluster node configuration on Data Pipeline.
  8. For Run first step state, configure API Parameters (JAR file and arguments) based on the existing HadoopActivity node configuration on Data Pipeline.
  9. If you have further activities configured on the existing HadoopActivity, repeat step 8.
  10. Choose Create.

Your state machine has been successfully configured. Learn more in Manage an Amazon EMR Job.
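
For reference, the shape of such a state machine in Amazon States Language can be sketched from Python as a plain dictionary. This is a simplified illustration rather than the exact definition the template generates: the helper name, the instance types, the default role names, and the Terminate cluster state are assumptions.

```python
import json
from typing import Any, Dict, List


def build_emr_state_machine(cluster_name: str, release_label: str,
                            jar: str, args: List[str]) -> Dict[str, Any]:
    """Return an Amazon States Language definition: create cluster, run one step, terminate."""
    return {
        "Comment": "Sketch of a HadoopActivity migrated to Step Functions",
        "StartAt": "Create an EMR cluster",
        "States": {
            "Create an EMR cluster": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
                "Parameters": {
                    "Name": cluster_name,
                    "ReleaseLabel": release_label,
                    "Applications": [{"Name": "Hadoop"}],
                    "ServiceRole": "EMR_DefaultRole",      # assumed default role
                    "JobFlowRole": "EMR_EC2_DefaultRole",  # assumed default role
                    "Instances": {
                        # Keep the cluster alive so the next state can add a step.
                        "KeepJobFlowAliveWhenNoSteps": True,
                        "InstanceGroups": [
                            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
                            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
                        ],
                    },
                },
                "ResultPath": "$.cluster",
                "Next": "Run first step",
            },
            "Run first step": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
                "Parameters": {
                    "ClusterId.$": "$.cluster.ClusterId",
                    "Step": {
                        "Name": "First step",
                        "ActionOnFailure": "CONTINUE",
                        "HadoopJarStep": {"Jar": jar, "Args": args},
                    },
                },
                "ResultPath": "$.firstStep",
                "Next": "Terminate cluster",
            },
            "Terminate cluster": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
                "Parameters": {"ClusterId.$": "$.cluster.ClusterId"},
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    definition = build_emr_state_machine(
        "my-demo-cluster", "emr-6.1.0", "command-runner.jar", ["spark-example", "SparkPi", "10"]
    )
    print(json.dumps(definition, indent=2))
```

Printing the dictionary as JSON gives text you can paste into the code view of the Step Functions console and then adjust to match your existing EmrCluster and HadoopActivity configuration.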

Migrating workloads to Amazon MWAA

Amazon MWAA is a managed orchestration service for Apache Airflow that lets you use the Apache Airflow platform to set up and operate end-to-end data pipelines in the cloud at scale. Apache Airflow is an open-source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. Apache Airflow brings in new concepts like executors, pools, and SLAs that provide you with superior data orchestration capabilities. With Amazon MWAA, you can use Airflow and Python programming language to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. Amazon MWAA automatically scales its workflow runtime capacity to meet your needs and is integrated with AWS security services to help provide you with fast and secure access to your data.

We recommend migrating your Data Pipeline workloads to Amazon MWAA when:

  • You’re looking for a managed, highly available service to orchestrate workflows written in Python.
  • You want to transition to a fully managed, widely adopted open source technology—Apache Airflow—for maximum portability.
  • You require a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
  • You’re looking for a service designed for data pipeline orchestration with features such as rich UI for observability, restarts for failed workflows, backfills, retries for tasks, and lineage support with OpenLineage.
  • You’re looking for a service that comes with more than 1,000 pre-built operators and sensors, covering AWS as well as non-AWS services.
  • Your workload requires orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.

Amazon MWAA workflows are defined as directed acyclic graphs (DAGs) using Python, so you can also treat them as source code. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. It comes with a rich user interface for viewing and monitoring workflows and can be easily integrated with version control systems to automate the CI/CD process. With Amazon MWAA, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.

Example: Migrate HadoopActivity on EmrCluster

If you don’t have an existing Amazon MWAA environment, complete the following steps:

  1. Create an AWS CloudFormation template on your computer by copying the template from the quick start guide into a local text file.
  2. On the CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and select the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE. The resource that will take the most time is the Airflow environment. While it’s being created, you can continue with the following steps, until you’re required to open the Airflow UI.

An Airflow workflow is based on a DAG, which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named emr_dag.py using a text editor with the following snippet, and configure the EMR-related parameters based on the existing Data Pipeline definition:
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.emr import (
        EmrCreateJobFlowOperator,
        EmrAddStepsOperator,
    )
    from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
    from airflow.utils.dates import days_ago
    from datetime import timedelta
    import os
    DAG_ID = os.path.basename(__file__).replace(".py", "")
    SPARK_STEPS = [
        {
            'Name': 'calculate_pi',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-example', 'SparkPi', '10'],
            },
        }
    ]
    JOB_FLOW_OVERRIDES = {
        'Name': 'my-demo-cluster',
        'ReleaseLabel': 'emr-6.1.0',
        'Applications': [
            {
                'Name': 'Spark'
            },
        ],
        'Instances': {
            'InstanceGroups': [
                {
                    'Name': "Master nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': "Core nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
        },
        'VisibleToAllUsers': True,
        'JobFlowRole': 'EMR_EC2_DefaultRole',
        'ServiceRole': 'EMR_DefaultRole'
    }
    with DAG(
        dag_id=DAG_ID,
        start_date=days_ago(1),
        schedule_interval='@once',
        dagrun_timeout=timedelta(hours=2),
        catchup=False,
        tags=['emr'],
    ) as dag:
        cluster_creator = EmrCreateJobFlowOperator(
            task_id='create_job_flow',
            job_flow_overrides=JOB_FLOW_OVERRIDES,
            aws_conn_id='aws_default',
        )
        step_adder = EmrAddStepsOperator(
            task_id='add_steps',
            job_flow_id=cluster_creator.output,
            aws_conn_id='aws_default',
            steps=SPARK_STEPS,
        )
        step_checker = EmrStepSensor(
            task_id='watch_step',
            job_flow_id=cluster_creator.output,
            step_id="{{ task_instance.xcom_pull(task_ids='add_steps')[0] }}",
            aws_conn_id='aws_default',
        )
        cluster_creator >> step_adder >> step_checker

Defining the schedule in Amazon MWAA is as simple as updating the schedule_interval parameter for the DAG. For example, to run the DAG daily, set schedule_interval='@daily'.
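
When you configure the EMR steps from an existing Data Pipeline definition, the translation from a HadoopActivity object to an entry in the steps list can be sketched as follows. The helper and the sample activity are hypothetical; the field names jarUri, mainClass, and argument follow the Data Pipeline HadoopActivity object, and ActionOnFailure is set to CONTINUE only to match the example DAG.

```python
from typing import Any, Dict


def hadoop_activity_to_emr_step(activity: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a Data Pipeline HadoopActivity object into an EMR step definition."""
    args = activity.get("argument", [])
    if isinstance(args, str):
        # Data Pipeline allows a single argument to be given as a plain string.
        args = [args]

    jar_step: Dict[str, Any] = {"Jar": activity["jarUri"], "Args": list(args)}
    if "mainClass" in activity:
        jar_step["MainClass"] = activity["mainClass"]

    return {
        "Name": activity.get("name", activity["id"]),
        "ActionOnFailure": "CONTINUE",  # assumption: same behavior as the example DAG
        "HadoopJarStep": jar_step,
    }


# Hypothetical activity copied from a pipeline definition file.
sample_activity = {
    "id": "MyHadoopActivity",
    "type": "HadoopActivity",
    "jarUri": "s3://my-bucket/jars/wordcount.jar",
    "mainClass": "com.example.WordCount",
    "argument": ["s3://my-bucket/input/", "s3://my-bucket/output/"],
}

MIGRATED_STEPS = [hadoop_activity_to_emr_step(sample_activity)]
```

A list built this way can be passed to the steps argument of EmrAddStepsOperator in place of the SPARK_STEPS constant.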

Now, you create a workflow that invokes the Amazon EMR step you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack followed by -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file emr_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes after you deployed the CloudFormation stack.

  4. Choose the environment link on the table to see the environment details.

It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  5. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to sign you in.

If there are issues with the DAG file you created, it will display an error on top of the page indicating the lines affected. In that case, review the steps and upload again. After a few seconds, it will parse it and update or remove the error banner.

Clean up

After you migrate your existing Data Pipeline workload and verify that the migration was successful, delete your pipelines in Data Pipeline to stop further runs and billing.

Conclusion

In this blog post, we outlined a few alternate AWS services for migrating your existing Data Pipeline workloads. You can migrate to AWS Glue to run and orchestrate Apache Spark applications, AWS Step Functions to orchestrate workflows involving various other AWS services, or Amazon MWAA to help manage workflow orchestration using Apache Airflow. By migrating, you will be able to run your workloads with a broader range of data integration functionalities. If you have additional questions, post in the comments or read about migration examples in our documentation.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team and AWS Data Pipeline team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Vaibhav Porwal is a Senior Software Development Engineer on the AWS Glue and AWS Data Pipeline team. He is working on solving problems in the orchestration space by building low-cost, repeatable, scalable workflow systems that enable customers to create their ETL pipelines seamlessly.

Sriram Ramarathnam is a Software Development Manager on the AWS Glue and AWS Data Pipeline team. His team works on solving challenging distributed systems problems for data integration across AWS serverless and serverful compute offerings.

Matt Su is a Senior Product Manager on the AWS Glue team and AWS Data Pipeline team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Introducing Amazon MWAA support for Apache Airflow version 2.9.2

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-support-for-apache-airflow-version-2-9-2/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.

Today, we are announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.

In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.

With each new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let’s go through some of these new capabilities.

Logical operators and conditional expressions for DAG scheduling

Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow’s scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets were updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.

With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.

For example, in the financial services industry, a risk management process might need to be run whenever trading data from any regional market is refreshed, or when both trading and regulatory updates are available. The new scheduling capabilities available in Amazon MWAA allow you to express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.

The following DAG code contains the logical operations to implement these dependencies:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")
trading_data_europe = Dataset("s3://trading/europe/data.parquet")
trading_data_americas = Dataset("s3://trading/americas/data.parquet")
regulatory_updates = Dataset("s3://regulators/updates.json")

@dag(
    dag_id='risk_management_trading_data',
    start_date=datetime(2023, 5, 1),
    schedule=((trading_data_asia | trading_data_europe | trading_data_americas) & regulatory_updates),
    catchup=False
)
def risk_management_pipeline():
    @task
    def risk_analysis():
        # Task for risk analysis
        ...

    @task
    def reporting():
        # Task for reporting
        ...

    @task
    def notifications():
        # Task for notifications
        ...

    analysis = risk_analysis()
    report = reporting()
    notify = notifications()

risk_management_pipeline()

To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.
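
To build intuition for how the expression in the schedule evaluates, the following pure-Python model checks the same condition against a set of updated dataset URIs. It is an illustration of the boolean logic only, not how the Airflow scheduler is implemented, and the tuple-based condition format is an assumption made for this sketch.

```python
from typing import Set, Tuple, Union

# A condition is either a dataset URI or a tuple ("any" | "all", operand, ...).
Condition = Union[str, Tuple]

ASIA = "s3://trading/asia/data.parquet"
EUROPE = "s3://trading/europe/data.parquet"
AMERICAS = "s3://trading/americas/data.parquet"
REGULATORY = "s3://regulators/updates.json"

# Mirrors (asia | europe | americas) & regulatory_updates from the DAG above.
RISK_CONDITION: Condition = ("all", ("any", ASIA, EUROPE, AMERICAS), REGULATORY)


def is_satisfied(condition: Condition, updated: Set[str]) -> bool:
    """Return True if the set of updated dataset URIs satisfies the condition."""
    if isinstance(condition, str):
        return condition in updated
    operator, *operands = condition
    results = [is_satisfied(operand, updated) for operand in operands]
    if operator == "any":
        return any(results)
    if operator == "all":
        return all(results)
    raise ValueError(f"Unknown operator: {operator!r}")
```

In this model, an update to a single regional dataset is not enough on its own; the condition is met only once the regulatory dataset has also been updated.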

Combining dataset and time-based schedules

With Airflow 2.9.2 environments, Amazon MWAA now has a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.

Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it’s essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there’s a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.

Relying solely on time-based scheduling for this type of data pipeline could lead to potential issues such as outdated information and infrastructure resource wastage.

The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means that your workflow can be invoked not only at predefined intervals but also whenever there are updates to the specified datasets, with the specific dependency relationship among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.

See the following DAG code for an example implementation:

from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()

In the example, the DAG will be run under two conditions:

  • When the time-based schedule is met (daily at midnight UTC)
  • When the combined dataset condition is met, that is, when both the orders and inventory datasets have been updated, or when the customer dataset has been updated, regardless of the other datasets

This flexibility enables you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.

For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.

Dataset event REST API endpoints

Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge—there was no option to mark a dataset as externally updated. With the new dataset event endpoints feature, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.

This capability enables external systems and applications to seamlessly integrate and interact with your Amazon MWAA environment. It significantly improves your ability to expand your data pipeline’s capacity for dynamic data management.

As an example, running the following code from an external system allows you to invoke a dataset event in the target Amazon MWAA environment. This event could then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.

curl -X POST https://{web_server_host_name}/api/v1/datasets/events \
   -H 'Content-Type: application/json' \
   -d '{"dataset_uri": "s3://bucket_name/bucket_key", "extra": { }}'

The following diagram illustrates how the different components in the scenario interact with each other.

To get more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.
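
The same request can be prepared from Python using only the standard library. The sketch below builds the request without sending it, and it omits authentication: calls to an Amazon MWAA web server must be authenticated, and how you obtain and attach credentials depends on your setup. The function name is hypothetical; the endpoint path and payload fields mirror the curl example.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_dataset_event_request(
    web_server_host_name: str,
    dataset_uri: str,
    extra: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Prepare (but do not send) a POST request that creates a dataset event."""
    payload = json.dumps({"dataset_uri": dataset_uri, "extra": extra or {}}).encode("utf-8")
    return urllib.request.Request(
        url=f"https://{web_server_host_name}/api/v1/datasets/events",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Sending would look like this once authentication has been added:
#   with urllib.request.urlopen(request) as response:
#       print(response.status)
```

Keeping request construction separate from sending makes it straightforward to add whichever authentication mechanism your environment requires before the call is made.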

Airflow 2.9.2 also includes features to ease the operation and monitoring of your environments. Let’s explore some of these new capabilities.

DAG auto-pausing

Customers are using Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A common scenario is a critical pipeline that starts failing during the evening, and due to the failure, it continues to run and fails repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary tasks, consuming valuable compute resources and potentially causing data corruption or inconsistencies.

The DAG auto-pausing feature aims to address this challenge by introducing two new configuration parameters:

  • max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before the DAG is automatically paused.
  • max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the previous global configuration, allowing you to set a custom threshold for each DAG.

In the following code example, we define a DAG with a single Python task created with the @task decorator. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set in the DAG object. By setting max_consecutive_failed_dag_runs=3, we’re instructing Airflow to automatically pause the DAG after it fails three consecutive times.

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()

With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.
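To make the threshold semantics concrete, the following sketch models the decision in plain Python. It is a conceptual illustration only, not Airflow's implementation. The function name should_auto_pause and the list-of-states representation are assumptions, and a threshold of 0 is treated here as disabling the feature.

```python
def should_auto_pause(run_states, max_consecutive_failed_dag_runs):
    """Return True when the most recent runs are all failures, up to the threshold."""
    if max_consecutive_failed_dag_runs <= 0:
        return False  # assumption: a non-positive threshold disables auto-pausing
    recent = run_states[-max_consecutive_failed_dag_runs:]
    return (
        len(recent) == max_consecutive_failed_dag_runs
        and all(state == "failed" for state in recent)
    )


# Run states are listed oldest first
print(should_auto_pause(["success", "failed", "failed", "failed"], 3))
print(should_auto_pause(["failed", "failed", "success"], 3))
```

A single successful run resets the streak, so only uninterrupted failures count toward the threshold in this model.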

To learn more, refer to DAG Auto-pausing in the Airflow documentation.

CLI support for bulk pause and resume of DAGs

As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or other operational activities, you may need to pause or resume multiple DAGs. This process can become tedious and repetitive because you need to navigate through the Airflow UI, manually pausing or resuming DAGs one at a time. These manual activities are time consuming and increase the risk of human error, which can lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, which made bulk operations inefficient.

Airflow 2.9.2 improves these CLI commands by adding the capability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This new feature eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error, providing reliability and consistency in your data pipelines.

As an example, to pause every DAG whose ID starts with redshift or daily_liquidity_reporting (for instance, DAGs that generate daily liquidity reports with Amazon Redshift as a data source), you can use the following CLI command with a regular expression:

airflow dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"
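Before running a bulk command, it can help to check which DAG IDs an expression would select. The following sketch does this with Python's re module. The DAG IDs are hypothetical, and the sketch assumes the CLI applies the expression with search semantics comparable to re.search.

```python
import re

pattern = re.compile(r"^(redshift|daily_liquidity_reporting)")

# Hypothetical DAG IDs, used only to illustrate the match
dag_ids = [
    "redshift_load_positions",
    "daily_liquidity_reporting_emea",
    "weekly_liquidity_reporting",
    "s3_to_redshift_copy",
]

matched = [dag_id for dag_id in dag_ids if pattern.search(dag_id)]
print(matched)
```

Because the expression is anchored with ^, only IDs that begin with one of the two prefixes are selected. An ID such as s3_to_redshift_copy is left untouched.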

Custom names for Dynamic Task Mapping

Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. This flexibility comes at a cost to observability: by default, dynamically mapped tasks are identified only by numeric indexes. In complex workflows involving high numbers of mapped tasks, it becomes increasingly challenging to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.

Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, enhancing visibility and manageability within the Airflow UI.

See the following example:

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()

The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to render the first element of each task instance's op_args argument into its map index. In the Airflow UI, you will see three task instances labeled Processing source=source_a, Processing source=source_b, and Processing source=source_c instead of the numeric indexes 0, 1, and 2, making it straightforward to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.

The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater levels of flexibility and customization when naming dynamically mapped tasks.

Refer to Named mapping in the Airflow documentation to learn more about named mapping.

TaskFlow decorator for Bash commands

Writing complex Bash commands and scripts using the traditional Airflow BashOperator may bring challenges in areas such as code consistency, task dependencies definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow’s TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow’s scheduling and monitoring capabilities while maintaining a consistent coding style.

The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    # Assumption: all tasks run on the same worker, so files written under /tmp
    # are visible to downstream tasks. Use shared storage such as Amazon S3 otherwise.
    work_dir = "/tmp/customer_order_analysis"

    @task.bash
    def clean_data():
        # Clean customer data: replace commas with semicolons and drop the header row.
        # Piping avoids reading and writing the same file, which would truncate it.
        customer_cleaning_commands = """
            mkdir -p {work_dir}
            echo '{data}' | sed 's/,/;/g' | awk 'NR > 1' > {work_dir}/cleaned_customers.csv
        """.format(work_dir=work_dir, data=customer_data.strip())

        # Clean order data
        order_cleaning_commands = """
            echo '{data}' | sed 's/,/;/g' | awk 'NR > 1' > {work_dir}/cleaned_orders.csv
        """.format(work_dir=work_dir, data=order_data.strip())

        # The last line written to stdout becomes the task's XCom return value
        return customer_cleaning_commands + "\n" + order_cleaning_commands + f"\necho {work_dir}"

    @task.bash
    def transform_data(data_dir):
        # Transform customer data
        customer_transform_commands = """
            awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' {data_dir}/cleaned_customers.csv > {data_dir}/transformed_customers.csv
        """.format(data_dir=data_dir)

        # Transform order data
        order_transform_commands = """
            awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' {data_dir}/cleaned_orders.csv > {data_dir}/transformed_orders.csv
        """.format(data_dir=data_dir)

        return customer_transform_commands + "\n" + order_transform_commands + f"\necho {data_dir}"

    @task.bash
    def analyze_data(data_dir):
        analysis_commands = """
            # Calculate total revenue
            total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {orders_file})
            echo "Total revenue: $total_revenue"

            # Find customers with multiple orders
            customers_with_multiple_orders=$(
                awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {orders_file}
            )
            echo "Customers with multiple orders: $customers_with_multiple_orders"

            # Find most popular product
            popular_product=$(
                awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}; print popular}}' {orders_file}
            )
            echo "Most popular product: $popular_product"
        """.format(orders_file=f"{data_dir}/transformed_orders.csv")

        return analysis_commands

    # Each task passes the working directory downstream through XCom
    cleaned_dir = clean_data()
    transformed_dir = transform_data(cleaned_dir)
    analyze_data(transformed_dir)

customer_order_analysis_dag()

You can learn more about the @task.bash decorator in the Airflow documentation.

Set up a new Airflow 2.9.2 environment in Amazon MWAA

You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.9.2

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.9 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.

Conclusion

In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features added in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of endpoints in the REST API to programmatically create dataset events. We also provided some sample code to show the implementation in Amazon MWAA.

For the complete list of changes, refer to Airflow’s release notes. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and supports customers in the adoption of serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Run Apache XTable on Amazon MWAA to translate open table formats

Post Syndicated from Matthias Rudolph original https://aws.amazon.com/blogs/big-data/run-apache-xtable-on-amazon-mwaa-to-translate-open-table-formats/

Open table formats (OTFs) like Apache Iceberg are being increasingly adopted, for example, to improve transactional consistency of a data lake or to consolidate batch and streaming data pipelines on a single file format and reduce complexity. In practice, architects need to integrate the chosen format with the various layers of a modern data platform. However, the level of support for the different OTFs varies across common analytical services.

Commercial vendors and the open source community have recognized this situation and are working on interoperability between table formats. One approach is to make a single physical dataset readable in different formats by translating its metadata and avoiding reprocessing of actual data files. Apache XTable is an open source solution that follows this approach and provides abstractions and tools for the translation of open table format metadata.

In this post, we show you how to get started with Apache XTable on AWS and how you can use it in a batch pipeline orchestrated with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). To understand how XTable and similar solutions work, we start with a high-level background on metadata management in an OTF and then dive deeper into XTable and its usage.

Open table formats

Open table formats overcome the gaps of traditional storage formats of data lakes such as Apache Hive tables. They provide abstractions and capabilities known from relational databases like transactional consistency and the ability to create, update, or delete single records. In addition, they help manage schema evolution.

In order to understand how the XTable metadata translation approach works, you must first understand how the metadata of an OTF is represented on the storage layer.

An OTF comprises a data layer and a metadata layer, which are both represented as files on storage. The data layer contains the data files. The metadata layer contains metadata files that keep track of the data files and the transactionally consistent sequence of changes to these. The following figure illustrates this configuration.

Inspecting the files of an Iceberg table on storage, we identify the metadata layer through the folder metadata. Adjacent to it are the data files—in this example, as snappy-compressed Parquet:

<table base folder>
├── metadata # contains metadata files
│ ├── 00000-6c64b16d-affa-4f0e-8635-a996ec13a7fa.metadata.json
│ ├── 23ba9e94-7819-4661-b698-f512f5b51054-m0.avro
│ └── snap-5514083086862319381-1-23ba9e94-7819-4661-b698-f512f5b51054.avro
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files

Comparable to Iceberg, in Delta Lake, the metadata layer is represented through the folder _delta_log:

<table base folder>
├── _delta_log # contains metadata files
│ └── 00000000000000000000.json
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files

Although the metadata layer varies in structure and capabilities between OTFs, it’s eventually just files on storage. Typically, it resides in the table’s base folder adjacent to the data files.

Now, the question emerges: what if metadata files of multiple different formats are stored in parallel for the same table?

Current approaches to interoperability do exactly that, as we will see in the next section.

Apache XTable

XTable is currently provided as a standalone Java binary. It translates the metadata layer between Apache Hudi, Apache Iceberg, and Delta Lake without rewriting data files and integrates with Iceberg-compatible catalogs like the AWS Glue Data Catalog.

In practice, XTable reads the latest snapshot of an input table and creates additional metadata for configurable target formats. It adds this additional metadata to the table on the storage layer—in addition to existing metadata.

Through this, you can choose either format, source or target, read the respective metadata, and get the same consistent view on the table’s data.

The following diagram illustrates the metadata translation process.

Let’s assume you have an existing Delta Lake table that you want to make readable as an Iceberg table. To run XTable, you invoke its Java binary and provide a dataset config file that specifies source and target format, as well as source table paths:

java -jar utilities-0.1.0-SNAPSHOT-bundled.jar \
	--datasetConfig datasetConfig.yaml

A minimal datasetConfig.yaml looks as follows, assuming the table is stored on Amazon Simple Storage Service (Amazon S3):

---
sourceFormat: DELTA
targetFormats:
  - ICEBERG
datasets:
  - tableBasePath: s3://<URI to base folder of table>
    tableName: <table name>
...

As shown in the following listing, XTable adds the Iceberg-specific metadata folder to the table’s base path in addition to the existing _delta_log folder. Now, clients can read the table in either Delta Lake or Iceberg format.

<table base folder>
├── _delta_log # Previously existing Delta Lake metadata
│   └── ...
├── metadata   # Added by XTable: Apache Iceberg metadata
│   └── ...
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files
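Because each format keeps its metadata in a well-known folder, you can tell which formats a table is readable in just by looking at the object keys under its base folder. The following sketch illustrates this. The function name detect_formats is hypothetical, the Iceberg and Delta Lake folder names come from the listings above, and .hoodie is the folder Apache Hudi uses for its metadata.

```python
from pathlib import PurePosixPath

# Marker folders for each format's metadata layer
METADATA_FOLDERS = {"metadata": "ICEBERG", "_delta_log": "DELTA", ".hoodie": "HUDI"}


def detect_formats(relative_keys):
    """Return the formats whose metadata folder appears at the table root."""
    found = set()
    for key in relative_keys:
        parts = PurePosixPath(key).parts
        # Only folders directly under the base folder count as metadata layers
        if len(parts) > 1 and parts[0] in METADATA_FOLDERS:
            found.add(METADATA_FOLDERS[parts[0]])
    return found


keys = [
    "_delta_log/00000000000000000000.json",
    "metadata/00000-6c64b16d-affa-4f0e-8635-a996ec13a7fa.metadata.json",
    "part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet",
]
print(sorted(detect_formats(keys)))
```

The keys are relative to the table's base folder. In practice, you would obtain them from an Amazon S3 listing of that prefix.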

To register the Iceberg table in Data Catalog, pass a further config file to XTable that is responsible for Iceberg catalogs:

java -jar utilities-0.1.0-SNAPSHOT-bundled.jar \
	--datasetConfig datasetConfig.yaml \
	--icebergCatalogConfig glueDataCatalog.yaml

The minimal contents of glueDataCatalog.yaml are as follows. It configures XTable to use the Data Catalog-specific IcebergCatalog implementation provided by the iceberg-aws module, which is part of the Apache Iceberg core project:

---
catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: glue
catalogOptions:
  warehouse: s3://<URI to base folder of Iceberg tables>
  catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
... 
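If you script these invocations, assembling the argument list in one place keeps the optional catalog flag consistent. The following is a minimal sketch. The helper name xtable_command is hypothetical, and launching the JAR as a subprocess is an assumption made for illustration (it differs from the JPype-based operator described later in this post).

```python
def xtable_command(jar_path, dataset_config_path, iceberg_catalog_config_path=None):
    """Build the argument list for invoking the XTable JAR."""
    args = ["java", "-jar", jar_path, "--datasetConfig", dataset_config_path]
    if iceberg_catalog_config_path is not None:
        # Only needed when the Iceberg table should be registered in a catalog
        args += ["--icebergCatalogConfig", iceberg_catalog_config_path]
    return args


command = xtable_command(
    "utilities-0.1.0-SNAPSHOT-bundled.jar",
    "datasetConfig.yaml",
    "glueDataCatalog.yaml",
)
print(" ".join(command))
```

You could pass the resulting list to subprocess.run(command, check=True) on a host where Java and the JAR are available.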

Run Apache XTable as an Airflow Operator

You can use XTable in batch data pipelines that write tables on the data lake and make sure these are readable in different file formats. For instance, operating in the Delta Lake ecosystem, a data pipeline might create Delta tables, which need to be accessible as Iceberg tables as well.

One tool to orchestrate data pipelines on AWS is Amazon MWAA, which is a managed service for Apache Airflow. In the following sections, we explore how XTable can run within a custom Airflow Operator on Amazon MWAA. We elaborate on the initial design of such an Operator and demonstrate its deployment on Amazon MWAA.

Why a custom Operator? Although XTable could also be invoked directly from a BashOperator, we choose to wrap this step in a custom operator so that it can be configured entirely in Python, Airflow's native programming language, through operator parameters. For a background on how to write custom operators, see Creating a custom operator.

The following diagram illustrates the dependency between the operator and XTable’s binary.

Input parameters of the Operator

XTable’s primary inputs are YAML-based configuration files:

  • Dataset config – Contains source format, target formats, and source tables
  • Iceberg catalog config (optional) – Contains the reference to an external Iceberg catalog into which to register the table in the target format

We choose to reflect the data structures of the YAML files in the Operator’s input parameters, as listed in the following table.

Parameter | Type | Values
dataset_config | dict | Contents of dataset config as dict literal
iceberg_catalog_config | dict | Contents of Iceberg catalog config as dict literal

As the Operator runs, the YAML files are generated from the input parameters.

Refer to XTable’s GitHub repository for a full reference of all possible dict keys.
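To illustrate how a dict parameter can map onto a YAML file, the following sketch renders a nested dict as YAML text using only the standard library. It is not taken from the sample operator, whose implementation is in the GitHub repository. The function name to_yaml_lines is hypothetical, and the sketch handles only non-empty dicts, lists of scalars or dicts, and plain scalar values.

```python
def to_yaml_lines(value, indent=0):
    """Render nested dicts and lists of plain scalars as YAML lines."""
    pad = "  " * indent
    lines = []
    if isinstance(value, dict):
        for key, item in value.items():
            if isinstance(item, (dict, list)):
                lines.append(f"{pad}{key}:")
                lines.extend(to_yaml_lines(item, indent + 1))
            else:
                lines.append(f"{pad}{key}: {item}")
    elif isinstance(value, list):
        for item in value:
            if isinstance(item, dict):
                nested = to_yaml_lines(item, indent + 1)
                # Put the first key on the same line as the list dash
                lines.append(f"{pad}- {nested[0].lstrip()}")
                lines.extend(nested[1:])
            else:
                lines.append(f"{pad}- {item}")
    return lines


dataset_config = {
    "sourceFormat": "DELTA",
    "targetFormats": ["ICEBERG", "HUDI"],
    "datasets": [
        {"tableBasePath": "s3://datalake/sales", "tableName": "sales", "namespace": "table"}
    ],
}
yaml_text = "\n".join(to_yaml_lines(dataset_config))
print(yaml_text)
```

In a real operator, a YAML library would be the more robust choice because it also handles quoting and special characters.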

Example parameterization

The following example shows the configuration to translate a table from Delta Lake to both Iceberg and Hudi. The attribute dataset_config reflects the structure of the dataset config file through a Python dict literal:

from mwaa_plugin.plugin import XtableOperator
operator = XtableOperator(
    task_id="xtableTask",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG", "HUDI"],
        "datasets": [
            {
                "tableBasePath": "s3://datalake/sales",
                "tableName": "sales",
                "namespace": "table",
            }
        ],
    }
)

Sample code: The full source code of the sample XtableOperator and all other code used in this post is provided through this GitHub repository.

Solution overview

To deploy the custom operator to Amazon MWAA, we upload it together with DAGs into the configured DAG folder.

Besides the operator itself, we also need to upload XTable’s executable JAR. As of writing this post, the JAR needs to be compiled by the user from source code. To simplify this, we provide a container-based build script.

Prerequisites

We assume you have at least an environment consisting of Amazon MWAA itself, an S3 bucket, and an AWS Identity and Access Management (IAM) role for Amazon MWAA that has read access to the bucket and optionally write access to the AWS Glue Data Catalog.

In addition, you need one of the following container runtimes to run the provided build script for XTable:

  • Finch
  • Docker

Build and deploy the XTableOperator

To compile XTable, you can use the provided build script and complete the following steps:

  1. Clone the sample code from GitHub:
    git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
    cd apache-xtable-on-aws-samples

  2. Run the build script:
    ./build-airflow-operator.sh

  3. Because the Airflow operator uses the library JPype to invoke XTable’s JAR, add a dependency in the Amazon MWAA requirements.txt file:
    JPype1==1.5.0

    For a background on installing additional Python libraries on Amazon MWAA, see Installing Python dependencies.
    Because XTable is Java-based, a Java 11 runtime environment (JRE) is required on Amazon MWAA. You can use the Amazon MWAA startup script to install a JRE.

  4. Add the following lines to an existing startup script or create a new one as provided in the sample code base of this post:
    if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]]
    then
        sudo yum install -y java-11-amazon-corretto-headless
    fi

    For more information about this mechanism, see Using a startup script with Amazon MWAA.

  5. Upload xtable_operator/, requirements.txt, startup.sh and .airflowignore to the S3 bucket and respective paths from which Amazon MWAA will read files.
    Make sure the IAM role for Amazon MWAA has appropriate read permissions.
    With regard to the custom operator, make sure to upload the local folder xtable_operator/ and .airflowignore into the configured DAG folder.

  6. Update the configuration of your Amazon MWAA environment as follows and start the update process:
    1. Add or update the S3 URI to the requirements.txt file through the Requirements file configuration option.
    2. Add or update the S3 URI to the startup.sh script through the Startup script configuration option.
  7. Optionally, you can use the AWS Glue Data Catalog as an Iceberg catalog. In case you create Iceberg metadata and want to register it in the AWS Glue Data Catalog, the Amazon MWAA role needs permissions to create or modify tables in AWS Glue. The following listing shows a minimal policy for this. It constrains permissions to a defined database in AWS Glue:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:GetTables",
                    "glue:UpdateTable",
                    "glue:GetDatabases",
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:catalog",
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:database/<Database name>",
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:table/<Database name>/*"
                ]
            }
        ]
    }
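If you manage several environments, you can generate this policy document rather than editing the placeholders by hand. The following sketch builds the same statement from three inputs. The function name glue_catalog_policy and the example Region, account ID, and database name are placeholders chosen for illustration.

```python
import json


def glue_catalog_policy(region, account_id, database):
    """Build the minimal AWS Glue policy shown above for one database."""
    arn_prefix = f"arn:aws:glue:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:GetTables",
                    "glue:UpdateTable",
                    "glue:GetDatabases",
                    "glue:GetTable",
                ],
                "Resource": [
                    f"{arn_prefix}:catalog",
                    f"{arn_prefix}:database/{database}",
                    f"{arn_prefix}:table/{database}/*",
                ],
            }
        ],
    }


# Placeholder values for illustration only
policy = glue_catalog_policy("eu-central-1", "111122223333", "sales_db")
print(json.dumps(policy, indent=4))
```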

Using the XTableOperator in practice: Delta Lake to Apache Iceberg

Let’s look into a practical example that uses the XTableOperator. We continue the scenario of a data pipeline in the Delta Lake ecosystem and assume it is implemented as a DAG on Amazon MWAA. The following figure shows our example batch pipeline.

The pipeline uses an Apache Spark job that is run by AWS Glue to write a Delta table into an S3 bucket. Additionally, the table is made accessible as an Iceberg table without data duplication. Finally, we want to load the Iceberg table into Amazon Redshift, which is a fully managed, petabyte-scale data warehouse service in the cloud.

As shown in the following screenshot of the graph visualization of the example DAG, we run the XTableOperator after creating the Delta table through a Spark job. Then we use the RedshiftDataOperator to refresh a materialized view, which is used in downstream transformations as a source table. Materialized views are a common construct to precompute complex queries on large tables. In this example, we use them to simplify data loading into Amazon Redshift because of the incremental update capabilities in combination with Iceberg.

The input parameters of the XTableOperator are as follows:

operator = XtableOperator(
    task_id="xtableTask",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG"],
        "datasets": [
            {
                "tableBasePath": "s3://<datalake>/<sales>",
                "tableName": "sales",
                "namespace": "table",
            }
        ],
    },
    iceberg_catalog_config={
        "catalogImpl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "catalogName": "glue",
        "catalogOptions": {
            "warehouse": "s3://datalake/sales",
            "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            "io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
        }
    }
)

The XTableOperator creates Apache Iceberg metadata on Amazon S3 and registers a table accordingly in the Data Catalog. The following screenshot shows the created Iceberg table. AWS Glue stores a pointer to Iceberg’s most recent metadata file. As updates are applied to the table and new metadata files are created, XTable updates the pointer after each job.

Amazon Redshift is able to discover the Iceberg table through the Data Catalog and read it using Amazon Redshift Spectrum.

Summary

In this post, we showed how Apache XTable translates the metadata layer of open table formats without data duplication. This provides advantages from both a cost and data integrity perspective, especially in large-scale environments, and allows for a migration of an existing historical estate of datasets. We also discussed how you can implement a custom Airflow Operator that embeds Apache XTable into data pipelines on Amazon MWAA.

For further reading, visit What’s new with Amazon MWAA and Apache XTable’s website. For more examples of other custom operators, refer to the following GitHub repository.


About the Authors

Matthias Rudolph is an Associate Solutions Architect, digitalizing the German manufacturing industry.

Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.

Amazon MWAA best practices for managing Python dependencies

Post Syndicated from Mike Ellis original https://aws.amazon.com/blogs/big-data/amazon-mwaa-best-practices-for-managing-python-dependencies/

Customers with data engineers and data scientists are using Amazon Managed Workflows for Apache Airflow (Amazon MWAA) as a central orchestration platform for running data pipelines and machine learning (ML) workloads. To support these pipelines, they often require additional Python packages, such as Apache Airflow Providers. For example, a pipeline may require the Snowflake provider package for interacting with a Snowflake warehouse, or the Kubernetes provider package for provisioning Kubernetes workloads. As a result, they need to manage these Python dependencies efficiently and reliably, providing compatibility with each other and the base Apache Airflow installation.

Python includes the tool pip to handle package installations. To install a package, you add the name to a special file named requirements.txt. The pip install command instructs pip to read the contents of your requirements file, determine dependencies, and install the packages. Amazon MWAA runs the pip install command using this requirements.txt file during initial environment startup and subsequent updates. For more information, see How it works.

Creating a reproducible and stable requirements file is key for reducing pip installation and DAG errors. Additionally, this defined set of requirements provides consistency across nodes in an Amazon MWAA environment. This is most important during worker auto scaling, where additional worker nodes are provisioned and having different dependencies could lead to inconsistencies and task failures. Additionally, this strategy promotes consistency across different Amazon MWAA environments, such as dev, qa, and prod.

This post describes best practices for managing your requirements file in your Amazon MWAA environment. It defines the steps needed to determine your required packages and package versions, create and verify your requirements.txt file with package versions, and package your dependencies.

Best practices

The following sections describe the best practices for managing Python dependencies.

Specify package versions in the requirements.txt file

When creating a Python requirements.txt file, you can specify just the package name, or the package name and a specific version. Adding a package without version information instructs the pip installer to download and install the latest available version, subject to compatibility with other installed packages and any constraints. The package versions selected during environment creation may be different than the version selected during an auto scaling event later on. This version change can create package conflicts leading to pip install errors. Even if the updated package installs properly, code changes in the package can affect task behavior, leading to inconsistencies in output. To avoid these risks, it’s best practice to add the version number to each package in your requirements.txt file.
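A quick way to enforce this practice is to scan the requirements file for entries that are not pinned with ==. The following is a minimal sketch. The function name find_unpinned and the sample file contents are illustrative, and the pattern covers only simple name==version lines, not every form that pip accepts.

```python
import re

# Matches simple "name==version" entries, optionally with extras such as name[extra]
PINNED = re.compile(r"^[A-Za-z0-9._\-\[\],]+==[^=\s]+$")


def find_unpinned(requirements_text):
    """Return requirement lines that are not pinned to an exact version."""
    unpinned = []
    for raw_line in requirements_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop trailing comments
        if not line or line.startswith("-"):
            continue  # skip blanks and pip options such as --constraint
        if not PINNED.match(line):
            unpinned.append(line)
    return unpinned


# Illustrative file contents, not a recommended set of versions
sample = """
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.2/constraints-3.11.txt"
JPype1==1.5.0
apache-airflow-providers-snowflake
apache-airflow-providers-dbt-cloud>=3.0
"""
print(find_unpinned(sample))
```

A check like this fits naturally into a CI step that runs before the requirements file is uploaded.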

Use the constraints file for your Apache Airflow version

A constraints file contains the packages, with versions, verified to be compatible with your Apache Airflow version. This file adds an additional validation layer to prevent package conflicts. Because the constraints file plays such an important role in preventing conflicts, beginning with Apache Airflow v2.7.2 on Amazon MWAA, your requirements file must include a --constraint statement. If a --constraint statement is not supplied, Amazon MWAA will specify a compatible constraints file for you.

Constraint files are available for each Airflow version and Python version combination. The URLs have the following form:

https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt

The official Apache Airflow constraints are guidelines, and if your workflows require newer versions of a provider package, you may need to modify your constraints file and include it in your DAG folder. When doing so, the best practices outlined in this post become even more important to guard against package conflicts.
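To make the URL pattern concrete, here is a minimal sketch that fills in the two placeholders. The helper name constraints_url is hypothetical; the URL layout is the one shown above.

```python
import sys


def constraints_url(airflow_version, python_version=None):
    """Build the constraints file URL for an Airflow and Python version pair."""
    if python_version is None:
        # Fall back to the major.minor version of the running interpreter.
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


print(constraints_url("2.8.1", "3.11"))
# https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt
```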

Create a .zip archive of all dependencies

Creating a .zip file containing the packages in your requirements file and specifying this as the package repository source makes sure the exact same wheel files are used during your initial environment setup and subsequent node configurations. The pip installer will use these local files for installation rather than connecting to the external PyPI repository.

Test the requirements.txt file and dependency .zip file

Testing your requirements file before release to production is key to avoiding installation and DAG errors. It's best practice to test both locally, with the MWAA local runner, and in a dev or staging Amazon MWAA environment before deploying to production. You can use continuous integration and delivery (CI/CD) deployment strategies to perform the requirements and package installation testing, as described in Automating a DAG deployment with Amazon Managed Workflows for Apache Airflow.

Solution overview

This solution uses the MWAA local runner, an open source utility that replicates an Amazon MWAA environment locally. You use the local runner to build and validate your requirements file, and package the dependencies. In this example, you install the snowflake and dbt-cloud provider packages. You then use the MWAA local runner and a constraints file to determine the exact version of each package compatible with Apache Airflow. With this information, you then update the requirements file, pinning each package to a version, and retest the installation. When you have a successful installation, you package your dependencies and test in a non-production Amazon MWAA environment.

We use MWAA local runner v2.8.1 for this walkthrough and walk through the following steps:

  1. Download and build the MWAA local runner.
  2. Create and test a requirements file with package versions.
  3. Package dependencies.
  4. Deploy the requirements file and dependencies to a non-production Amazon MWAA environment.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the MWAA local runner

First, you download the MWAA local runner version matching your target MWAA environment, then you build the image.

Complete the following steps to configure the local runner:

  1. Clone the MWAA local runner repository with the following command:
    git clone git@github.com:aws/aws-mwaa-local-runner.git -b v2.8.1

  2. With Docker running, build the container with the following command:
    cd aws-mwaa-local-runner
    ./mwaa-local-env build-image

Create and test a requirements file with package versions

Building a versioned requirements file makes sure all Amazon MWAA components have the same package versions installed. To determine the compatible versions for each package, you start with a constraints file and an un-versioned requirements file, allowing pip to resolve the dependencies. Then you create your versioned requirements file from pip’s installation output.

The following diagram illustrates this workflow.

Requirements file testing process

To build an initial requirements file, complete the following steps:

  1. In your MWAA local runner directory, open requirements/requirements.txt in your preferred editor.

The default requirements file will look similar to the following:

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-mysql==5.5.1
  2. Replace the existing packages with the following package list:
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
apache-airflow-providers-snowflake
apache-airflow-providers-dbt-cloud[http]
  3. Save requirements.txt.
  4. In a terminal, run the following command to generate the pip install output:
./mwaa-local-env test-requirements

test-requirements runs pip install, which handles resolving the compatible package versions. Using a constraints file makes sure the selected packages are compatible with your Airflow version. The output will look similar to the following:

Successfully installed apache-airflow-providers-dbt-cloud-3.5.1 apache-airflow-providers-snowflake-5.2.1 pyOpenSSL-23.3.0 snowflake-connector-python-3.6.0 snowflake-sqlalchemy-1.5.1 sortedcontainers-2.4.0

The message beginning with Successfully installed is the output of interest. This shows which dependencies, and their specific version, pip installed. You use this list to create your final versioned requirements file.

Your output will also contain Requirement already satisfied messages for packages already available in the base Amazon MWAA environment. You do not add these packages to your requirements.txt file.

  5. Update the requirements file with the list of versioned packages from the test-requirements command. The updated file will look similar to the following code:
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud[http]==3.5.1
pyOpenSSL==23.3.0
snowflake-connector-python==3.6.0
snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0

Next, you test the updated requirements file to confirm no conflicts exist.

  6. Rerun the test-requirements command:
./mwaa-local-env test-requirements

A successful test will not produce any errors. If you encounter dependency conflicts, return to the previous step and update the requirements file with additional packages, or package versions, based on pip’s output.
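The step of turning pip's Successfully installed line into pinned entries can be sketched as follows. The function name pins_from_pip_output is an illustrative assumption, and the parsing relies on pip printing each package as name-version. Extras such as [http] don't appear in pip's output, so you would re-add them by hand.

```python
def pins_from_pip_output(pip_output):
    """Convert pip's 'Successfully installed ...' line into 'name==version' pins."""
    prefix = "Successfully installed "
    pins = []
    for line in pip_output.splitlines():
        line = line.strip()
        if not line.startswith(prefix):
            continue
        for token in line[len(prefix):].split():
            # pip prints each package as '<name>-<version>'; the version is
            # the part after the last hyphen.
            name, version = token.rsplit("-", 1)
            pins.append(f"{name}=={version}")
    return pins


output = (
    "Requirement already satisfied: requests in /usr/local/lib/python3.11\n"
    "Successfully installed apache-airflow-providers-dbt-cloud-3.5.1 "
    "apache-airflow-providers-snowflake-5.2.1 pyOpenSSL-23.3.0 "
    "snowflake-connector-python-3.6.0 snowflake-sqlalchemy-1.5.1 "
    "sortedcontainers-2.4.0\n"
)
for pin in pins_from_pip_output(output):
    print(pin)
```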

Package dependencies

If your Amazon MWAA environment has a private webserver, you must package your dependencies into a .zip file, upload the file to your S3 bucket, and specify the package location in your Amazon MWAA instance configuration. Because a private webserver can’t access the PyPI repository through the internet, pip will install the dependencies from the .zip file.

If you’re using a public webserver configuration, you also benefit from a static .zip file, which makes sure the package information remains unchanged until it is explicitly rebuilt.

This process uses the versioned requirements file created in the previous section and the package-requirements feature in the MWAA local runner.

To package your dependencies, complete the following steps:

  1. In a terminal, navigate to the directory where you installed the local runner.
  2. Download the constraints file for your Python version and your version of Apache Airflow and place it in the plugins directory. For this post, we use Python 3.11 and Apache Airflow v2.8.1:
curl -o plugins/constraints-2.8.1-3.11.txt https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt
  3. In your requirements file, update the constraints URL to the local downloaded file.

The --constraint statement instructs pip to compare the package versions in your requirements.txt file to the allowed version in the constraints file. Downloading a specific constraints file to your plugins directory enables you to control the constraint file location and contents.

The updated requirements file will look like the following code:

--constraint "/usr/local/airflow/plugins/constraints-2.8.1-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud[http]==3.5.1
pyOpenSSL==23.3.0
snowflake-connector-python==3.6.0
snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0
  4. Run the following command to create the .zip file:
./mwaa-local-env package-requirements

package-requirements creates an updated requirements file named packaged_requirements.txt and zips all dependencies into plugins.zip. The updated requirements file looks like the following code:

--find-links /usr/local/airflow/plugins
--no-index
--constraint "/usr/local/airflow/plugins/constraints-2.8.1-3.11.txt"
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-dbt-cloud[http]==3.5.1
pyOpenSSL==23.3.0
snowflake-connector-python==3.6.0
snowflake-sqlalchemy==1.5.1
sortedcontainers==2.4.0

Note the reference to the local constraints file and the plugins directory. The --find-links statement instructs pip to install packages from /usr/local/airflow/plugins rather than the public PyPI repository.
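Because --no-index prevents pip from falling back to PyPI, every pinned package needs a matching file in the archive. The following sketch checks a .zip archive for a wheel matching each pin. The helper name missing_wheels is hypothetical, and the sketch only inspects wheel (.whl) files, relying on the standard wheel naming scheme of distribution-version-tags.whl; the exact layout of plugins.zip produced by the local runner is an assumption you should confirm against your own archive.

```python
import io
import re
import zipfile


def _norm(name):
    """Normalize a distribution name the way wheel file names do."""
    return re.sub(r"[-_.]+", "_", name).lower()


def missing_wheels(zip_file, pins):
    """Return the pins ('name==version') that have no matching wheel in the archive."""
    available = set()
    with zipfile.ZipFile(zip_file) as zf:
        for member in zf.namelist():
            base = member.rsplit("/", 1)[-1]
            if base.endswith(".whl"):
                dist, version = base.split("-")[:2]
                available.add((_norm(dist), version))
    missing = []
    for pin in pins:
        name, version = pin.split("==")
        name = name.split("[", 1)[0]  # ignore extras such as [http]
        if (_norm(name), version) not in available:
            missing.append(pin)
    return missing


# Build a small in-memory archive to demonstrate the check.
demo = io.BytesIO()
with zipfile.ZipFile(demo, "w") as zf:
    zf.writestr("apache_airflow_providers_snowflake-5.2.1-py3-none-any.whl", b"")
    zf.writestr("pyOpenSSL-23.3.0-py3-none-any.whl", b"")
demo.seek(0)

print(missing_wheels(demo, [
    "apache-airflow-providers-snowflake==5.2.1",
    "pyOpenSSL==23.3.0",
    "sortedcontainers==2.4.0",
]))  # ['sortedcontainers==2.4.0']
```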

Deploy the requirements file

After you achieve an error-free requirements installation and package your dependencies, you’re ready to deploy the assets to a non-production Amazon MWAA environment. Even when verifying and testing requirements with the MWAA local runner, it’s best practice to deploy and test the changes in a non-prod Amazon MWAA environment before deploying to production. For more information about creating a CI/CD pipeline to test changes, refer to Deploying to Amazon Managed Workflows for Apache Airflow.

To deploy your changes, complete the following steps:

  1. Upload your requirements.txt file and plugins.zip file to your Amazon MWAA environment’s S3 bucket.

For instructions on specifying a requirements.txt version, refer to Specifying the requirements.txt version on the Amazon MWAA console. For instructions on specifying a plugins.zip file, refer to Installing custom plugins on your environment.

The Amazon MWAA environment will update and install the packages in your plugins.zip file.
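If you prefer to script the upload rather than use the console, the deployment could look roughly like the following sketch with the AWS SDK for Python (Boto3). The function names, local file paths, and S3 keys are placeholders chosen for illustration. The sketch assumes the environment's bucket has versioning enabled, so put_object returns a VersionId.

```python
def build_update_request(env_name, requirements_key, requirements_version,
                         plugins_key, plugins_version):
    """Assemble the keyword arguments for the MWAA UpdateEnvironment call."""
    return {
        "Name": env_name,
        "RequirementsS3Path": requirements_key,
        "RequirementsS3ObjectVersion": requirements_version,
        "PluginsS3Path": plugins_key,
        "PluginsS3ObjectVersion": plugins_version,
    }


def deploy(bucket, env_name, region, requirements_path, plugins_zip_path,
           requirements_key="requirements.txt", plugins_key="plugins.zip"):
    """Upload both files and point the environment at the new object versions."""
    import boto3  # imported here so build_update_request works without boto3

    s3 = boto3.client("s3", region_name=region)
    mwaa = boto3.client("mwaa", region_name=region)

    versions = {}
    for local_path, key in ((requirements_path, requirements_key),
                            (plugins_zip_path, plugins_key)):
        with open(local_path, "rb") as fh:
            response = s3.put_object(Bucket=bucket, Key=key, Body=fh)
        versions[key] = response["VersionId"]

    request = build_update_request(
        env_name,
        requirements_key, versions[requirements_key],
        plugins_key, versions[plugins_key],
    )
    return mwaa.update_environment(**request)
```

Passing explicit object versions ties the environment to the exact files you tested, which is the same goal as pinning package versions.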

After the update is complete, verify the provider package installation in the Apache Airflow UI.

  1. Access the Apache Airflow UI in Amazon MWAA.
  2. From the Apache Airflow menu bar, choose Admin, then Providers.

The list of providers, and their versions, is shown in a table. In this example, the page reflects the installation of apache-airflow-providers-dbt-cloud version 3.5.1 and apache-airflow-providers-snowflake version 5.2.1. This list only contains the provider packages installed, not all supporting Python packages. Provider packages that are part of the base Apache Airflow installation will also appear in the list. The following image is an example of the package list; note the apache-airflow-providers-dbt-cloud and apache-airflow-providers-snowflake packages and their versions.

Airflow UI with installed packages

To verify all package installations, view the results in Amazon CloudWatch Logs. Amazon MWAA creates a log stream for the requirements installation and the stream contains the pip install output. For instructions, refer to Viewing logs for your requirements.txt.

A successful installation results in the following message:

Successfully installed apache-airflow-providers-dbt-cloud-3.5.1 apache-airflow-providers-snowflake-5.2.1 pyOpenSSL-23.3.0 snowflake-connector-python-3.6.0 snowflake-sqlalchemy-1.5.1 sortedcontainers-2.4.0

If you encounter any installation errors, you should determine the package conflict, update the requirements file, run the local runner test, re-package the plugins, and deploy the updated files.

Clean up

If you created an Amazon MWAA environment specifically for this post, delete the environment and S3 objects to avoid incurring additional charges.

Conclusion

In this post, we discussed several best practices for managing Python dependencies in Amazon MWAA and how to use the MWAA local runner to implement these practices. These best practices reduce DAG and pip installation errors in your Amazon MWAA environment. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Author


Mike Ellis is a Technical Account Manager at AWS and an Amazon MWAA specialist. In addition to assisting customers with Amazon MWAA, he contributes to the Apache Airflow open source project.

Disaster recovery strategies for Amazon MWAA – Part 2

Post Syndicated from Chandan Rupakheti original https://aws.amazon.com/blogs/big-data/disaster-recovery-strategies-for-amazon-mwaa-part-2/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed orchestration service that makes it straightforward to run data processing workflows at scale. Amazon MWAA takes care of operating and scaling Apache Airflow so you can focus on developing workflows. However, although Amazon MWAA provides high availability within an AWS Region through features like Multi-AZ deployment of Airflow components, recovering from a Regional outage requires a multi-Region deployment.

In Part 1 of this series, we highlighted challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. In particular, we discussed two key strategies: backup and restore and warm standby. In this post, we dive deep into the implementation for both strategies and provide a deployable solution to realize the architectures in your own AWS account.

The solution for this post is hosted on GitHub. The README in the repository offers tutorials as well as further workflow details for both backup and restore and warm standby strategies.

Backup and restore architecture

The backup and restore strategy involves periodically backing up Amazon MWAA metadata to Amazon Simple Storage Service (Amazon S3) buckets in the primary Region. The backups are replicated to an S3 bucket in the secondary Region. In case of a failure in the primary Region, a new Amazon MWAA environment is created in the secondary Region and hydrated with the backed-up metadata to restore the workflows.

The project uses the AWS Cloud Development Kit (AWS CDK) and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.

The following diagram shows the architecture of the backup and restore strategy and its key components:

  • Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows
  • Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
  • Replicated backup bucket – The bucket in the secondary Region syncs metadata backups through Amazon S3 cross-Region replication
  • Secondary Amazon MWAA environment – This environment is created on-demand during recovery in the secondary Region
  • Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in the primary Region
  • Recovery workflow – This workflow monitors the primary Amazon MWAA environment and initiates failover when needed in the secondary Region

Figure 1: The backup restore architecture

There are essentially two workflows that work in conjunction to achieve the backup and restore functionality in this architecture. Let’s explore both workflows in detail and the steps as outlined in Figure 1.

Backup workflow

The backup workflow is responsible for periodically taking a backup of your Airflow metadata tables and storing them in the backup S3 bucket. The steps are as follows:

  • [1.a] You can deploy the provided solution from your continuous integration and delivery (CI/CD) pipeline. The pipeline includes a DAG deployed to the DAGs S3 bucket, which performs backup of your Airflow metadata. This is the bucket where you host all of your DAGs for your environment.
  • [1.b] The solution enables cross-Region replication of the DAGs bucket. Any new changes to the primary Region bucket, including DAG files, plugins, and requirements.txt files, are replicated to the secondary Region DAGs bucket. However, for existing objects, a one-time replication needs to be performed using S3 Batch Replication.
  • [1.c] The DAG deployed to take the metadata backup runs periodically. The metadata backup doesn’t include some of the auto-generated tables, and the list of tables to be backed up is configurable. By default, the solution backs up the variable, connection, slot pool, log, job, DAG run, trigger, task instance, and task fail tables. The backup interval is also configurable and should be based on the Recovery Point Objective (RPO), which is the maximum amount of data loss, measured in time, that your business can sustain during a failure.
  • [1.d] Similar to the DAGs bucket, the backup bucket is also synced using cross-Region replication, through which the metadata backup becomes available in the secondary Region.

Recovery workflow

The recovery workflow runs periodically in the secondary Region monitoring the primary Amazon MWAA environment. It has two functions:

  • Store the environment configuration of the primary Amazon MWAA environment in the secondary backup bucket, which is used to recreate an identical Amazon MWAA environment in the secondary Region during failure
  • Perform the failover when a failure is detected

The following are the steps for when the primary Amazon MWAA environment is healthy (see Figure 1):

  • [2.a] The Amazon EventBridge scheduler starts the AWS Step Functions workflow on a provided schedule.
  • [2.b] The workflow, using AWS Lambda, checks Amazon CloudWatch in the primary Region for the SchedulerHeartbeat metrics of the primary Amazon MWAA environment. The environment in the primary Region sends heartbeats to CloudWatch every 5 seconds by default. However, to not invoke a recovery workflow spuriously, we use a default aggregation period of 5 minutes to check the heartbeat metrics. Therefore, it can take up to 5 minutes to detect a primary environment failure.
  • [2.c] Assuming that the heartbeat was detected in 2.b, the workflow makes the cross-Region GetEnvironment call to the primary Amazon MWAA environment.
  • [2.d] The response from the GetEnvironment call is stored in the secondary backup S3 bucket to be used in case of a failure in the subsequent iterations of the workflow. This makes sure the latest configuration of your primary environment is used to recreate a new environment in the secondary Region. The workflow completes successfully after storing the configuration.
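The heartbeat check in step 2.b can be sketched as follows. This is not the solution's actual Lambda code. The function names are hypothetical, and the CloudWatch namespace and dimension names used here (AmazonMWAA, Function, Environment) are assumptions that you should verify against the metrics your environment publishes.

```python
from datetime import datetime, timedelta, timezone


def heartbeat_query(env_name, now=None, period_seconds=300):
    """Build GetMetricStatistics parameters covering the last aggregation period."""
    now = now or datetime.now(timezone.utc)
    return {
        "Namespace": "AmazonMWAA",  # assumed namespace
        "MetricName": "SchedulerHeartbeat",
        "Dimensions": [  # assumed dimension names
            {"Name": "Function", "Value": "Scheduler"},
            {"Name": "Environment", "Value": env_name},
        ],
        "StartTime": now - timedelta(seconds=period_seconds),
        "EndTime": now,
        "Period": period_seconds,
        "Statistics": ["Sum"],
    }


def is_healthy(datapoints):
    """Treat the environment as healthy if any heartbeat was recorded."""
    return any(dp.get("Sum", 0) > 0 for dp in datapoints)


# Usage with Boto3 (requires AWS credentials, so it is not run here):
#   cloudwatch = boto3.client("cloudwatch", region_name=primary_region)
#   result = cloudwatch.get_metric_statistics(**heartbeat_query("my-env"))
#   healthy = is_healthy(result["Datapoints"])
```

Aggregating over a 5-minute period, as the post describes, means a single missed heartbeat doesn't start a failover, at the cost of slower detection.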

The following are the steps for the case when the primary environment is unhealthy (see Figure 1):

  • [2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
  • [2.b] The workflow, using Lambda, checks CloudWatch in the primary Region for the scheduler heartbeat metrics and detects failure. The scheduler heartbeat check using the CloudWatch API is the recommended approach to detect failure. However, you can implement a custom strategy for failure detection in the Lambda function, such as deploying a DAG to periodically send custom metrics to CloudWatch or other data stores as heartbeats and using the function to check those metrics. With the current CloudWatch-based strategy, the unavailability of the CloudWatch API may spuriously invoke the recovery flow.
  • [2.c] Skipped
  • [2.d] The workflow reads the previously stored environment details from the backup S3 bucket.
  • [2.e] The environment details read in the previous step are used to recreate an identical environment in the secondary Region using the CreateEnvironment API call. The API also needs other secondary Region specific configurations such as VPC, subnets, and security groups, which are read from the user-supplied configuration file or environment variables during the solution deployment. The workflow waits in a polling loop until the environment becomes available and then invokes the DAG to restore metadata from the backup S3 bucket. This DAG is deployed to the DAGs S3 bucket as a part of the solution deployment.
  • [2.f] The DAG for restoring metadata completes hydrating the newly created environment and notifies the Step Functions workflow of completion using the task token integration. The new environment now starts running the active workflows and the recovery completes successfully.
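The completion callback in step 2.f relies on the Step Functions task token pattern. A minimal sketch of that callback follows, assuming the restore DAG receives the task token from the workflow (for example, through the DAG run configuration). The function name, the output payload, and the error code are illustrative and not taken from the solution's code.

```python
import json


def notify_step_functions(sfn_client, task_token, succeeded, detail=""):
    """Report the restore outcome to the waiting Step Functions task."""
    if succeeded:
        sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"status": "restored", "detail": detail}),
        )
    else:
        sfn_client.send_task_failure(
            taskToken=task_token,
            error="MetadataRestoreFailed",  # hypothetical error code
            cause=detail,
        )


# Usage with Boto3 (requires AWS credentials, so it is not run here):
#   sfn = boto3.client("stepfunctions", region_name=secondary_region)
#   notify_step_functions(sfn, token, succeeded=True, detail="all tables restored")
```

Until one of these calls arrives, the workflow stays paused at the task that issued the token, which is how it knows when the metadata restore has finished.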

Considerations

Consider the following when using the backup and restore method:

  • Recovery Time Objective – From failure detection to workflows running in the secondary Region, failover can take over 30 minutes. This includes new environment creation, Airflow startup, and metadata restore.
  • Cost – This strategy avoids the overhead of running a passive environment in the secondary Region. Costs are limited to periodic backup storage, cross-Region data transfer charges, and minimal compute for the recovery workflow.
  • Data loss – The RPO depends on the backup frequency. There is a design trade-off to consider here. Although shorter intervals between backups can minimize potential data loss, too frequent backups can adversely affect the performance of the metadata database and consequently the primary Airflow environment. Also, the solution can’t recover an actively running workflow midway. All active workflows are started fresh in the secondary Region based on the provided schedule.
  • Ongoing management – The Amazon MWAA environment and dependencies are automatically kept in sync across Regions in this architecture. As specified in Step 1.b of the backup workflow, the DAGs S3 bucket needs a one-time replication of the existing objects for the solution to work.

Warm standby architecture

The warm standby strategy involves deploying identical Amazon MWAA environments in two Regions. Periodic metadata backups from the primary Region are used to rehydrate the standby environment in case of failover.

The project uses the AWS CDK and is set up like a standard Python project. Refer to the detailed deployment steps in the README file to deploy it in your own accounts.

The following diagram shows the architecture of the warm standby strategy and its key components:

  • Primary Amazon MWAA environment – The environment in the primary Region hosts the workflows during normal operation
  • Secondary Amazon MWAA environment – The environment in the secondary Region acts as a warm standby ready to take over at any time
  • Metadata backup bucket – The bucket in the primary Region stores periodic backups of Airflow metadata tables
  • Replicated backup bucket – The bucket in the secondary Region syncs metadata backups through Amazon S3 cross-Region replication
  • Backup workflow – This workflow periodically backs up Airflow metadata to the S3 buckets in both Regions
  • Recovery workflow – This workflow monitors the primary environment and initiates failover to the secondary environment when needed

Figure 2: The warm standby architecture

Similar to the backup and restore strategy, the backup workflow (Steps 1a–1d) periodically backs up critical Amazon MWAA metadata to S3 buckets in the primary Region, which are synced to the secondary Region.

The recovery workflow runs periodically in the secondary Region monitoring the primary environment. On failure detection, it initiates the failover procedure. The steps are as follows (see Figure 2):

  • [2.a] The EventBridge scheduler starts the Step Functions workflow on a provided schedule.
  • [2.b] The workflow checks CloudWatch in the primary Region for the scheduler heartbeat metrics and detects failure. If the primary environment is healthy, the workflow completes without further actions.
  • [2.c] The workflow invokes the DAG to restore metadata from the backup S3 bucket.
  • [2.d] The DAG for restoring metadata completes hydrating the passive environment and notifies the Step Functions workflow of completion using the task token integration. The passive environment starts running the active workflows on the provided schedules.

Because the secondary environment is already warmed up, the failover is faster with recovery times in minutes.

Considerations

Consider the following when using the warm standby method:

  • Recovery Time Objective – With a warm standby ready, the RTO can be as low as 5 minutes. This includes just the metadata restore and reenabling DAGs in the secondary Region.
  • Cost – This strategy has an added cost of running similar environments in two Regions at all times. With auto scaling for workers, the warm instance can maintain a minimal footprint; however, the web server and scheduler components of Amazon MWAA will remain active in the secondary environment at all times. The trade-off is significantly lower RTO.
  • Data loss – Similar to the backup and restore model, the RPO depends on the backup frequency. Faster backup cycles minimize potential data loss but can adversely affect performance of the metadata database and consequently the primary Airflow environment.
  • Ongoing management – This approach comes with some management overhead. Unlike the backup and restore strategy, any changes to the primary environment configurations need to be manually reapplied to the secondary environment to keep the two environments in sync. Automated synchronization of the secondary environment configurations is left as future work.

Shared considerations

Although the backup and restore and warm standby strategies differ in their implementation, they share some common considerations:

  • Periodically test failover to validate recovery procedures, RTO, and RPO.
  • Enable Amazon MWAA environment logging to help debug issues during failover.
  • Use the AWS CDK or AWS CloudFormation to manage the infrastructure definition. For more details, see the following GitHub repo or Quick start tutorial for Amazon Managed Workflows for Apache Airflow, respectively.
  • Automate deployments of environment configurations and disaster recovery workflows through CI/CD pipelines.
  • Monitor key CloudWatch metrics like SchedulerHeartbeat to detect primary environment failures.

Conclusion

In this series, we discussed how backup and restore and warm standby strategies offer configurable data protection based on your RTO, RPO, and cost requirements. Both use periodic metadata replication and restoration to minimize the area of effect of Regional outages.

Which strategy resonates more with your use case? Feel free to try out our solution and share any feedback or questions in the comments section!


About the Authors

Chandan Rupakheti is a Senior Solutions Architect at AWS. His main focus at AWS lies in the intersection of Analytics, Serverless, and AdTech services. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud. Outside of his professional life, he loves spending time with his family and friends, as well as listening to and playing music.

Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling

Post Syndicated from Mansi Bhutada original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-support-for-the-airflow-rest-api-and-web-server-auto-scaling/

Apache Airflow is a popular platform for enterprises looking to orchestrate complex data pipelines and workflows. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that streamlines the setup and operation of secure and highly available Airflow environments in the cloud.

In this post, we’re excited to introduce two new features that address common customer challenges and unlock new possibilities for building robust, scalable, and flexible data orchestration solutions using Amazon MWAA. First, the Airflow REST API support enables programmatic interaction with Airflow resources like connections, Directed Acyclic Graphs (DAGs), DAGRuns, and Task instances. Second, the option to horizontally scale web server capacity helps you handle increased demand, whether from REST API requests, command line interface (CLI) usage, or more concurrent Airflow UI users. Both features are available for all actively supported Amazon MWAA versions, including version 2.4.3 and newer.

Airflow REST API support

A frequently requested feature from Amazon MWAA customers has been the ability to interact with their workflows programmatically using Airflow’s APIs. The introduction of REST API support in Amazon MWAA addresses this need, providing a standardized way to access and manage your Airflow environment. With the new REST API, you can now invoke DAG runs, manage datasets, or get the status of Airflow’s metadata database, triggerer, and scheduler, all without relying on the Airflow web UI or CLI.

Other examples include building monitoring dashboards that aggregate the status of your DAGs across multiple Amazon MWAA environments, or invoking workflows in response to events from external systems, such as completed database jobs or new user signups.

This feature opens up a world of possibilities for integrating your Amazon MWAA environments with other systems and building custom solutions that use the power of your data orchestration pipelines.

To demonstrate this new capability, we use the REST API to invoke a new DAG run. Follow the process detailed in the following sections.

Authenticate with the Airflow REST API

For a user to authenticate with the REST API, they need the necessary permissions to create a web login token, similar to how it works with the Airflow UI. Refer to Creating an Apache Airflow web login token for more details. The user’s AWS Identity and Access Management (IAM) role or policy must include the CreateWebLoginToken permission to generate a token for authenticating. Furthermore, the user’s permissions for interacting with the REST API are determined by the Airflow role assigned to them within Amazon MWAA. The Airflow roles govern the user’s ability to perform various operations, such as invoking DAG runs, checking statuses, or modifying configurations, through the REST API endpoints.
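As an illustration of the IAM side of this requirement, the following sketch builds a policy document that allows creating a web login token for one environment and one Airflow role. The helper name and the sample Region, account ID, and environment name are placeholders; confirm the resource ARN format and the role names available in your environment against the Amazon MWAA User Guide.

```python
import json


def web_login_token_policy(region, account_id, env_name, airflow_role="User"):
    """Build an IAM policy document granting airflow:CreateWebLoginToken."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:CreateWebLoginToken",
                "Resource": (
                    f"arn:aws:airflow:{region}:{account_id}:"
                    f"role/{env_name}/{airflow_role}"
                ),
            }
        ],
    }


# Placeholder values for illustration only.
print(json.dumps(web_login_token_policy("us-east-1", "111122223333", "my-env"), indent=2))
```

The Airflow role in the resource ARN is what determines which REST API operations the resulting session is allowed to perform.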

The following is an example of the authentication process:

import logging

import boto3
import requests


def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)
        
        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]
        
        # Construct the URL needed for authentication 
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login url using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:

            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return the documented failure value
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

The get_session_info function uses the AWS SDK for Python (Boto3) and the Python requests library to perform the initial authentication steps: it retrieves a web login token and exchanges it for a session cookie, which is valid for 12 hours. The web server hostname and the session cookie are then used for subsequent REST API requests.
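
Because the session cookie stays valid for 12 hours, a long-running client doesn't need to log in before every request. The following is a minimal sketch of one way to reuse the cookie. The SessionCache class, its safety_margin parameter, and the injected fetch_session callable are hypothetical names introduced for illustration; fetch_session is assumed to return a (hostname, cookie) tuple, or (None, None) on failure.

```python
import time

# The cookie lifetime stated in this post
SESSION_TTL_SECONDS = 12 * 60 * 60


class SessionCache:
    """Caches a (hostname, cookie) tuple and refreshes it shortly before it expires."""

    def __init__(self, fetch_session, ttl=SESSION_TTL_SECONDS, safety_margin=300, clock=time.time):
        self._fetch_session = fetch_session  # callable returning (hostname, cookie)
        self._ttl = ttl
        self._safety_margin = safety_margin  # refresh this many seconds early
        self._clock = clock
        self._session = None
        self._fetched_at = None

    def get(self):
        """Return a cached session, logging in again only when it is about to expire."""
        now = self._clock()
        expired = (
            self._fetched_at is None
            or now - self._fetched_at >= self._ttl - self._safety_margin
        )
        if expired:
            session = self._fetch_session()
            # Only cache successful logins so that a failure is retried on the next call
            if session and all(session):
                self._session = session
                self._fetched_at = now
            else:
                return None, None
        return self._session
```

For example, you could create the cache with SessionCache(lambda: get_session_info(region, env_name)) and call get() before each request instead of calling get_session_info directly.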

Invoke the Airflow REST API endpoint

When authentication is complete, you have the credentials to start sending requests to the API endpoints. In the following example, we use the endpoint /dags/{dag_id}/dagRuns to initiate a DAG run:

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
    region (str): AWS region where the MWAA environment is hosted.
    env_name (str): Name of the MWAA environment.
    dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare headers and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body, timeout=10)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

The following is the complete code of trigger_dag.py:

import sys
import boto3
import requests
import logging

def get_session_info(region, env_name):

    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)
        
        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]
        
        # Construct the URL needed for authentication 
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login url using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:

            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return the failure tuple described in the docstring
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
    region (str): AWS region where the MWAA environment is hosted.
    env_name (str): Name of the MWAA environment.
    dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare headers and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body, timeout=10)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

Run the request script

Run the request script with the following code, providing your AWS Region, Amazon MWAA environment name, and DAG name:

python3 trigger_dag.py <region> <env_name> <dag_name>

Validate the API result

The following screenshot shows the result in the CLI.

Check the DAG run in the Airflow UI

The following screenshot shows the DAG run status in the Airflow UI.

You can use any other endpoint in the REST API to enable programmatic control, automation, integration, and management of Airflow workflows and resources. To learn more about the Airflow REST API and its various endpoints, refer to the Airflow documentation.
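
For instance, after triggering a DAG you might poll the state of the run through the endpoint /dags/{dag_id}/dagRuns/{dag_run_id}. The following is a minimal sketch, assuming you already hold a hostname and session cookie from the login step. The helper names and the optional http_get parameter (which defaults to requests.get) are ours, introduced only for illustration.

```python
from urllib.parse import quote


def dag_run_url(web_server_host_name, dag_id, dag_run_id):
    """Build the URL of the endpoint that returns a single DAG run."""
    # Run IDs often contain characters such as ':' and '+', so they are URL-encoded
    return (
        f"https://{web_server_host_name}/api/v1/dags/"
        f"{quote(dag_id, safe='')}/dagRuns/{quote(dag_run_id, safe='')}"
    )


def get_dag_run_state(web_server_host_name, session_cookie, dag_id, dag_run_id, http_get=None):
    """Return the state of a DAG run (for example 'queued', 'running', 'success', or 'failed')."""
    if http_get is None:
        import requests  # imported lazily so the URL helper works without the dependency
        http_get = requests.get

    response = http_get(
        dag_run_url(web_server_host_name, dag_id, dag_run_id),
        cookies={"session": session_cookie},
        timeout=10,
    )
    if response.status_code != 200:
        return None
    return response.json().get("state")
```

The dag_run_id value is returned in the response body when you trigger a run, so a caller can pass it straight to get_dag_run_state and repeat the call until the state is success or failed.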

Web server auto scaling

Another key request from Amazon MWAA customers has been the ability to dynamically scale their web servers to handle fluctuating workloads. Previously, you were constrained by two web servers provided with an Airflow environment on Amazon MWAA and had no way to horizontally scale web server capacity, which could lead to performance issues during peak loads. The new web server auto scaling feature in Amazon MWAA solves this problem. By automatically scaling the number of web servers based on CPU utilization and active connection count, Amazon MWAA makes sure your Airflow environment can seamlessly accommodate increased demand, whether from REST API requests, CLI usage, or more concurrent Airflow UI users.

Set up web server auto scaling

To set up auto scaling for your Amazon MWAA environment web servers, follow these steps:

  1. On the Amazon MWAA console, navigate to the environment you want to configure auto scaling for.
  2. Choose Edit.
  3. Choose Next.
  4. On the Configure advanced settings page, in the Environment class section, add the maximum and minimum web server count. For this example, we set the upper limit to 5 and lower limit to 2.

These settings allow Amazon MWAA to automatically scale up the Airflow web server when demand increases and scale down conservatively when demand decreases, optimizing resource usage and cost.
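
If you prefer to script this change instead of using the console, the same limits can be set through the Amazon MWAA UpdateEnvironment API. The following sketch assumes a Boto3 version recent enough to expose the MinWebservers and MaxWebservers parameters; the helper function names are hypothetical.

```python
def web_server_scaling_update(env_name, min_webservers, max_webservers):
    """Build the keyword arguments for an UpdateEnvironment call that sets web server limits."""
    if min_webservers > max_webservers:
        raise ValueError("min_webservers must not exceed max_webservers")
    return {
        "Name": env_name,
        "MinWebservers": min_webservers,
        "MaxWebservers": max_webservers,
    }


def apply_web_server_scaling(region, env_name, min_webservers, max_webservers):
    """Send the update to Amazon MWAA; the environment is updated asynchronously."""
    import boto3  # imported here so the helper above can be used without AWS access

    mwaa = boto3.client("mwaa", region_name=region)
    return mwaa.update_environment(
        **web_server_scaling_update(env_name, min_webservers, max_webservers)
    )
```

Calling apply_web_server_scaling("us-west-2", "MyMWAAEnvironment", 2, 5) would correspond to the lower limit of 2 and upper limit of 5 used in this example; the Region and environment name here are placeholders.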

Trigger auto scaling programmatically

After you configure auto scaling, you might want to test how it behaves under simulated conditions. Using the Python code structure we discussed earlier for invoking a DAG, you can also use the Airflow REST API to simulate a load test and see how well your auto scaling setup responds. For the purpose of load testing, we have configured our Amazon MWAA environment with an mw1.small instance class. The following is an example implementation using load_test.py:

import sys
import time
import boto3
import requests
import logging
import concurrent.futures

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """
    try:
        # Create an MWAA client in the specified region
        mwaa = boto3.client('mwaa', region_name=region)
        # Request a web login token for the specified environment
        response = mwaa.create_web_login_token(Name=env_name)
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the login URL and payload for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Authenticate and obtain the session cookie
        response = requests.post(login_url, data=login_payload, timeout=10)
        if response.status_code == 200:
            return web_server_host_name, response.cookies["session"]
        else:
            logging.error(f"Failed to log in: HTTP {response.status_code}")
            return None, None
    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
        return None, None
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None, None
    
def call_rest_api(web_server_host_name, session_cookie):
    """
    Calls the Airflow web server API to fetch details of all DAGs and measures the time taken for the call.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
    """
    # Define the endpoint for fetching all DAGs
    url = f"https://{web_server_host_name}/api/v1/dags"
    headers = {'Content-Type': 'application/json', 'Cookie': f'session={session_cookie}'}

    try:
        start_time = time.time()
        response = requests.get(url, headers=headers)
        elapsed_time = time.time() - start_time

        if response.status_code == 200:
            logging.info(f"API call successful, fetched {len(response.json()['dags'])} DAGs, took {elapsed_time:.2f} seconds")
        else:
            logging.error(f"API call failed with status code: {response.status_code}, took {elapsed_time:.2f} seconds")
    except requests.RequestException as e:
        logging.error(f"Error during API call: {e}")

def run_load_test(web_server_host_name, session_cookie, qps, duration):
    """
    Performs a load test by sending concurrent requests at a specified rate over a given duration.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
        qps (int): Queries per second.
        duration (int): Duration of the test in seconds.
    """
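    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=qps) as executor:
        while time.time() - start_time < duration:
            batch_start = time.time()
            # Submit one batch of qps requests and wait for all of them to finish
            futures = [executor.submit(call_rest_api, web_server_host_name, session_cookie) for _ in range(qps)]
            concurrent.futures.wait(futures)
            # Sleep for the rest of the second so that batches start about once per second
            time.sleep(max(0.0, 1.0 - (time.time() - batch_start)))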
    
    logging.info("Load test completed.")

def main(region, env_name, qps, duration):
    """
    Main function to execute the load testing script.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        qps (int): Queries per second.
        duration (int): Duration in seconds.
    """
    web_server_host_name, session_cookie = get_session_info(region, env_name)
    if not web_server_host_name or not session_cookie:
        logging.error("Failed to retrieve session information")
        return

    run_load_test(web_server_host_name, session_cookie, qps, duration)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 5:
        logging.error("Incorrect usage. Proper format: python load_test.py <region> <env_name> <qps> <duration>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    qps = int(sys.argv[3])
    duration = int(sys.argv[4])

    main(region, env_name, qps, duration)

The Python code uses thread pooling and concurrency concepts to help test the auto scaling performance of your web server by simulating traffic. This script automates the process of sending a specific number of requests per second to your web server, enabling you to trigger an auto scaling event.

You can use the following command to run the script. You have to provide the Region, the Amazon MWAA environment name, the number of queries per second you want to run against the web server, and the duration (in seconds) for which you want the load test to run.

python load_test.py <region> <env_name> <qps> <duration>

For example:

python load_test.py us-west-2 MyMWAAEnvironment 10 1080

The preceding command will run 10 queries per second for 18 minutes.

When the script is running, you will start seeing rows that show how long (in seconds) it took for the web server to process the request.

This time will gradually start to increase. As active connection count or CPU usage increase, Amazon MWAA will dynamically scale the web servers to accommodate the load.

As new web servers come online, your environment will be able to handle increased load, and the response time will drop. Amazon MWAA provides web server container metrics in the AWS/MWAA service namespace in Amazon CloudWatch, allowing you to monitor the web server performance. The following screenshots show an example of the auto scaling event.
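
To see which metrics your environment actually publishes before you build dashboards or alarms, you can list them rather than guess metric names. The following is a small sketch using the CloudWatch ListMetrics API; summarize_metrics and list_mwaa_metrics are hypothetical helper names, and the output depends on your environment.

```python
from collections import defaultdict


def summarize_metrics(metrics):
    """Map each metric name to the sorted list of dimension sets it is reported with."""
    summary = defaultdict(set)
    for metric in metrics:
        # Represent each dimension set as a sorted tuple of (name, value) pairs
        dims = tuple(sorted((d["Name"], d["Value"]) for d in metric.get("Dimensions", [])))
        summary[metric["MetricName"]].add(dims)
    return {name: sorted(dim_sets) for name, dim_sets in summary.items()}


def list_mwaa_metrics(region):
    """List all metrics currently published in the AWS/MWAA namespace."""
    import boto3  # imported here so summarize_metrics can be used without AWS access

    cloudwatch = boto3.client("cloudwatch", region_name=region)
    paginator = cloudwatch.get_paginator("list_metrics")
    metrics = []
    for page in paginator.paginate(Namespace="AWS/MWAA"):
        metrics.extend(page["Metrics"])
    return summarize_metrics(metrics)
```

The dimension values in the result tell you which entries belong to the web servers, which you can then chart while the load test runs.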

Recommendation

Determining the appropriate minimum and maximum web server count involves carefully considering your typical workload patterns, performance requirements, and cost constraints. To set these values, consider metrics like the required REST API throughput at peak times and the maximum number of concurrent UI users you expect to have. It's important to note that Amazon MWAA can support up to 10 queries per second (QPS) for the Airflow REST API at full scale for any environment size, provided you follow the recommended number of DAGs.

Amazon MWAA integration with CloudWatch provides granular metrics and monitoring capabilities to help you find the optimal configuration for your specific use case. If you anticipate periods of consistently high demand or increased workloads for an extended duration, you can configure your Amazon MWAA environment to maintain a higher minimum number of web servers. By setting the minimum web server setting to 2 or more, you can make sure your environment always has sufficient capacity to handle load peaks without needing to wait for auto scaling to provision additional resources. This comes at the cost of running more web server instances, which is a trade-off between cost-optimization and responsiveness.

Conclusion

Today, we are announcing the availability of the Airflow REST API and web server auto scaling in Amazon MWAA. The REST API provides a standardized way to programmatically interact with and manage resources in your Amazon MWAA environments. This enables seamless integration, automation, and extensibility of Amazon MWAA within your organization’s existing data and application landscape. With web server auto scaling, you can automatically increase the number of web server instances based on resource utilization, and Amazon MWAA makes sure your Airflow workflows can handle fluctuating workloads without manual intervention.

These features lay the foundation for you to build more robust, scalable, and flexible data orchestration pipelines. We encourage you to use them to streamline your data engineering operations and unlock new possibilities for your business.

To start building with Amazon MWAA, see Get started with Amazon Managed Workflows for Apache Airflow.

Stay tuned for future updates and enhancements to Amazon MWAA that will continue to enhance the developer experience and unlock new opportunities for data-driven organizations.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Kartikay Khator is a Solutions Architect within the Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS Analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, MWAA and AWS Glue ETL expert. He's on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest MWAA and AWS Glue features and news!

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A to Account B, which is dedicated for data handling tasks. This makes sure the raw and processed data can be maintained securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B so that data can be loaded and unloaded.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, the changes are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
  • A sample dataset called products.csv, which we use in this post.
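
As an illustration of that layout, the following sketch builds the date-partitioned prefix for a given day. The function name and the example bucket name are placeholders we made up; only the products/YYYY/MM/DD/ structure comes from this post.

```python
from datetime import date


def partition_prefix(bucket, run_date, dataset="products"):
    """Return the s3://<bucket>/<dataset>/YYYY/MM/DD/ prefix for the given date."""
    # The date format spec zero-pads the month and day, for example 2024/03/05
    return f"s3://{bucket}/{dataset}/{run_date:%Y/%m/%d}/"


# Placeholder bucket name for illustration only
print(partition_prefix("my-example-bucket", date(2024, 3, 5)))
```

A helper like this keeps the paths for incremental files consistent between the job that writes a partition and the job that reads it.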

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.
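
If you want to script the upload, the following sketch shows one way to do it with Boto3. It assumes the script is saved locally as sample_glue_job.py and that the bucket already exists; the helper names are hypothetical.

```python
def glue_assets_location(account_id, region, script_name="sample_glue_job.py"):
    """Return the (bucket, key) pair for the AWS Glue script location used in this post."""
    bucket = f"aws-glue-assets-{account_id}-{region}"
    key = f"scripts/{script_name}"
    return bucket, key


def upload_glue_script(local_path, account_id, region):
    """Upload the local AWS Glue script to the scripts folder and return its S3 URI."""
    import boto3  # imported here so the helper above can be used without AWS access

    bucket, key = glue_assets_location(account_id, region)
    s3 = boto3.client("s3", region_name=region)
    s3.upload_file(local_path, bucket, key)
    return f"s3://{bucket}/{key}"
```

Run upload_glue_script with credentials for Account B, after you have replaced the account ID and Region parameters inside the script itself.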

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The file for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although Airflow version 2.6.3 is supported in Amazon MWAA and comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the default bundled provider version predates Redshift Serverless support, you must upgrade the provider version in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the constraints-3.11-updated.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

  4. Navigate to the Amazon MWAA environment and choose Edit.
  5. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  6. Choose Save.

This updates the environment, and the new providers take effect.

  7. To verify the providers version, go to Providers under the Admin menu.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which grants permission to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
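The two bucket policies in steps 1 and 2 differ only in the bucket name and the list of principals, so they can be generated from one template. The following is a minimal sketch under that assumption; build_bucket_policy is a hypothetical helper, and the account ID and role names are placeholders, not values from this post.

```python
import json


def build_bucket_policy(bucket: str, principal_arns: list) -> dict:
    """Build a bucket policy granting the object and list actions used in steps 1 and 2."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principal_arns},
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket",
                ],
                # Object-level actions apply to the /* ARN; s3:ListBucket applies to the bucket ARN.
                "Resource": [
                    f"arn:aws:s3:::{bucket}/*",
                    f"arn:aws:s3:::{bucket}",
                ],
            }
        ],
    }


# Placeholder account ID and role names; substitute your own values.
account_b = "111122223333"
input_policy = build_bucket_policy(
    "sample-inp-bucket-etl-example",
    [
        f"arn:aws:iam::{account_b}:role/service-role/example-glue-role",
        f"arn:aws:iam::{account_b}:role/service-role/example-mwaa-role",
    ],
)
print(json.dumps(input_policy, indent=4))
```

Listing both the bucket ARN and the /* ARN matters because the policy mixes bucket-level and object-level actions.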

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
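Before debugging a timeout in the DAG, it can help to check the address ranges offline. VPC peering requires that the two VPC CIDR blocks do not overlap, and each security group must allow the other VPC's range. The following is a minimal sketch using Python's standard ipaddress module; the CIDR values and function names are hypothetical examples, not values from this post.

```python
import ipaddress


def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """True when the two ranges share at least one address."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))


def is_allowed(source_cidr: str, allowed_cidrs: list) -> bool:
    """True when every address in source_cidr is covered by one allowed range."""
    source = ipaddress.ip_network(source_cidr)
    return any(source.subnet_of(ipaddress.ip_network(a)) for a in allowed_cidrs)


mwaa_vpc = "10.192.0.0/16"      # hypothetical Amazon MWAA VPC range
redshift_vpc = "172.31.0.0/16"  # hypothetical Amazon Redshift VPC range

# Peering precondition: the two VPC ranges must not overlap.
print(cidrs_overlap(mwaa_vpc, redshift_vpc))

# Security group check: the Redshift group should allow the whole MWAA range.
print(is_allowed(mwaa_vpc, ["10.192.0.0/16"]))
```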

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
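The lookup order can be illustrated with a small simulation. This is a sketch of the behavior described above, not Airflow's implementation: plain dictionaries stand in for Secrets Manager and the metadata database, resolve_connection is a hypothetical name, and the airflow/connections prefix is the one configured later in this section.

```python
from typing import Dict, Optional


def resolve_connection(
    conn_id: str,
    secrets: Dict[str, str],
    metadata_db: Dict[str, str],
    prefix: str = "airflow/connections",
) -> Optional[str]:
    """Look in the alternate backend first, then fall back to the metadata database."""
    secret_name = f"{prefix}/{conn_id}"
    if secret_name in secrets:
        return secrets[secret_name]
    return metadata_db.get(conn_id)


# Stand-in stores with made-up values, for illustration only.
secrets = {"airflow/connections/secrets_redshift_connection": "redshift://from-secrets"}
metadata_db = {
    "secrets_redshift_connection": "redshift://from-metadata",
    "redshift-conn-id": "redshift://manual",
}

print(resolve_connection("secrets_redshift_connection", secrets, metadata_db))
print(resolve_connection("redshift-conn-id", secrets, metadata_db))
```

When the same connection ID exists in both places, the value from the alternate backend wins.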

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  1. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  2. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  1. To generate an Airflow connection URI string, open AWS CloudShell and start a Python shell.
  2. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

You can also do this from the Secrets Manager console. The connection string is stored in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG (in the DAG code, reference it by its connection ID, secrets_redshift_connection, without the prefix). When the DAG runs, it looks for this connection and retrieves the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  1. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
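The Python shell snippet earlier in this section builds the URI by string formatting and percent-encodes only the role ARN, so a password containing characters such as @, / or : would produce a URI that cannot be parsed back correctly. The following is a hedged variant that also encodes the login and password; build_conn_uri is a hypothetical helper, and the host, password, and ARN are placeholders.

```python
from urllib.parse import parse_qs, quote_plus, unquote_plus, urlencode, urlsplit


def build_conn_uri(conn_type, login, password, host, port, **extras) -> str:
    """Build a connection URI with percent-encoded credentials and query values."""
    query = urlencode(extras)  # encodes values such as the role ARN
    return f"{conn_type}://{quote_plus(login)}:{quote_plus(password)}@{host}:{port}?{query}"


# Placeholder values for illustration; substitute your own.
uri = build_conn_uri(
    "redshift",
    "admin",
    "p@ss/word:1",
    "example-host.invalid",
    5439,
    role_arn="arn:aws:iam::111122223333:role/service-role/example-mwaa-role",
    database="dev",
    region="us-east-1",
)
print(uri)

# Round trip: the encoded URI splits back into the original parts.
parts = urlsplit(uri)
print(parts.hostname, parts.port, unquote_plus(parts.password))
print(parse_qs(parts.query)["database"])
```

The round trip at the end is a quick way to confirm that special characters in the password survive encoding.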

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details are stored in the Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it checks the metadata database for the value and returns that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
    • We also set wildcard_match to True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.
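The date-partitioned key pattern that the sensor bullets describe can be tried out locally before deploying the DAG. The following is a minimal sketch that mirrors the get_s3_loc helper in the DAG code in this section; it uses Python's fnmatch module as a stand-in for Unix wildcard matching, and the function names and sample keys are hypothetical.

```python
from datetime import date
from fnmatch import fnmatchcase


def key_pattern(folder: str, day: date) -> str:
    """Build the wildcard key for one day's partition, as get_s3_loc does."""
    return f"{folder}/year={day:%Y}/month={day:%m}/day={day:%d}/*.csv"


def matching_keys(keys, pattern):
    """Return the keys that satisfy the Unix-style wildcard pattern."""
    return [k for k in keys if fnmatchcase(k, pattern)]


pattern = key_pattern("products", date(2024, 3, 5))

# Made-up object keys, for illustration only.
sample_keys = [
    "products/year=2024/month=03/day=05/part-0001.csv",
    "products/year=2024/month=03/day=04/part-0001.csv",
    "products/year=2024/month=03/day=05/_SUCCESS",
]
print(pattern)
print(matching_keys(sample_keys, pattern))
```

Only keys in the current day's partition that end in .csv match, which is the condition the sensor waits for.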

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl-<username> in Account A. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
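To make the sequencing behavior concrete, the following is a simplified model of what chaining does. It is a sketch only, not Airflow's chain() implementation: Task and chain_sketch are hypothetical stand-ins, a list argument represents tasks that may run in parallel, and two adjacent lists are deliberately not handled.

```python
class Task:
    """Minimal stand-in for an Airflow task that records its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)


def chain_sketch(*steps):
    """Link each step to the next; a list step fans out to (or in from) parallel tasks."""
    for current, following in zip(steps, steps[1:]):
        if isinstance(current, list) and isinstance(following, list):
            raise ValueError("adjacent lists are not handled in this sketch")
        ups = current if isinstance(current, list) else [current]
        downs = following if isinstance(following, list) else [following]
        for up in ups:
            for down in downs:
                up.set_downstream(down)


# Sequential chain, as in the DAG in this post.
a, b, c = Task("a"), Task("b"), Task("c")
chain_sketch(a, b, c)
print([t.task_id for t in a.downstream], [t.task_id for t in b.downstream])

# Fan-out and fan-in: x runs first, then y1 and y2 in parallel, then z.
x, y1, y2, z = Task("x"), Task("y1"), Task("y2"), Task("z")
chain_sketch(x, [y1, y2], z)
print([t.task_id for t in x.downstream], [t.task_id for t in y1.downstream])
```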

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete the data from the staging table in Redshift Serverless using SQLExecuteQueryOperator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and contain the same number of records as the source files in Amazon S3.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl-<username> in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl-<username>/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA

Post Syndicated from Jayesh Shinde original https://aws.amazon.com/blogs/big-data/dynamic-dag-generation-with-yaml-and-dag-factory-in-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that allows you to use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you to create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One approach is to use the DAG Factory YAML-based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open source, making it simple to create and customize new functionalities.

In this post, we explore the process of creating Dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
  3. Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs.
  5. Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.

By harnessing the power of YAML files and the DAG Factory library, we unleash a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines.

Overview of solution

In this post, we will use an example DAG file that is designed to process a COVID-19 data set. The workflow process involves processing an open source data set offered by WHO-COVID-19-Global. After we install the DAG-Factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.

The following diagram illustrates the overall solution along with data flows within logical blocks.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don’t have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

  1. Create an Amazon S3 bucket with a unique name while creating your Amazon MWAA environment, and create folders called dags and requirements in it.
  2. Create and upload a requirements.txt file with the following content to the requirements folder. Replace {Airflow-version} with your environment’s Airflow version number, and {Python-version} with the version of Python that’s compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is needed just for the example use case described in this post, and dag-factory is the only required plug-in. It is recommended to check the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.

  1. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that your environment points to the latest version of the requirements.txt file in Amazon S3 so that the additional packages are installed. The update typically takes 15-20 minutes, depending on your environment configuration.
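The requirements.txt content from the earlier step can be generated rather than edited by hand, which helps keep the constraints URL and the pinned packages consistent. The following is a minimal sketch; constraints_url and requirements_txt are hypothetical helpers, and the Airflow and Python versions used here (2.8.1 and 3.11) are example values that you should replace with those of your environment.

```python
def constraints_url(airflow_version: str, python_version: str) -> str:
    """Fill in the constraints URL template used in requirements.txt."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def requirements_txt(airflow_version: str, python_version: str, packages: dict) -> str:
    """Render the file: one --constraint line, then one pinned package per line."""
    lines = [f'--constraint "{constraints_url(airflow_version, python_version)}"']
    lines += [f"{name}=={version}" for name, version in packages.items()]
    return "\n".join(lines) + "\n"


# Example versions only; the package pins are the ones listed in this post.
content = requirements_txt("2.8.1", "3.11", {"dag-factory": "0.19.0", "pandas": "2.1.4"})
print(content)
```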

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.

Verify the existing DAGs by navigating to the DAGs tab.

Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py and process_s3_data.py on your local machine.
  2. Edit the process_s3_data.py file and save it with the following code, then upload the file to the dags folder of the Amazon S3 bucket. The code does some basic data processing:
    1. Read the file from an Amazon S3 location.
    2. Rename the Country_code column to country.
    3. Filter data by the given country.
    4. Write the processed data in CSV format and upload it back to the S3 prefix.
import boto3
import pandas as pd
import io

def process_s3_data(COUNTRY):
    ### Top-level variables: replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
    ### Get the CSV file ###
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response['ResponseMetadata']['HTTPStatusCode']
    if status == 200:
        ### Read the CSV file, filter by country, and write the result back ###
        df = pd.read_csv(response.get("Body"))
        df.rename(columns={"Country_code": "country"}, inplace=True)
        filtered_df = df[df['country'] == COUNTRY]
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
            )
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
  1. Edit the dynamic_dags.yml file and save it with the following content, then upload the file to the dags folder. We define one DAG per country as follows:
    1. Define the default arguments that are passed to all DAGs.
    2. Create a DAG definition for each country by passing op_args.
    3. Map the process_s3_data function with python_callable_name.
    4. Use PythonOperator to process the CSV data stored in the Amazon S3 bucket.
    5. We have set schedule_interval to every 10 minutes, but feel free to adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"
  1. Edit the file example_dag_factory.py and save it with the following code content, then upload the file back to the dags folder. The code cleans up the existing DAGs using the clean_dags() method and creates new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())
  1. After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find new DAGs.

You can enable DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.
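
The per-country entries in dynamic_dags.yml differ only in the country name, so they can be rendered from a template instead of being written by hand. The following is a minimal sketch using only the Python standard library; the helper name render_country_entries and the COUNTRIES list are hypothetical, and the sketch covers only the repeated per-country entries shown earlier, so any other sections of the YAML file would still need to be kept as they are.

```python
# Hypothetical helper (not part of dag-factory): renders the repeated
# per-country entries of dynamic_dags.yml from a single template.
COUNTRIES = ["Australia", "Brazil", "India", "Japan", "Mexico", "Russia", "Spain"]

ENTRY_TEMPLATE = """\
module3_dynamic_dag_{country}:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "{country}"
"""


def render_country_entries(countries):
    """Return the YAML text for one DAG entry per country, separated by blank lines."""
    return "\n".join(ENTRY_TEMPLATE.format(country=country) for country in countries)


if __name__ == "__main__":
    # Print the rendered entries so they can be pasted into dynamic_dags.yml.
    print(render_country_entries(COUNTRIES))
```

Adding a country then becomes a one-line change to the list rather than a copy of the whole entry.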

Cleaning up

There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:

  • Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between different DAGs, streamlining the workflow management process
  • Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
  • Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
  • Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
  • Creating data driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the Authors

 Jayesh Shinde is Sr. Application Architect with AWS ProServe India. He specializes in creating various solutions that are cloud centered using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is Sr. Cloud Architect with AWS ProServe India helping customers to migrate and modernize their infrastructure into AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOPs, and AWS Developer Tools and services.

Introducing Amazon MWAA larger environment sizes

Post Syndicated from Hernan Garcia original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-larger-environment-sizes/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that streamlines the setup and operation of the infrastructure to orchestrate data pipelines in the cloud. Customers use Amazon MWAA to manage the scalability, availability, and security of their Apache Airflow environments. As they design more intensive, complex, and ever-growing data processing pipelines, customers have asked us for additional underlying resources to provide greater concurrency and capacity for their tasks and workflows.

To address this, today, we are announcing the availability of larger environment classes in Amazon MWAA. In this post, we dive into the capabilities of these new XL and 2XL environments, the scenarios they are well suited for, and how you can set up or upgrade your existing Amazon MWAA environment to take advantage of the increased resources.

Current challenges

When you create an Amazon MWAA environment, a set of managed Amazon Elastic Container Service (Amazon ECS) with AWS Fargate containers are provisioned with defined virtual CPUs and RAM.

As you work with larger, complex, resource-intensive workloads, or run thousands of Directed Acyclic Graphs (DAGs) per day, you may start exhausting CPU availability on schedulers and workers, or reaching memory limits in workers. Running Apache Airflow at scale puts proportionally greater load on the Airflow metadata database, sometimes leading to CPU and memory issues on the underlying Amazon Relational Database Service (Amazon RDS) cluster. A resource-starved metadata database may lead to dropped connections from your workers, failing tasks prematurely.

To improve performance and resiliency of your tasks, consider following Apache Airflow best practices to author DAGs. As an alternative, you can create multiple Amazon MWAA environments to distribute workloads. However, this requires additional engineering and management effort.

New environment classes

With today’s release, you can now create XL and 2XL environments in Amazon MWAA in addition to the existing environment classes. They have two and four times the compute, and three and six times the memory, respectively, of the current large Amazon MWAA environment instance class. These instances add compute and RAM linearly to directly improve capacity and performance of all Apache Airflow components. The following table summarizes the environment capabilities.

Environment class | Scheduler and Worker CPU / RAM | Web Server CPU / RAM | Concurrent Tasks | DAG Capacity
mw1.xlarge | 8 vCPUs / 24 GB | 4 vCPUs / 12 GB | 40 tasks (default) | Up to 2000
mw1.2xlarge | 16 vCPUs / 48 GB | 8 vCPUs / 24 GB | 80 tasks (default) | Up to 4000

With the introduction of these larger environments, your Amazon Aurora metadata database will now use larger, memory-optimized instances powered by AWS Graviton2. With the Graviton2 family of processors, you get compute, storage, and networking improvements, and the reduction of your carbon footprint offered by the AWS family of processors.

Pricing

Amazon MWAA pricing dimensions remain unchanged, and you only pay for what you use:

  • The environment class
  • Additional worker instances
  • Additional scheduler instances
  • Metadata database storage consumed

You now get two additional options in the first three dimensions: XL and 2XL for environment class, additional worker instances, and additional scheduler instances. Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.

Observe Amazon MWAA performance to plan scaling to larger environments

Before you start using the new environment classes, it’s important to understand if you are in a scenario that relates to capacity issues, such as metadata database out of memory, or workers or schedulers running at high CPU usage. Understanding the performance of your environment resources is key to troubleshooting issues related to capacity. We recommend following the guidance described in Introducing container, database, and queue utilization metrics for the Amazon MWAA environment to better understand the state of Amazon MWAA environments, and get insights to right-size your instances.

In the following test, we simulate a high load scenario, use the CloudWatch observability metrics to identify common problems, and make an informed decision to plan scaling to larger environments to mitigate the issues.

During our tests, we ran a complex DAG that dynamically creates over 500 tasks and uses external sensors to wait for a task completion in a different DAG. After running on an Amazon MWAA large environment class with auto scaling set up to a maximum of 10 worker nodes, we noticed the following metrics and values in the CloudWatch dashboard.

The worker nodes have reached maximum CPU capacity, causing the number of queued tasks to keep increasing. The metadata database CPU utilization has peaked at over 65% capacity, and the available database free memory has been reduced. In this situation, we could further increase the worker nodes to scale, but that would put additional load on the metadata database CPU. This might lead to a drop in the number of worker database connections and available free database memory.

With new environment classes, you can vertically scale to increase available resources by editing the environment and selecting a higher class of environment, as shown in the following screenshot.

From the list of environments, we select the one in use for this test. Choose Edit to navigate to the Configure advanced settings page, and select the appropriate xlarge or 2xlarge environment as required.

After you save the change, the environment upgrade will take 20–30 minutes to complete. Any running DAG that got interrupted during the upgrade is scheduled for a retry, depending on the way you configured the retries for your DAGs. You can now choose to invoke them manually or wait for the next scheduled run.

After we upgraded the environment class, we tested the same DAG and observed that the metrics showed improved values because more resources are now available. With this XL environment, you can run more tasks on fewer worker nodes, and therefore the number of queued tasks kept decreasing. Alternatively, if you have tasks that require more memory or CPU, you can reduce the tasks per worker, but still achieve a high number of tasks per worker with a larger environment size. For example, if you have a large environment where the worker node CPU is maxed out with celery.worker_autoscale (the Airflow configuration that defines the number of tasks per worker) set at 20,20, you can increase to an XL environment and set celery.worker_autoscale to 20,20 on the XL, rather than the default 40 tasks per worker on an XL environment, and the CPU load should reduce significantly.
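
The same change can also be scripted. The following is a minimal sketch, assuming boto3 is installed and credentials are configured; the function names and the environment name are hypothetical. It builds the parameters for the Amazon MWAA UpdateEnvironment API, moving to the mw1.xlarge class while keeping celery.worker_autoscale at 20,20. Before applying it, check how the AirflowConfigurationOptions you pass interact with options already set on your environment.

```python
def build_upgrade_request(environment_name, environment_class="mw1.xlarge", tasks_per_worker=20):
    """Build keyword arguments for the MWAA update_environment call.

    environment_name is a placeholder for your own environment; the class
    names come from the table of new environment classes.
    """
    return {
        "Name": environment_name,
        "EnvironmentClass": environment_class,
        "AirflowConfigurationOptions": {
            # "max,min" tasks per worker; equal values pin the concurrency.
            "celery.worker_autoscale": f"{tasks_per_worker},{tasks_per_worker}",
        },
    }


def upgrade_environment(environment_name, **options):
    """Send the update request (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the request builder has no dependencies

    client = boto3.client("mwaa")
    return client.update_environment(**build_upgrade_request(environment_name, **options))


if __name__ == "__main__":
    # Inspect the request without calling AWS; "my-mwaa-environment" is hypothetical.
    print(build_upgrade_request("my-mwaa-environment"))
```

Keeping the request builder separate from the API call makes it possible to review the parameters before the 20–30 minute upgrade starts.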

Set up a new XL environment in Amazon MWAA

You can get started with Amazon MWAA in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Amazon MWAA XL and 2XL environment classes are available today in all Regions where Amazon MWAA is currently available.

Conclusion

Today, we are announcing the availability of two new environment classes in Amazon MWAA. With XL and 2XL environment classes, you can orchestrate larger volumes of complex or resource-intensive workflows. If you are running DAGs with a high number of dependencies, running thousands of DAGs across multiple environments, or in a scenario that requires you to heavily use workers for compute, you can now overcome the related capacity issues by increasing your environment resources in a few straightforward steps.

In this post, we discussed the capabilities of the two new environment classes, including pricing and some common resource constraint problems they solve. We provided guidance and an example of how to observe your existing environments to plan scaling to XL or 2XL, and we described how you can upgrade existing environments to use the increased resources.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the realms of AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions.

Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla.

Top Architecture Blog Posts of 2023

Post Syndicated from Andrea Courtright original https://aws.amazon.com/blogs/architecture/top-architecture-blog-posts-of-2023/

2023 was a rollercoaster year in tech, and we at the AWS Architecture Blog feel so fortunate to have shared in the excitement. As we move into 2024 and all of the new technologies we could see, we want to take a moment to highlight the brightest stars from 2023.

As always, thanks to our readers and to the many talented and hardworking Solutions Architects and other contributors to our blog.

I give you our 2023 cream of the crop!

#10: Build a serverless retail solution for endless aisle on AWS

In this post, Sandeep and Shashank help retailers and their customers alike in this guided approach to finding inventory that doesn’t live on shelves.

Figure 1. Building endless aisle architecture for order processing

Check it out!

#9: Optimizing data with automated intelligent document processing solutions

Who else dreads wading through large amounts of data in multiple formats? Just me? I didn’t think so. Using Amazon AI/ML and content-reading services, Deependra, Anirudha, Bhajandeep, and Senaka have created a solution that is scalable and cost-effective to help you extract the data you need and store it in a format that works for you.

Figure 2: AI-based intelligent document processing engine

Check it out!

#8: Disaster Recovery Solutions with AWS managed services, Part 3: Multi-Site Active/Passive

Disaster recovery posts are always popular, and this post by Brent and Dhruv is no exception. Their creative approach in part 3 of this series is most helpful for customers who have business-critical workloads with higher availability requirements.

Figure 3. Warm standby with managed services

Check it out!

#7: Simulating Kubernetes-workload AZ failures with AWS Fault Injection Simulator

Continuing with the theme of “when bad things happen,” we have Siva, Elamaran, and Re’s post about preparing for workload failures. If resiliency is a concern (and it really should be), the secret is test, test, TEST.

Figure 4. Architecture flow for Microservices to simulate a realistic failure scenario

Check it out!

#6: Let’s Architect! Designing event-driven architectures

Luca, Laura, Vittorio, and Zamira weren’t content with their four top-10 spots last year – they’re back with some things you definitely need to know about event-driven architectures.

Figure 5. Let’s Architect artwork

Check it out!

#5: Use a reusable ETL framework in your AWS lake house architecture

As your lake house increases in size and complexity, you could find yourself facing maintenance challenges, and Ashutosh and Prantik have a solution: frameworks! The reusable ETL template with AWS Glue templates might just save you a headache or three.

Figure 6. Reusable ETL framework architecture

Check it out!

#4: Invoking asynchronous external APIs with AWS Step Functions

It’s possible that AWS’ menagerie of services doesn’t have everything you need to run your organization. (Possible, but not likely; we have a lot of amazing services.) If you are using third-party APIs, then Jorge, Hossam, and Shirisha’s architecture can help you maintain a secure, reliable, and cost-effective relationship among all involved.

Figure 7. Invoking Asynchronous External APIs architecture

Check it out!

#3: Announcing updates to the AWS Well-Architected Framework

The Well-Architected Framework continues to help AWS customers evaluate their architectures against its six pillars. They are constantly striving for improvement, and Haleh’s diligence in keeping us up to date has not gone unnoticed. Thank you, Haleh!

Figure 8. Well-Architected logo

Check it out!

#2: Let’s Architect! Designing architectures for multi-tenancy

The practically award-winning Let’s Architect! series strikes again! This time, Luca, Laura, Vittorio, and Zamira were joined by Federica to discuss multi-tenancy and why that concept is so crucial for SaaS providers.

Figure 9. Let’s Architect

Check it out!

And finally…

#1: Understand resiliency patterns and trade-offs to architect efficiently in the cloud

Haresh, Lewis, and Bonnie revamped this 2022 post into a masterpiece that completely stole our readers’ hearts and is among the top posts we’ve ever made!

Figure 10. Resilience patterns and trade-offs

Check it out!

Bonus! Three older special mentions

These three posts were published before 2023, but we think they deserve another round of applause because you, our readers, keep coming back to them.

Thanks again to everyone for their contributions during a wild year. We hope you’re looking forward to the rest of 2024 as much as we are!

Introducing Amazon MWAA support for Apache Airflow version 2.8.1

Post Syndicated from Mansi Bhutada original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-support-for-apache-airflow-version-2-8-1/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Organizations use Amazon MWAA to enhance their business workflows. For example, C2i Genomics uses Amazon MWAA in their data platform to orchestrate the validation of algorithms processing cancer genomics data in billions of records. Twitch, a live streaming platform, manages and orchestrates the training and deployment of its recommendation models for over 140 million active users. They use Amazon MWAA to scale, while significantly improving security and reducing infrastructure management overhead.

Today, we are announcing the availability of Apache Airflow version 2.8.1 environments on Amazon MWAA. In this post, we walk you through some of the new features and capabilities of Airflow now available in Amazon MWAA, and how you can set up or upgrade your Amazon MWAA environment to version 2.8.1.

Object storage

As data pipelines scale, engineers struggle to manage storage across multiple systems with unique APIs, authentication methods, and conventions for accessing data, requiring custom logic and storage-specific operators. Airflow now offers a unified object storage abstraction layer that handles these details, letting engineers focus on their data pipelines. Airflow object storage uses fsspec to enable consistent data access code across different object storage systems, thereby streamlining infrastructure complexity.

The following are some of the feature’s key benefits:

  • Portable workflows – You can switch storage services with minimal changes in your Directed Acyclic Graphs (DAGs)
  • Efficient data transfers – You can stream data instead of loading into memory
  • Reduced maintenance – You don’t need separate operators, making your pipelines straightforward to maintain
  • Familiar programming experience – You can use Python modules, like shutil, for file operations

To use object storage with Amazon Simple Storage Service (Amazon S3), you need to install the package extra s3fs with the Amazon provider (apache-airflow-providers-amazon[s3fs]==x.x.x).

In the sample code below, you can see how to move data directly from Google Cloud Storage to Amazon S3. Because Airflow’s object storage uses shutil.copyfileobj, the objects’ data is read in chunks from gcs_data_source and streamed to amazon_s3_data_target.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,
    tags=["2.8", "ObjectStorage"],
) as dag:

    # List the files under the source prefix.
    @task
    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    # Copy a single object to the target path.
    @task
    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    # Dynamic task mapping: one copy task per listed object.
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

For more information on Airflow object storage, refer to Object Storage.

XCom UI

XCom (cross-communications) allows for the passing of data between tasks, facilitating communication and coordination between them. Previously, developers had to switch to a different view to see XComs related to a task. With Airflow 2.8, XCom key-values are rendered directly on a tab within the Airflow Grid view, as shown in the following screenshot.

The new XCom tab provides the following benefits:

  • Improved XCom visibility – A dedicated tab in the UI provides a convenient and user-friendly way to see all XComs associated with a DAG or task.
  • Improved debugging – Being able to see XCom values directly in the UI is helpful for debugging DAGs. You can quickly see the output of upstream tasks without needing to manually pull and inspect them using Python code.

Task context logger

Managing task lifecycles is crucial for the smooth operation of data pipelines in Airflow. However, certain challenges have persisted, particularly in scenarios where tasks are unexpectedly stopped. This can occur due to various reasons, including scheduler timeouts, zombie tasks (tasks that remain in a running state without sending heartbeats), or instances where the worker runs out of memory.

Traditionally, such failures, particularly those triggered by core Airflow components like the scheduler or executor, weren’t recorded within the task logs. This limitation required users to troubleshoot outside the Airflow UI, complicating the process of pinpointing and resolving issues.

Airflow 2.8 introduced a significant improvement that addresses this problem. Airflow components, including the scheduler and executor, can now use the new TaskContextLogger to forward error messages directly to the task logs. This feature allows you to see all the relevant error messages related to a task’s run in one place. This simplifies the process of figuring out why a task failed, offering a complete perspective of what went wrong within a single log view.

The following screenshot shows how the task is detected as zombie, and the scheduler log is being included as part of the task log.

To enable the feature, set the environment configuration parameter enable_task_context_logger to True. Once it’s enabled, Airflow can ship logs from the scheduler, the executor, or callback run context to the task logs, and make them available in the Airflow UI.

Listener hooks for datasets

Datasets were introduced in Airflow 2.4 as a logical grouping of data sources to create data-aware scheduling and dependencies between DAGs. For example, you can schedule a consumer DAG to run when a producer DAG updates a dataset. Listeners enable Airflow users to create subscriptions to certain events happening in the environment. In Airflow 2.8, listeners are added for two dataset events: on_dataset_created and on_dataset_changed, effectively allowing Airflow users to write custom code to react to dataset management operations. For example, you can trigger an external system, or send a notification.

Using listener hooks for datasets is straightforward. Complete the following steps to create a listener for on_dataset_changed:

  1. Create the listener (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Custom code that runs when a dataset is changed."""
        print("Invoking external endpoint")
    
        # Run a specific action for one particular dataset
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print("Execute specific/different action for this dataset")

  2. Create a plugin to register the listener in your Airflow environment (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    # The module name must match the listener file created in step 1
    from plugins import dataset_listener
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

For more information on how to install plugins in Amazon MWAA, refer to Installing custom plugins.

Set up a new Airflow 2.8.1 environment in Amazon MWAA

You can initiate the setup in your account and preferred Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Upon successful creation of an Airflow version 2.8.1 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.8.1

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.8.1 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.

Conclusion

In this post, we discussed some important features introduced in Airflow version 2.8, such as object storage, the new XCom tab added to the grid view, task context logging, listener hooks for datasets, and how you can start using them. We also provided some sample code to show implementations in Amazon MWAA. For the complete list of changes, refer to Airflow’s release notes.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Disaster recovery strategies for Amazon MWAA – Part 1

Post Syndicated from Parnab Basak original https://aws.amazon.com/blogs/big-data/disaster-recovery-strategies-for-amazon-mwaa-part-1/

In the dynamic world of cloud computing, ensuring the resilience and availability of critical applications is paramount. Disaster recovery (DR) is the process by which an organization anticipates and addresses technology-related disasters. For organizations implementing critical workload orchestration using Amazon Managed Workflows for Apache Airflow (Amazon MWAA), it is crucial to have a DR plan in place to ensure business continuity.

In this series, we explore the need for Amazon MWAA disaster recovery and prescribe solutions that will sustain Amazon MWAA environments against unintended disruptions. This lets you define, avoid, and handle disruption risks as part of your business continuity plan. This post focuses on designing the overall DR architecture. A future post in this series will focus on implementing the individual components using AWS services.

The need for Amazon MWAA disaster recovery

Amazon MWAA, a fully managed service for Apache Airflow, brings immense value to organizations by automating workflow orchestration for extract, transform, and load (ETL), DevOps, and machine learning (ML) workloads. Amazon MWAA has a distributed architecture with multiple components such as scheduler, worker, web server, queue, and database. This makes it difficult to implement a comprehensive DR strategy.

An active Amazon MWAA environment continuously parses Airflow Directed Acyclic Graphs (DAGs), reading them from a configured Amazon Simple Storage Service (Amazon S3) bucket. DAG source unavailability due to network unreachability, unintended corruption, or deletes leads to extended downtime and service disruption.

Within Airflow, the metadata database is a core component storing configuration variables, roles, permissions, and DAG run histories. A healthy metadata database is therefore critical for your Airflow environment. As with any core Airflow component, having a backup and disaster recovery plan in place for the metadata database is essential.

Amazon MWAA deploys Airflow components to multiple Availability Zones within your VPC in your preferred AWS Region. This provides fault tolerance and automatic recovery against a single Availability Zone failure. For mission-critical workloads, being resilient to the impairment of a single Region through multi-Region deployments is additionally important to ensure high availability and business continuity.

Balancing between costs to maintain redundant infrastructures, complexity, and recovery time is essential for Amazon MWAA environments. Organizations aim for cost-effective solutions that minimize their Recovery Time Objective (RTO) and Recovery Point Objective (RPO) to meet their service level agreements, be economically viable, and meet their customers’ demands.

Detect disasters in the primary environment: Proactive monitoring through metrics and alarms

Prompt detection of disasters in the primary environment is crucial for timely disaster recovery. Monitoring the Amazon CloudWatch SchedulerHeartbeat metric provides insights into Airflow health of an active Amazon MWAA environment. You can add other health check metrics to the evaluation criteria, such as checking the availability of upstream or downstream systems and network reachability. Combined with CloudWatch alarms, you can send notifications when these thresholds over a number of time periods are not met. You can add alarms to dashboards to monitor and receive alerts about your AWS resources and applications across multiple Regions.
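
As an illustration, a CloudWatch alarm on the SchedulerHeartbeat metric could be defined along the following lines. This is a sketch under stated assumptions: the namespace and dimension names used for the metric, the thresholds, and the SNS topic ARN are assumptions or placeholders to verify against your own environment, and the function names are hypothetical.

```python
def build_heartbeat_alarm(environment_name, sns_topic_arn, period_seconds=300, evaluation_periods=3):
    """Build keyword arguments for CloudWatch put_metric_alarm."""
    return {
        "AlarmName": f"{environment_name}-scheduler-heartbeat",
        # Assumption: namespace and dimensions under which the environment
        # publishes SchedulerHeartbeat; confirm them in the CloudWatch console.
        "Namespace": "AmazonMWAA",
        "MetricName": "SchedulerHeartbeat",
        "Dimensions": [
            {"Name": "Function", "Value": "Scheduler"},
            {"Name": "Environment", "Value": environment_name},
        ],
        "Statistic": "Sum",
        "Period": period_seconds,
        "EvaluationPeriods": evaluation_periods,
        # Alarm when no heartbeat is recorded for the whole evaluation window.
        "Threshold": 1,
        "ComparisonOperator": "LessThanThreshold",
        # Treat missing data points as a failed heartbeat.
        "TreatMissingData": "breaching",
        "AlarmActions": [sns_topic_arn],
    }


def create_heartbeat_alarm(environment_name, sns_topic_arn):
    """Create the alarm (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the definition can be built offline

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_alarm(**build_heartbeat_alarm(environment_name, sns_topic_arn))
```

With the illustrative defaults, the alarm fires after three consecutive 5-minute periods without a heartbeat. Shorter windows detect failures sooner but are more likely to trigger on brief interruptions.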

AWS publishes our most up-to-the-minute information on service availability on the Service Health Dashboard. You can check at any time to get current status information, or subscribe to an RSS feed to be notified of interruptions to each individual service in your operating Region. The AWS Health Dashboard provides information about AWS Health events that can affect your account.

By combining metric monitoring, available dashboards, and automatic alarming, you can promptly detect unavailability of your primary environment, enabling proactive measures to transition to your DR plan. It is critical to factor in incident detection, notification, escalation, discovery, and declaration into your DR planning and implementation to provide realistic and achievable objectives that provide business value.

In the following sections, we discuss two Amazon MWAA DR strategy solutions and their architecture.

DR strategy solution 1: Backup and restore

The backup and restore strategy involves generating Airflow component backups in the same or different Region as your primary Amazon MWAA environment. To ensure continuity, you can asynchronously replicate these to your DR Region, with minimal performance impact on your primary Amazon MWAA environment. In the event of a rare primary Regional impairment or service disruption, this strategy will create a new Amazon MWAA environment and recover historical data to it from existing backups. However, it’s important to note that during the recovery process, there will be a period where no Airflow environments are operational to process workflows until the new environment is fully provisioned and marked as available.

This strategy provides a low-cost and low-complexity solution that is also suitable for mitigating against data loss or corruption within your primary Region. The amount of data being backed up and the time to create a new Amazon MWAA environment (typically 20–30 minutes) affects how quickly restoration can happen. To enable infrastructure to be redeployed quickly without errors, deploy using infrastructure as code (IaC). Without IaC, it may be complex to restore an analogous DR environment, which will lead to increased recovery times and possibly exceed your RTO.

Let’s explore the setup required when your primary Amazon MWAA environment is actively running, as shown in the following figure.

Backup and Restore - Pre

The solution comprises three key components. The first component is the primary environment, where the Airflow workflows are initially deployed and actively running. The second component is the disaster monitoring component, which consists of CloudWatch and a combination of an AWS Step Functions state machine and an AWS Lambda function. The third component creates and stores backups of all configurations and metadata that are required to restore. These backups can be in the same Region as your primary or replicated to your DR Region using S3 Cross-Region Replication (CRR). For CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region.

The first three steps in the workflow are as follows:

  1. As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
  2. Your existing primary Amazon MWAA environment automatically emits the status of its scheduler’s health to the CloudWatch SchedulerHeartbeat metric.
  3. A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor the scheduler’s health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.
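The health evaluation in step 3 can be sketched as a small pure function. This is not the solution's actual Lambda code; the function name, the thresholds, and the rule that several consecutive empty periods mean unhealthy are assumptions for illustration.

```python
def is_scheduler_healthy(heartbeat_sums, min_heartbeats=1, unhealthy_periods=3):
    """Decide health from per-period SchedulerHeartbeat sums, oldest first.

    The environment is treated as unhealthy only when each of the most
    recent `unhealthy_periods` values is below `min_heartbeats`.
    A missing datapoint (None) counts as zero heartbeats.
    """
    recent = heartbeat_sums[-unhealthy_periods:]
    if len(recent) < unhealthy_periods:
        # Not enough evidence yet to declare a disaster.
        return True
    return any((value or 0) >= min_heartbeats for value in recent)
```

Requiring several consecutive bad periods avoids starting a recovery on a single missed datapoint, at the cost of slower detection.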

The following figure illustrates the additional steps in the solution workflow.

Backup and Restore post

  1. When the heartbeat count deviates from the normal count for a period of time, a series of actions are initiated to recover to a new Amazon MWAA environment in the DR Region. These actions include starting creation of a new Amazon MWAA environment, replicating the primary environment configurations, and then waiting for the new environment to become available.
  2. When the environment is available, an import DAG utility is run to restore the metadata contents from the backups. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.
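The wait in step 1 can be sketched as a polling loop around the Amazon MWAA GetEnvironment API. The function name, polling interval, and the set of statuses treated as terminal failures are assumptions; the client is passed in so the logic can be exercised without AWS access.

```python
import time


def wait_until_available(mwaa_client, name, poll_seconds=60, max_polls=60,
                         sleep=time.sleep):
    """Poll an Amazon MWAA environment until it reports AVAILABLE.

    `mwaa_client` is expected to behave like boto3.client("mwaa").
    Returns True when available, False if polling runs out.
    """
    for _ in range(max_polls):
        status = mwaa_client.get_environment(Name=name)["Environment"]["Status"]
        if status == "AVAILABLE":
            return True
        if status in ("CREATE_FAILED", "DELETED", "UNAVAILABLE"):
            raise RuntimeError(f"Environment {name} entered status {status}")
        sleep(poll_seconds)
    return False
```

In a Step Functions state machine, the same loop is more naturally expressed as a Wait state followed by a status check, which avoids a long-running Lambda invocation.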

DR strategy solution 2: Active-passive environments with periodic data synchronization

The active-passive environments with periodic data synchronization strategy focuses on maintaining recurrent data synchronization between an active primary and a passive Amazon MWAA DR environment. By periodically updating and synchronizing DAG stores and metadata databases, this strategy ensures that the DR environment remains current or nearly current with the primary. The DR Region can be the same or a different Region than your primary Amazon MWAA environment. In the event of a disaster, backups are available to revert to a previous known good state to minimize data loss or corruption.

This strategy provides low RTO and RPO with frequent synchronization, allowing quick recovery with minimal data loss. The infrastructure costs and code deployments are compounded to maintain both the primary and DR Amazon MWAA environments. Your DR environment is available immediately to run DAGs on.

The following figure illustrates the setup required when your primary Amazon MWAA environment is actively running.

Active Passive pre

The solution comprises four key components. Similar to the backup and restore solution, the first component is the primary environment, where the workflow is initially deployed and is actively running. The second component is the disaster monitoring component, consisting of CloudWatch and a combination of a Step Functions state machine and Lambda function. The third component creates and stores backups for all configurations and metadata required for the database synchronization. This can be in the same Region as your primary or replicated to your DR Region using Amazon S3 Cross-Region Replication. As mentioned earlier, for CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region. The last component is a passive Amazon MWAA environment that has the same Airflow code and environment configurations as the primary. The DAGs are deployed in the DR environment using the same continuous integration and continuous delivery (CI/CD) pipeline as the primary. Unlike the primary, DAGs are kept in a paused state to not cause duplicate runs.

The first steps of the workflow are similar to the backup and restore strategy:

  1. As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
  2. Your existing primary Amazon MWAA environment automatically emits the status of its scheduler’s health to the CloudWatch SchedulerHeartbeat metric.
  3. A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor scheduler health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.

The following figure illustrates the final steps of the workflow.

Active Passive post

  1. When the heartbeat count deviates from the normal count for a period of time, DR actions are initiated.
  2. As a first step, a Lambda function triggers an import DAG utility to restore the metadata contents from the backups to the passive Amazon MWAA DR environment. When the imports are complete, the same DAG can un-pause the other Airflow DAGs, making them active for future runs. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.
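One way to un-pause DAGs from outside Airflow is the Amazon MWAA CLI endpoint, which accepts Airflow CLI commands authenticated with a short-lived token from the CreateCliToken API. The sketch below only builds the request; the function name is hypothetical, and this is an alternative to un-pausing from within the import DAG as described above.

```python
def build_unpause_request(web_server_hostname, cli_token, dag_id):
    """Build the HTTP request that runs `dags unpause` on an MWAA environment.

    `web_server_hostname` and `cli_token` are the WebServerHostname and
    CliToken values returned by boto3.client("mwaa").create_cli_token(Name=...).
    """
    return {
        "url": f"https://{web_server_hostname}/aws_mwaa/cli",
        "headers": {
            "Authorization": f"Bearer {cli_token}",
            "Content-Type": "text/plain",
        },
        # The body is the Airflow CLI command without the leading `airflow`.
        "body": f"dags unpause {dag_id}",
    }
```

Send the request as an HTTP POST with the HTTP client of your choice, once per DAG that should become active in the DR environment.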

Best practices to improve resiliency of Amazon MWAA

To enhance the resiliency of your Amazon MWAA environment and ensure smooth disaster recovery, consider implementing the following best practices:

  • Robust backup and restore mechanisms – Implementing comprehensive backup and restore mechanisms for Amazon MWAA data is essential. Regularly deleting existing metadata based on your organization’s retention policies reduces backup times and makes your Amazon MWAA environment more performant.
  • Automation using IaC – Using automation and orchestration tools such as AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform can streamline the deployment and configuration management of Amazon MWAA environments. This ensures consistency, reproducibility, and faster recovery during DR scenarios.
  • Idempotent DAGs and tasks – In Airflow, a DAG is considered idempotent if rerunning the same DAG with the same inputs multiple times has the same effect as running it only once. Designing idempotent DAGs and keeping tasks atomic decreases recovery time from failures when you have to manually rerun an interrupted DAG in your recovered environment.
  • Regular testing and validation – A robust Amazon MWAA DR strategy should include regular testing and validation exercises. By simulating disaster scenarios, you can identify any gaps in your DR plans, fine-tune processes, and ensure your Amazon MWAA environments are fully recoverable.
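To make the idempotency practice concrete, here is a minimal sketch of a task that writes to a location derived from the run's logical date and overwrites rather than appends. The function names are hypothetical, and a plain dictionary stands in for the real storage layer.

```python
def partition_key(dataset, logical_date):
    """Derive a deterministic output location from the run's logical date."""
    return f"{dataset}/run_date={logical_date}"


def write_partition(store, dataset, logical_date, rows):
    """Overwrite the partition for this run instead of appending to it.

    `store` is a dict standing in for Amazon S3 or a database table.
    Rerunning with the same inputs leaves the store unchanged.
    """
    store[partition_key(dataset, logical_date)] = list(rows)
    return store
```

Because the output key depends only on the dataset and logical date, rerunning an interrupted DAG run in the recovered environment replaces partial results instead of duplicating them.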

Conclusion

In this post, we explored the challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. We examined two DR strategy solutions: backup and restore and active-passive environments with periodic data synchronization. By implementing these solutions and following best practices, you can protect your Amazon MWAA environments, minimize downtime, and mitigate the impact of disasters. Regular testing, validation, and adaptation to evolving requirements are crucial for an effective Amazon MWAA DR strategy. By continuously evaluating and refining your disaster recovery plans, you can ensure the resilience and uninterrupted operation of your Amazon MWAA environments, even in the face of unforeseen events.

For additional details and code examples on Amazon MWAA, refer to the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.


About the Authors

Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Chandan Rupakheti is a Solutions Architect and a Serverless Specialist at AWS. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud and bringing stakeholders together in their cloud journey. Outside his professional life, he loves spending time with his family and friends, as well as listening to and playing music.

Vinod Jayendra is an Enterprise Support Lead in ISV accounts at Amazon Web Services, where he helps customers in solving their architectural, operational, and cost optimization challenges. With a particular focus on Serverless technologies, he draws from his extensive background in application development to deliver top-tier solutions. Beyond work, he finds joy in quality family time, embarking on biking adventures, and coaching youth sports teams.

Rupesh Tiwari is a Senior Solutions Architect at AWS in New York City, with a focus on Financial Services. He has over 18 years of IT experience in the finance, insurance, and education domains, and specializes in architecting large-scale applications and cloud-native big data workloads. In his spare time, Rupesh enjoys singing karaoke, watching comedy TV series, and creating joyful moments with his family.

Orchestrate Amazon EMR Serverless Spark jobs with Amazon MWAA, and data validation using Amazon Athena

Post Syndicated from Gaurav Parekh original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-spark-jobs-with-amazon-mwaa-and-data-validation-using-amazon-athena/

As data engineering becomes increasingly complex, organizations are looking for new ways to streamline their data processing workflows. Many data engineers today use Apache Airflow to build, schedule, and monitor their data pipelines.

However, as the volume of data grows, managing and scaling these pipelines can become a daunting task. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can help simplify the process of building, running, and managing data pipelines. By providing Apache Airflow as a fully managed platform, Amazon MWAA allows data engineers to focus on building data workflows instead of worrying about infrastructure.

Today, businesses and organizations require cost-effective and efficient ways to process large amounts of data. Amazon EMR Serverless is a cost-effective and scalable solution for big data processing that can handle large volumes of data. The Amazon Provider in Apache Airflow comes with EMR Serverless operators and is already included in Amazon MWAA, making it easy for data engineers to build scalable and reliable data processing pipelines. You can use EMR Serverless to run Spark jobs on the data, and use Amazon MWAA to manage the workflows and dependencies between these jobs. This integration can also help reduce costs by automatically scaling the resources needed to process data.

Amazon Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. You can use standard SQL to interact with data without the need to manage complex infrastructure.

In this post, we use Amazon MWAA, EMR Serverless, and Athena to build a complete end-to-end data processing pipeline.

Solution overview

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  1. Create an Amazon MWAA workflow that retrieves data from your input Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use EMR Serverless to process the data stored in Amazon S3. EMR Serverless automatically scales up or down based on the workload, so you don’t need to worry about provisioning or managing any infrastructure.
  3. Use EMR Serverless to transform the data using PySpark code and then store the transformed data back in your S3 bucket.
  4. Use Athena to create an external table based on the S3 dataset and run queries to analyze the transformed data. Athena uses the AWS Glue Data Catalog to store the table metadata.

Prerequisites

You should have the following prerequisites:

Data preparation

To illustrate using EMR Serverless jobs with Apache Spark via Amazon MWAA and data validation using Athena, we use the publicly available NYC taxi dataset. Download the following datasets to your local machine:

  • Green taxi and Yellow taxi trip records – Trip records for yellow and green taxis, which include information such as pick-up and drop-off dates and times, locations, trip distances, and payment types. In our example, we use the latest Parquet files for 2022.
  • Dataset for Taxi zone lookup – A dataset that provides location IDs and corresponding zone details for taxis.

In later steps, we upload these datasets to Amazon S3.

Create solution resources

This section outlines the steps for setting up data processing and transformation.

Create an EMR Serverless application

You can create one or more EMR Serverless applications that use open source analytics frameworks like Apache Spark or Apache Hive. Unlike EMR on EC2, you do not need to delete or terminate EMR Serverless applications. An EMR Serverless application is only a definition and, once created, can be reused for as long as needed. This makes the Amazon MWAA pipeline simpler, because you only have to submit jobs to a pre-created EMR Serverless application.

By default, an EMR Serverless application auto-starts on job submission and auto-stops after being idle for 15 minutes to ensure cost efficiency. You can modify the amount of idle time or choose to turn the feature off.

To create an application using the EMR Serverless console, follow the instructions in “Create an EMR Serverless application”. Note the application ID, because we use it in the following steps.
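If you prefer to script this step instead of using the console, the following sketch builds the arguments for the EMR Serverless CreateApplication API, including the auto-start and auto-stop behavior described above. The function name is hypothetical, and the application name and release label are placeholders you would choose yourself.

```python
def spark_application_request(name, release_label, idle_timeout_minutes=15):
    """Build keyword arguments for the EMR Serverless create_application call.

    `release_label` is an Amazon EMR release of your choice; no specific
    release is implied by this post.
    """
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
        # Start the application automatically when a job is submitted.
        "autoStartConfiguration": {"enabled": True},
        # Stop it after the given idle time to avoid paying for idle capacity.
        "autoStopConfiguration": {
            "enabled": True,
            "idleTimeoutMinutes": idle_timeout_minutes,
        },
    }
```

Calling boto3.client("emr-serverless").create_application(**request) returns a response whose applicationId field is the ID to note for later steps.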

Create an S3 bucket and folders

Complete the following steps to set up your S3 bucket and folders:

  1. On the Amazon S3 console, create an S3 bucket to store the dataset.
  2. Note the name of the S3 bucket to use in later steps.
  3. Create an input_data folder for storing input data.
  4. Within that folder, create three separate folders, one for each dataset: green, yellow, and zone_lookup.

You can download and work with the latest datasets available. For our testing, we use the following files:

  • The green/ folder has the file green_tripdata_2022-06.parquet
  • The yellow/ folder has the file yellow_tripdata_2022-06.parquet
  • The zone_lookup/ folder has the file taxi_zone_lookup.csv

Set up the Amazon MWAA DAG scripts

Complete the following steps to set up your DAG scripts:

  1. Download the following scripts to your local machine:
    1. requirements.txt – A Python dependency is any package or distribution that is not included in the Apache Airflow base install for your Apache Airflow version on your Amazon MWAA environment. For this post, we use Boto3 version >=1.23.9.
    2. blog_dag_mwaa_emrs_ny_taxi.py – This script is a part of the Amazon MWAA DAG and consists of the following tasks: yellow_taxi_zone_lookup, green_taxi_zone_lookup, and ny_taxi_summary. These tasks run Spark jobs to look up taxi zones and generate a data summary.
    3. green_zone.py – This PySpark script reads data files for green taxi rides and zone lookup, performs a join operation to combine them, and generates an output file containing green taxi rides with zone information. It utilizes temporary views for the df_green and df_zone data frames, performs column-based joins, and aggregates data like passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_green_zone, as Parquet files.
    4. yellow_zone.py – This PySpark script processes yellow taxi ride and zone lookup data files by joining them to generate an output file containing yellow taxi rides with zone information. The script accepts a user-provided S3 bucket name and initiates a Spark session with the application name yellow_zone. It reads the yellow taxi files and zone lookup file from the specified S3 bucket, creates temporary views, performs a join based on location ID, and calculates statistics such as passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_yellow_zone, as Parquet files.
    5. ny_taxi_summary.py – This PySpark script processes the green_zone and yellow_zone files to aggregate statistics on taxi rides, grouping data by service zones and location IDs. It requires an S3 bucket name as a command line argument, creates a SparkSession named ny_taxi_summary, reads the files from S3, performs a join, and generates a new data frame named ny_taxi_summary. It creates an output_data folder in the specified S3 bucket to write the resulting data frame to new Parquet files.
  2. On your local machine, update the blog_dag_mwaa_emrs_ny_taxi.py script with the following information:
    • Update your S3 bucket name in the following two lines:
      S3_LOGS_BUCKET = "<<bucket_name_here>>"
      S3_BASE_BUCKET = "<<bucket_name_here>>"

    • Update your role name ARN:
      JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"
      e.g. arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>

    • Update EMR Serverless Application ID. Use the Application ID created earlier.
      EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"

  3. Upload the requirements.txt file to the S3 bucket created earlier.
  4. In the S3 bucket, create a folder named dags and upload the updated blog_dag_mwaa_emrs_ny_taxi.py file from your local machine.
  5. On the Amazon S3 console, create a new folder named scripts inside the S3 bucket and upload the scripts to this folder from your local machine.
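To show how the values configured above fit together, the following sketch builds the arguments for an EMR Serverless StartJobRun request for one of the uploaded PySpark scripts. It is not the code in blog_dag_mwaa_emrs_ny_taxi.py; the function name and the logs/ prefix are assumptions, while the scripts/ folder and the bucket-name argument follow the descriptions above.

```python
def job_run_request(application_id, job_role_arn, bucket, script_name):
    """Build keyword arguments for the EMR Serverless start_job_run call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": job_role_arn,
        "jobDriver": {
            "sparkSubmit": {
                # Scripts were uploaded to the scripts/ folder of the bucket.
                "entryPoint": f"s3://{bucket}/scripts/{script_name}",
                # Each script takes the S3 bucket name as its argument.
                "entryPointArguments": [bucket],
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                # Assumed log prefix; adjust to your own layout.
                "s3MonitoringConfiguration": {"logUri": f"s3://{bucket}/logs/"}
            }
        },
    }
```

The EMR Serverless operators in the Amazon provider for Apache Airflow take equivalent application ID, execution role, and job driver settings, so the same structure carries over to DAG tasks.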

Create an Amazon MWAA environment

To create an Airflow environment, complete the following steps:

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter mwaa_emrs_athena_pipeline.
  3. For Airflow version, choose the latest version (for this post, 2.5.1).
  4. For S3 Bucket, enter the path to your S3 bucket.
  5. For DAGs folder, enter the path to your dags folder.
  6. For Requirements file, enter the path to the requirements.txt file.
  7. Choose Next.
  8. For Virtual private cloud (VPC), choose a VPC that has a minimum of two private subnets.

This will populate two of the private subnets in your VPC.

  9. Under Web server access, select Public network.

This allows the Apache Airflow UI to be accessed over the internet by users granted access to the IAM policy for your environment.

  10. For Security group(s), select Create new security group.
  11. For Environment class, select mw1.small.
  12. For Execution role, choose Create a new role.
  13. For Role name, enter a name.
  14. Leave the other configurations as default and choose Next.
  15. On the next page, choose Create environment.

It may take about 20–30 minutes to create your Amazon MWAA environment.

  16. When the Amazon MWAA environment status changes to Available, navigate to the IAM console and update the Amazon MWAA execution role to add pass role (iam:PassRole) privileges for emr_serverless_execution_role.
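The pass role permission can be expressed as an IAM policy statement attached to the Amazon MWAA execution role. The sketch below builds such a policy as a Python dictionary; the function name is hypothetical, and the condition restricting the role to the EMR Serverless service is an optional hardening step rather than something this post requires.

```python
import json


def pass_role_policy(job_role_arn):
    """Build an IAM policy that lets a principal pass the EMR Serverless job role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": job_role_arn,
                # Optional: only allow passing the role to EMR Serverless.
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": "emr-serverless.amazonaws.com"
                    }
                },
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder account ID and role name, for illustration only.
    print(json.dumps(pass_role_policy(
        "arn:aws:iam::111122223333:role/emr_serverless_execution_role"), indent=2))
```

The execution role also needs permission to call the EMR Serverless APIs the DAG uses; that is outside the scope of this sketch.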

Trigger the Amazon MWAA DAG

To trigger the DAG, complete the following steps:

  1. On the Amazon MWAA console, choose Environments in the navigation pane.
  2. Open your environment and choose Open Airflow UI.
  3. Select blog_dag_mwaa_emrs_ny_taxi, choose the play icon, and choose Trigger DAG.
  4. When the DAG is running, choose the DAG blog_dag_mwaa_emrs_ny_taxi and choose Graph to locate your DAG run workflow.

The DAG will take approximately 4–6 minutes to run all the scripts. You will see all the completed tasks, and the overall status of the DAG will show as success.

To rerun the DAG, remove s3://<<your_s3_bucket here >>/output_data/.
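Removing the output prefix before a rerun can be scripted. The following sketch deletes every object under a prefix using the Amazon S3 ListObjectsV2 paginator and DeleteObjects; the function name is hypothetical, and the client is passed in so you control which credentials and Region are used.

```python
def delete_prefix(s3_client, bucket, prefix):
    """Delete all objects under `prefix` and return how many were removed.

    `s3_client` is expected to behave like boto3.client("s3").
    Each listing page holds at most 1,000 keys, which matches the
    per-request limit of delete_objects.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    deleted = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted
```

For example, delete_prefix(boto3.client("s3"), "<<your_s3_bucket here>>", "output_data/") clears the previous results. Keep the trailing slash in the prefix so that unrelated keys starting with the same characters are left alone.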

Optionally, to understand how Amazon MWAA runs these tasks, choose the task you want to inspect.

Choose Run to view the task run details.

The following screenshot shows an example of the task logs.

If you want to dive deeper into the execution logs, navigate to Applications on the EMR Serverless console. The Apache Spark driver logs indicate the initiation of your job along with the details for executors, stages, and tasks that were created by EMR Serverless. These logs can be helpful to monitor your job progress and troubleshoot failures.

By default, EMR Serverless will store application logs securely in Amazon EMR managed storage for a period of 30 days. However, you can also specify Amazon S3 or Amazon CloudWatch as your log delivery options during job submission.

Validate the final result set with Athena

Let’s validate the data loaded by the process using Athena SQL queries.

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (<S3_BUCKET_NAME>/athena), then choose Save.
  3. In the query editor, enter the following query to create an external table:
CREATE EXTERNAL TABLE default.ny_taxi_summary(
  pu_service_zone string, 
  pulocationid bigint, 
  do_service_zone string, 
  dolocationid bigint, 
  passenger_count bigint, 
  trip_distance double, 
  fare_amount double, 
  extra double, 
  mta_tax double, 
  tip_amount double, 
  tolls_amount double, 
  improvement_surcharge double, 
  total_amount double, 
  congestion_surcharge double, 
  airport_fee double)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<<YOUR-S3-BUCKET Here>>/output_data/ny_taxi_summary/' -- *** Change bucket name to your bucket***
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none');


Run the following query on the recently created ny_taxi_summary table to retrieve the first 10 rows to validate the data:

select * from default.ny_taxi_summary limit 10;
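The same validation query can be submitted programmatically. This sketch builds the arguments for the Athena StartQueryExecution API; the function name is hypothetical, and the athena/ result prefix mirrors the query result location configured earlier.

```python
def athena_query_request(bucket,
                         sql="SELECT * FROM default.ny_taxi_summary LIMIT 10"):
    """Build keyword arguments for the Athena start_query_execution call."""
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": "default"},
        # Athena writes query results to this S3 location.
        "ResultConfiguration": {"OutputLocation": f"s3://{bucket}/athena/"},
    }
```

Pass the result to boto3.client("athena").start_query_execution(**request), then poll get_query_execution with the returned QueryExecutionId until the query finishes before reading results.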

Clean up

To prevent future charges, complete the following steps:

  1. On the Amazon S3 console, delete the S3 bucket you created to store the Amazon MWAA DAG, scripts, and logs.
  2. On the Athena console, drop the table you created:
    drop table default.ny_taxi_summary;

  3. On the Amazon MWAA console, navigate to the environment that you created and choose Delete.
  4. On the EMR Studio console, delete the application.

To delete the application, navigate to the List applications page. Select the application that you created and choose Actions → Stop to stop the application. After the application is in the STOPPED state, select the same application and choose Actions → Delete.
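The stop-then-delete sequence can also be scripted against the EMR Serverless API. The sketch below assumes the application is currently started; the function name and polling values are illustrative, and the client is injected so the flow can be tested without AWS access.

```python
import time


def stop_and_delete_application(client, application_id, poll_seconds=10,
                                max_polls=30, sleep=time.sleep):
    """Stop an EMR Serverless application, wait for STOPPED, then delete it.

    `client` is expected to behave like boto3.client("emr-serverless").
    Returns True if the application was deleted, False if it never stopped.
    """
    client.stop_application(applicationId=application_id)
    for _ in range(max_polls):
        state = client.get_application(
            applicationId=application_id)["application"]["state"]
        if state == "STOPPED":
            client.delete_application(applicationId=application_id)
            return True
        sleep(poll_seconds)
    return False
```

Waiting for the STOPPED state mirrors the console flow described above, where Delete is chosen only after the application has stopped.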

Conclusion

Data engineering is a critical component of many organizations, and as data volumes continue to grow, it’s essential to find ways to streamline data processing workflows. The combination of Amazon MWAA, EMR Serverless, and Athena provides a powerful solution to build, run, and manage data pipelines efficiently. With this end-to-end data processing pipeline, data engineers can easily process and analyze large amounts of data quickly and cost-effectively without the need to manage complex infrastructure. The integration of these AWS services provides a robust and scalable solution for data processing, helping organizations make informed decisions based on their data insights.

Now that you’ve seen how to submit Spark jobs on EMR Serverless via Amazon MWAA, we encourage you to use Amazon MWAA to create a workflow that will run PySpark jobs via EMR Serverless.

We welcome your feedback and inquiries. Please feel free to reach out to us if you have any questions or comments.


About the authors

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Gaurav Parekh is a Solutions Architect helping AWS customers build large scale modern architecture. He specializes in data analytics and networking. Outside of work, Gaurav enjoys playing cricket, soccer and volleyball.


Audit History

December 2023: This post was reviewed for technical accuracy by Santosh Gantaram, Sr. Technical Account Manager.

Introducing shared VPC support on Amazon MWAA

Post Syndicated from John Jackson original https://aws.amazon.com/blogs/big-data/introducing-shared-vpc-support-on-amazon-mwaa/

In this post, we demonstrate automating deployment of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) using customer-managed endpoints in a VPC, providing compatibility with shared, or otherwise restricted, VPCs.

Data scientists and engineers have made Apache Airflow a leading open source tool to create data pipelines due to its active open source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and extensive library of pre-built integrations. Amazon MWAA is a managed service for Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. For each Airflow environment, Amazon MWAA creates a single-tenant service VPC, which hosts the metadatabase that stores states and the web server that provides the user interface. Amazon MWAA further manages Airflow scheduler and worker instances in a customer-owned and managed VPC, in order to schedule and run tasks that interact with customer resources. Those Airflow containers in the customer VPC access resources in the service VPC via a VPC endpoint.

Many organizations choose to centrally manage their VPC using AWS Organizations, allowing a VPC in an owner account to be shared with resources in a different participant account. However, because creating a new route outside of a VPC is considered a privileged operation, participant accounts can’t create endpoints in owner VPCs. Furthermore, many customers don’t want to extend the security privileges required to create VPC endpoints to all users provisioning Amazon MWAA environments. In addition to VPC endpoints, customers also wish to restrict data egress via Amazon Simple Queue Service (Amazon SQS) queues, and Amazon SQS access is a requirement in the Amazon MWAA architecture.

Shared VPC support for Amazon MWAA adds the ability for you to manage your own endpoints within your VPCs, adding compatibility to shared and otherwise restricted VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by your Amazon MWAA environments. This post demonstrates how customer-managed endpoints work with Amazon MWAA and provides examples of how to automate the provisioning of those endpoints.

Solution overview

Shared VPC support for Amazon MWAA allows multiple AWS accounts to create their Airflow environments into shared, centrally managed VPCs. The account that owns the VPC (owner) shares the two private subnets required by Amazon MWAA with other accounts (participants) that belong to the same organization from AWS Organizations. After the subnets are shared, the participants can view, create, modify, and delete Amazon MWAA environments in the subnets shared with them.

When users specify the need for a shared, or otherwise policy-restricted, VPC during environment creation, Amazon MWAA will first create the service VPC resources, then enter a pending state for up to 72 hours, with an Amazon EventBridge notification of the change in state. This allows owners to create the required endpoints on behalf of participants based on endpoint service information from the Amazon MWAA console or API, or programmatically via an AWS Lambda function and EventBridge rule, as in the example in this post.

After those endpoints are created on the owner account, the endpoint service in the single-tenant Amazon MWAA VPC will detect the endpoint connection event and resume environment creation. Should there be an issue, you can cancel environment creation by deleting the environment during this pending state.

This feature also allows you to remove the create, modify, and delete VPCE privileges from the AWS Identity and Access Management (IAM) principal creating Amazon MWAA environments, even when not using a shared VPC, because that permission will instead be imposed on the IAM principal creating the endpoint (the Lambda function in our example). Furthermore, the Amazon MWAA environment will provide the SQS queue Amazon Resource Name (ARN) used by the Airflow Celery Executor to queue tasks (the Celery Executor Queue), allowing you to explicitly enter those resources into your network policy rather than having to provide a more open and generalized permission.

In this example, we create the VPC and Amazon MWAA environment in the same account. For shared VPCs across accounts, the EventBridge rule and Lambda function would exist in the owner account, and the Amazon MWAA environment would be created in the participant account. See Sending and receiving Amazon EventBridge events between AWS accounts for more information.

Prerequisites

You should have the following prerequisites:

  • An AWS account
  • An AWS user in that account, with permissions to create VPCs, VPC endpoints, and Amazon MWAA environments
  • An Amazon Simple Storage Service (Amazon S3) bucket in that account, with a folder called dags

Create the VPC

We begin by creating a restrictive VPC using an AWS CloudFormation template, in order to simulate creating the necessary VPC endpoint and modifying the SQS endpoint policy. If you want to use an existing VPC, you can proceed to the next section.

  1. On the AWS CloudFormation console, choose Create stack and choose With new resources (standard).
  2. Under Specify template, choose Upload a template file.
  3. Now we edit our CloudFormation template to restrict access to Amazon SQS. In cfn-vpc-private-bjs.yml, edit the SqsVpcEndoint section to appear as follows:
   SqsVpcEndoint:
     Type: AWS::EC2::VPCEndpoint
     Properties:
       ServiceName: !Sub "com.amazonaws.${AWS::Region}.sqs"
       VpcEndpointType: Interface
       VpcId: !Ref VPC
       PrivateDnsEnabled: true
       SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
       SecurityGroupIds:
        - !Ref SecurityGroup
       PolicyDocument:
        Statement:
         - Effect: Allow
           Principal: '*'
           Action: '*'
           Resource: []

This additional policy document entry prevents Amazon SQS egress to any resource not explicitly listed.
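
In practice, this means a queue becomes reachable through the endpoint only after its ARN is added to the Resource list. The following is a minimal sketch of that update, written as a pure function so it can be checked without AWS access. The function name allow_queue and the sample ARN are illustrative assumptions and are not part of the template.

```python
import copy

def allow_queue(policy, queue_arn):
    """Return a copy of an endpoint policy with queue_arn added to the first Allow statement."""
    updated = copy.deepcopy(policy)
    for statement in updated["Statement"]:
        if statement["Effect"] != "Allow":
            continue
        resources = statement["Resource"]
        # IAM permits Resource to be a single string, so normalize it to a list
        if isinstance(resources, str):
            resources = [resources]
        # Nothing to add if everything is already allowed or the ARN is present
        if "*" not in resources and queue_arn not in resources:
            resources.append(queue_arn)
        statement["Resource"] = resources
        break
    return updated

# Hypothetical values for illustration only
restrictive = {"Statement": [{"Effect": "Allow", "Principal": "*", "Action": "*", "Resource": []}]}
queue = "arn:aws:sqs:us-east-1:112233445566:airflow-celery-example"
print(allow_queue(restrictive, queue)["Statement"][0]["Resource"])
```

Returning a copy instead of mutating the input keeps the original policy available for comparison or rollback.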

Now we can create our CloudFormation stack.

  1. On the AWS CloudFormation console, choose Create stack.
  2. Select Upload a template file.
  3. Choose Choose file.
  4. Browse to the file you modified.
  5. Choose Next.
  6. For Stack name, enter MWAA-Environment-VPC.
  7. Choose Next until you reach the review page.
  8. Choose Submit.

Create the Lambda function

We have two options for self-managing our endpoints: manual and automated. In this example, we create a Lambda function that responds to the Amazon MWAA EventBridge notification. You could also use the EventBridge notification to send an Amazon Simple Notification Service (Amazon SNS) message, such as an email, to someone with permission to create the VPC endpoint manually.

First, we create a Lambda function to respond to the EventBridge event that Amazon MWAA will emit.

  1. On the Lambda console, choose Create function.
  2. For Name, enter mwaa-create-lambda.
  3. For Runtime, choose Python 3.11.
  4. Choose Create function.
  5. For Code, in the Code source section, for lambda_function, enter the following code:
    import boto3
    import json
    import logging
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    def lambda_handler(event, context):
        if event['detail']['status']=="PENDING":
            detail=event['detail']
            name=detail['name']
            celeryExecutorQueue=detail['celeryExecutorQueue']
            subnetIds=detail['networkConfiguration']['subnetIds']
            securityGroupIds=detail['networkConfiguration']['securityGroupIds']
            databaseVpcEndpointService=detail['databaseVpcEndpointService']
    
            # MWAA does not need to store the VPC ID, but we can get it from the subnets
            client = boto3.client('ec2')
            response = client.describe_subnets(SubnetIds=subnetIds)
            logger.info(response['Subnets'][0]['VpcId'])  
            vpcId=response['Subnets'][0]['VpcId']
            logger.info("vpcId: " + vpcId)       
            
            webserverVpcEndpointService=None
            if detail['webserverAccessMode']=="PRIVATE_ONLY":
                webserverVpcEndpointService=event['detail']['webserverVpcEndpointService']
            
            response = client.describe_vpc_endpoints(
                VpcEndpointIds=[],
                Filters=[
                    {"Name": "vpc-id", "Values": [vpcId]},
                    {"Name": "service-name", "Values": ["*.sqs"]},
                    ],
                MaxResults=1000
            )
            sqsVpcEndpoint=None
            for r in response['VpcEndpoints']:
                if any(s in r['SubnetIds'] for s in subnetIds):
                    # We are filtering describe by service name, so this must be SQS
                    sqsVpcEndpoint=r
                    break
            
            if sqsVpcEndpoint:
                logger.info("Found SQS endpoint: " + sqsVpcEndpoint['VpcEndpointId'])
    
                logger.info(sqsVpcEndpoint)
                pd = json.loads(sqsVpcEndpoint['PolicyDocument'])
                for s in pd['Statement']:
                    if s['Effect']=='Allow':
                        resource = s['Resource']
                        logger.info(resource)
                        if '*' in resource:
                            logger.info("'*' already allowed")
                        elif celeryExecutorQueue in resource: 
                            logger.info("'"+celeryExecutorQueue+"' already allowed")                
                        else:
                            s['Resource'].append(celeryExecutorQueue)
                            logger.info("Updating SQS policy to " + str(pd))
            
                            client.modify_vpc_endpoint(
                                VpcEndpointId=sqsVpcEndpoint['VpcEndpointId'],
                                PolicyDocument=json.dumps(pd)
                                )
                        break
            
            # create MWAA database endpoint
            logger.info("creating endpoint to " + databaseVpcEndpointService)
            endpointName=name+"-database"
            response = client.create_vpc_endpoint(
                VpcEndpointType='Interface',
                VpcId=vpcId,
                ServiceName=databaseVpcEndpointService,
                SubnetIds=subnetIds,
                SecurityGroupIds=securityGroupIds,
                TagSpecifications=[
                    {
                        "ResourceType": "vpc-endpoint",
                        "Tags": [
                            {
                                "Key": "Name",
                                "Value": endpointName
                            },
                        ]
                    },
                ],           
            )
            logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
                
            # create MWAA web server endpoint (if private)
            if webserverVpcEndpointService:
                endpointName=name+"-webserver"
                logger.info("creating endpoint to " + webserverVpcEndpointService)
                response = client.create_vpc_endpoint(
                    VpcEndpointType='Interface',
                    VpcId=vpcId,
                    ServiceName=webserverVpcEndpointService,
                    SubnetIds=subnetIds,
                    SecurityGroupIds=securityGroupIds,
                    TagSpecifications=[
                        {
                            "ResourceType": "vpc-endpoint",
                            "Tags": [
                                {
                                    "Key": "Name",
                                    "Value": endpointName
                                },
                            ]
                        },
                    ],                  
                )
                logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
    
        return {
            'statusCode': 200,
            'body': json.dumps(event['detail']['status'])
        }

  6. Choose Deploy.
  7. On the Configuration tab of the Lambda function, in the General configuration section, choose Edit.
  8. For Timeout, increase the value to 5 minutes, 0 seconds.
  9. Choose Save.
  10. In the Permissions section, under Execution role, choose the role name to edit the permissions of this function.
  11. For Permission policies, choose the link under Policy name.
  12. Choose Edit and add a comma and the following statement:
    {
    		"Sid": "Statement1",
    		"Effect": "Allow",
    		"Action": 
    		[
    			"ec2:DescribeVpcEndpoints",
    			"ec2:CreateVpcEndpoint",
    			"ec2:ModifyVpcEndpoint",
                "ec2:DescribeSubnets",
    			"ec2:CreateTags"
    		],
    		"Resource": 
    		[
    			"*"
    		]
    }

The complete policy should look similar to the following:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "logs:CreateLogGroup",
			"Resource": "arn:aws:logs:us-east-1:112233445566:*"
		},
		{
			"Effect": "Allow",
			"Action": [
				"logs:CreateLogStream",
				"logs:PutLogEvents"
			],
			"Resource": [
				"arn:aws:logs:us-east-1:112233445566:log-group:/aws/lambda/mwaa-create-lambda:*"
			]
		},
		{
			"Sid": "Statement1",
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeVpcEndpoints",
				"ec2:CreateVpcEndpoint",
				"ec2:ModifyVpcEndpoint",
               	"ec2:DescribeSubnets",
				"ec2:CreateTags"
			],
			"Resource": [
				"*"
			]
		}
	]
}
  1. Choose Next until you reach the review page.
  2. Choose Save changes.
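
Before connecting EventBridge, it can help to know the shape of the event the function expects. The sketch below builds a test event containing only the fields that the handler in this section reads. Every value is a placeholder, the helper name sample_pending_event is hypothetical, and a real Amazon MWAA event may carry additional fields. Note that invoking the deployed handler with such an event would attempt to create real VPC endpoints.

```python
import json

def sample_pending_event(name, subnet_ids, security_group_ids):
    """Build a test event with the fields read by the Lambda handler (placeholder values)."""
    return {
        "source": "aws.airflow",
        "detail-type": "MWAA Environment Status Change",
        "detail": {
            "name": name,
            "status": "PENDING",
            "webserverAccessMode": "PUBLIC_ONLY",
            # Placeholder ARN and service name, not real resources
            "celeryExecutorQueue": "arn:aws:sqs:us-east-1:112233445566:example-queue",
            "databaseVpcEndpointService": "com.amazonaws.vpce.us-east-1.vpce-svc-0123456789abcdef0",
            "networkConfiguration": {
                "subnetIds": subnet_ids,
                "securityGroupIds": security_group_ids,
            },
        },
    }

event = sample_pending_event("my-environment", ["subnet-aaa", "subnet-bbb"], ["sg-ccc"])
print(json.dumps(event, indent=2))
```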

Create an EventBridge rule

Next, we configure EventBridge to send the Amazon MWAA notifications to our Lambda function.

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter mwaa-create.
  3. Select Rule with an event pattern.
  4. Choose Next.
  5. For Creation method, choose Use pattern form.
  6. Choose Edit pattern.
  7. For Event pattern, enter the following:
    {
      "source": ["aws.airflow"],
      "detail-type": ["MWAA Environment Status Change"]
    }

  8. Choose Next.
  9. For Select a target, choose Lambda function.

You may also specify an SNS notification in order to receive a message when the environment state changes.

  1. For Function, choose mwaa-create-lambda.
  2. Choose Next until you reach the final section, then choose Create rule.
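
The same rule can also be created programmatically. The following is a sketch using boto3, assuming the rule and function names used in this post and a function ARN that you supply. The target ID and statement ID are arbitrary labels chosen for illustration. Unlike the console, the API does not grant EventBridge permission to invoke the function automatically, so the sketch adds it explicitly.

```python
import json

RULE_NAME = "mwaa-create"

def build_event_pattern():
    """Event pattern matching Amazon MWAA environment status changes."""
    return {
        "source": ["aws.airflow"],
        "detail-type": ["MWAA Environment Status Change"],
    }

def create_rule(function_arn, function_name="mwaa-create-lambda"):
    """Create the rule, attach the Lambda target, and allow EventBridge to invoke it."""
    import boto3  # imported lazily so build_event_pattern works without boto3 installed

    events = boto3.client("events")
    rule_arn = events.put_rule(
        Name=RULE_NAME,
        EventPattern=json.dumps(build_event_pattern()),
        State="ENABLED",
    )["RuleArn"]
    events.put_targets(
        Rule=RULE_NAME,
        Targets=[{"Id": "mwaa-create-lambda", "Arn": function_arn}],
    )
    # Resource-based permission so the rule can invoke the function
    boto3.client("lambda").add_permission(
        FunctionName=function_name,
        StatementId="mwaa-create-eventbridge",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    return rule_arn

print(json.dumps(build_event_pattern()))
```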

Create an Amazon MWAA environment

Finally, we create an Amazon MWAA environment with customer-managed endpoints.

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter a unique name for your environment.
  3. For Airflow version, choose the latest Airflow version.
  4. For S3 bucket, choose Browse S3 and choose your S3 bucket, or enter the Amazon S3 URI.
  5. For DAGs folder, choose Browse S3 and choose the dags/ folder in your S3 bucket, or enter the Amazon S3 URI.
  6. Choose Next.
  7. For Virtual Private Cloud, choose the VPC you created earlier.
  8. For Web server access, choose Public network (Internet accessible).
  9. For Security groups, deselect Create new security group.
  10. Choose the shared VPC security group created by the CloudFormation template.

Because the security groups of the AWS PrivateLink endpoints from the earlier step are self-referencing, you must choose the same security group for your Amazon MWAA environment.

  1. For Endpoint management, choose Customer managed endpoints.
  2. Keep the remaining settings as default and choose Next.
  3. Choose Create environment.

When your environment is available, you can access it via the Open Airflow UI link on the Amazon MWAA console.
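
For teams that create environments through the API rather than the console, the equivalent request might look like the following boto3 sketch. The helper names are hypothetical, and all ARNs and IDs are supplied by the caller. The API requires an existing execution role, which the console would otherwise offer to create for you.

```python
def build_environment_request(name, bucket_arn, role_arn, subnet_ids, security_group_ids):
    """Assemble create_environment arguments for customer-managed endpoints."""
    return {
        "Name": name,
        "SourceBucketArn": bucket_arn,
        "DagS3Path": "dags",
        "ExecutionRoleArn": role_arn,
        "NetworkConfiguration": {
            "SubnetIds": subnet_ids,
            "SecurityGroupIds": security_group_ids,
        },
        "WebserverAccessMode": "PUBLIC_ONLY",
        # CUSTOMER means you create the VPC endpoints yourself
        "EndpointManagement": "CUSTOMER",
    }

def create_environment(**kwargs):
    """Submit the request; the environment then waits in a pending state for its endpoints."""
    import boto3  # imported lazily so the request builder works without boto3 installed

    return boto3.client("mwaa").create_environment(**build_environment_request(**kwargs))

# Placeholder values for illustration only
request = build_environment_request(
    "my-environment",
    "arn:aws:s3:::my-mwaa-bucket",
    "arn:aws:iam::112233445566:role/my-mwaa-execution-role",
    ["subnet-aaa", "subnet-bbb"],
    ["sg-ccc"],
)
print(request["EndpointManagement"])
```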

Clean up

Cleaning up resources that are not actively being used reduces costs and is a best practice. If you don’t delete your resources, you can incur additional charges. To clean up your resources, complete the following steps:

  1. Delete your Amazon MWAA environment, EventBridge rule, and Lambda function.
  2. Delete the VPC endpoints created by the Lambda function.
  3. Delete any security groups created, if applicable.
  4. After the above resources have completed deletion, delete the CloudFormation stack to ensure that you have removed all of the remaining resources.
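
Step 2 can be scripted. The Lambda function in this post tags each endpoint with a Name of the environment name followed by -database or -webserver, so a cleanup sketch can look endpoints up by that tag. The helper names below are hypothetical. Review the returned IDs before relying on this in a shared account, because any endpoint with a matching Name tag would be deleted.

```python
def endpoint_names(environment_name, private_webserver=False):
    """Name tags that the Lambda function in this post assigns to its endpoints."""
    names = [environment_name + "-database"]
    if private_webserver:
        names.append(environment_name + "-webserver")
    return names

def delete_environment_endpoints(environment_name, private_webserver=False):
    """Find endpoints by Name tag and delete them; returns the deleted endpoint IDs."""
    import boto3  # imported lazily so endpoint_names works without boto3 installed

    ec2 = boto3.client("ec2")
    found = ec2.describe_vpc_endpoints(
        Filters=[{"Name": "tag:Name", "Values": endpoint_names(environment_name, private_webserver)}]
    )["VpcEndpoints"]
    ids = [endpoint["VpcEndpointId"] for endpoint in found]
    if ids:
        ec2.delete_vpc_endpoints(VpcEndpointIds=ids)
    return ids

print(endpoint_names("my-environment", private_webserver=True))
```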

Summary

This post described how to automate environment creation with shared VPC support in Amazon MWAA. This gives you the ability to manage your own endpoints within your VPC, adding compatibility with shared, or otherwise restricted, VPCs. Specifying customer-managed endpoints also lets you meet strict security policies by explicitly restricting VPC resource access to just the resources needed by your Amazon MWAA environments. To learn more about Amazon MWAA, refer to the Amazon MWAA User Guide. For more posts about Amazon MWAA, visit the Amazon MWAA resources page.


About the author

John Jackson has over 25 years of software experience as a developer, systems architect, and product manager in both startups and large corporations and is the AWS Principal Product Manager responsible for Amazon MWAA.

Introducing Amazon MWAA support for Apache Airflow version 2.7.2 and deferrable operators

Post Syndicated from Manasi Bhutada original https://aws.amazon.com/blogs/big-data/introducing-amazon-mwaa-support-for-apache-airflow-version-2-7-2-and-deferrable-operators/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that allows you to use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure.

Today, we are announcing the availability of Apache Airflow version 2.7.2 environments and support for deferrable operators on Amazon MWAA. In this post, we provide an overview of deferrable operators and triggers, including a walkthrough of an example showcasing how to use them. We also delve into some of the new features and capabilities of Apache Airflow, and how you can set up or upgrade your Amazon MWAA environment to version 2.7.2.

Deferrable operators and triggers

Standard operators and sensors continuously occupy an Airflow worker slot, regardless of whether they are active or idle. For example, even while waiting for an external system to complete a job, a worker slot is consumed. The Gantt chart below, representing a Directed Acyclic Graph (DAG), showcases this scenario through multiple Amazon Redshift operations.

Gantt chart representing DAG idle time

You can see the time each task spends idling while waiting for the Redshift cluster to be created, snapshotted, and paused. With the introduction of deferrable operators in Apache Airflow 2.2, the polling process can be offloaded to ensure efficient utilization of the worker slot. A deferrable operator can suspend itself and resume once the external job is complete, instead of continuously occupying a worker slot. This minimizes queued tasks and leads to a more efficient utilization of resources within your Amazon MWAA environment. The following figure shows a simplified diagram describing the process flow.

After a task has deferred its run, it frees up the worker slot and assigns the check of completion to a small piece of asynchronous code called a trigger. The trigger runs in a parent process called a triggerer, a service that runs an asyncio event loop. The triggerer has the capability to run triggers in parallel at scale, and to signal tasks to resume when a condition is met.
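
To make the idea concrete, here is a toy model in plain asyncio. It is not Airflow's implementation, and the names wait_for_job and triggerer are illustrative. It only shows how a single event loop can watch several external jobs at once, because each check yields control while it waits.

```python
import asyncio

async def wait_for_job(job_id, polls_needed, poll_interval=0.01):
    """Toy trigger: polls a pretend external job until it reports completion."""
    for _ in range(polls_needed):
        # Sleeping yields control, so other triggers can run on the same loop
        await asyncio.sleep(poll_interval)
    return job_id

async def triggerer(jobs):
    """Toy triggerer: runs every trigger concurrently on one event loop."""
    tasks = [asyncio.create_task(wait_for_job(job, polls)) for job, polls in jobs.items()]
    finished = []
    for done in asyncio.as_completed(tasks):
        # In Airflow, this is the point where the deferred task would be resumed
        finished.append(await done)
    return finished

order = asyncio.run(triggerer({"glue_job": 3, "emr_step": 1, "redshift_snapshot": 2}))
print(order)
```

No worker slot is modeled here at all, which is the point: while the triggers wait, the workers are free to run other tasks.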

The Amazon provider package for Apache Airflow has added triggers for popular AWS services like AWS Glue and Amazon EMR. In Amazon MWAA environments running Apache Airflow v2.7.2, the management and operation of the triggerer service is taken care of for you. If you prefer not to use the triggerer service, you can change the configuration mwaa.triggerer_enabled. Additionally, you can define how many triggers each triggerer can run in parallel using the configuration parameter triggerer.default_capacity. This parameter defaults to values based on your Amazon MWAA environment class. Refer to the Configuration reference in the User Guide for detailed configuration values.

When to use deferrable operators

Deferrable operators are particularly useful for tasks that submit jobs to systems external to an Amazon MWAA environment, such as Amazon EMR, AWS Glue, and Amazon SageMaker, or other sensors waiting for a specific event to occur. These tasks can take minutes to hours to complete and are primarily idle operators, making them good candidates to be replaced by their deferrable versions. Some additional use cases include:

  • File system-based operations.
  • Database operations with long running queries.

Using deferrable operators in Amazon MWAA

To use deferrable operators in Amazon MWAA, ensure you’re running Apache Airflow version 2.7 or greater in your Amazon MWAA environment, and the operators or sensors in your DAGs support deferring. Operators in the Amazon provider package expose a deferrable parameter which you can set to True to run the operator in asynchronous mode. For example, you can use S3KeySensor in asynchronous mode as follows:

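from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# deferrable=True hands the wait to the triggerer instead of holding a worker slot
wait_for_source_data = S3KeySensor(
    task_id="WaitForSourceData",
    bucket_name="source_bucket_name",
    bucket_key="object_key",
    aws_conn_id="aws_default",
    deferrable=True,
)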

You can also utilize various pre-built deferrable operators available in other provider packages, such as Snowflake and Databricks.

Follow the complete sample code in the GitHub repository to understand how deferrable operators work together. You will be building and orchestrating the data pipeline illustrated in the following figure.

The pipeline consists of three stages:

  • An S3KeySensor that waits for a dataset to be uploaded in Amazon Simple Storage Service (Amazon S3)
  • An AWS Glue crawler to classify objects in the dataset and save schemas into the AWS Glue Data Catalog
  • An AWS Glue job that uses the metadata in the Data Catalog to denormalize the source dataset, create Data Catalog tables based on filtered data, and write the resulting data back to Amazon S3 in separate Apache Parquet files.

Setup and Teardown tasks

It’s common to build workflows that require ephemeral resources, for example an S3 bucket to temporarily store data, databases and corresponding datasets to run quality checks, or a compute cluster to train a model in a machine learning (ML) orchestration pipeline. You need to have these resources properly configured before running work tasks, and after their run, ensure they are torn down. Doing this manually is complex. It may lead to poor readability and maintainability of your DAGs, and leave resources running constantly, thereby increasing costs. With Amazon MWAA support for Apache Airflow version 2.7.2, you can use two new types of tasks to support this scenario: setup and teardown tasks.

Setup and teardown tasks ensure that the resources needed for a work task are set up before the task starts its run and then are taken down after it has finished, even if the work task fails. Any task can be configured as a setup or teardown task. Once configured, they have special visibility in the Airflow UI and also special behavior. The following graph describes a simple data quality check pipeline using setup and teardown tasks.

One option to mark setup_db_instance and teardown_db_instance as setup and teardown tasks is to use the as_teardown() method in the teardown task in the dependencies chain declaration. Note that the method receives the setup task as a parameter:

setup_db_instance >> column_quality_check >> row_count_quality_check >> teardown_db_instance.as_teardown(setups=setup_db_instance)

Another option is to use @setup and @teardown decorators:

from airflow.decorators import setup

@setup
def setup_db_instance():
    ...
    return "Resources fully set up"

setup_db_instance()

After you configure the tasks, the graph view shows your setup tasks with an upward arrow and your teardown tasks with a downward arrow. They’re connected by a dotted line depicting the setup/teardown workflow. Any task between the setup and teardown tasks (such as column_quality_check and row_count_quality_check) is in the scope of the workflow. This arrangement involves the following behavior:

  • If you clear column_quality_check or row_count_quality_check, both setup_db_instance and teardown_db_instance will be cleared
  • If setup_db_instance runs successfully, and column_quality_check and row_count_quality_check have completed, regardless of whether they were successful or not, teardown_db_instance will run
  • If setup_db_instance fails or is skipped, then teardown_db_instance will fail or skip
  • If teardown_db_instance fails, by default Airflow ignores its status to evaluate whether the pipeline run was successful
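
These rules resemble a try/finally block guarded by a successful setup. The following toy model in plain Python, not Airflow code, mirrors the run-time rules from the list above for a linear chain of work tasks. The function run_scope is hypothetical, and Airflow's scheduler handles more cases (skips, clearing, nested scopes) than this sketch does.

```python
def run_scope(setup, work_tasks, teardown):
    """Toy model of one setup/teardown scope; returns (run status, log of what happened)."""
    log = []
    try:
        setup()
        log.append("setup: success")
    except Exception:
        # Without a successful setup there is nothing to tear down
        log.append("setup: failed")
        log.append("teardown: not run")
        return "failed", log

    work_ok = True
    for task in work_tasks:
        try:
            task()
            log.append(f"{task.__name__}: success")
        except Exception:
            work_ok = False
            log.append(f"{task.__name__}: failed")
            break  # later tasks in the chain do not run after an upstream failure

    # Teardown runs whether or not the work tasks succeeded
    try:
        teardown()
        log.append("teardown: success")
    except Exception:
        log.append("teardown: failed")  # ignored when deciding the run status

    return ("success" if work_ok else "failed"), log

def setup_db_instance(): pass
def column_quality_check(): raise ValueError("unexpected nulls")
def row_count_quality_check(): pass
def teardown_db_instance(): pass

status, log = run_scope(
    setup_db_instance,
    [column_quality_check, row_count_quality_check],
    teardown_db_instance,
)
print(status, log)
```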

Note that when creating setup and teardown workflows, there can be more than one set of setup and teardown tasks, and they can be parallel and nested. Neither setup nor teardown tasks are limited in number, nor are the worker tasks you can include in the scope of the workflow.

Follow the complete sample code in the GitHub repository to understand how setup and teardown tasks work.

When to use setup and teardown tasks

Setup and teardown tasks are useful to improve the reliability and cost-effectiveness of DAGs, ensuring that required resources are created and deleted at the right time. They can also help simplify complex DAGs by breaking them down into smaller, more manageable tasks, improving maintainability. Some use cases include:

  • Data processing based on ephemeral compute, like Amazon Elastic Compute Cloud (Amazon EC2) instances fleets or EMR clusters
  • ML model training or tuning pipelines
  • Extract, transform, and load (ETL) jobs using external ephemeral data stores to share data among Airflow tasks

With Amazon MWAA support for Apache Airflow version 2.7.2, you can start using setup and teardown tasks to improve your pipelines as of today. To learn more about Setup and Teardown tasks, refer to the Apache Airflow documentation.

Secrets cache

To reflect changes to your DAGs and tasks, the Apache Airflow scheduler parses your DAG files continuously, every 30 seconds by default. If you have variables or connections as top-level code (code outside the operator’s execute methods), a request is generated every time the DAG file is parsed, impacting parsing speed and leading to sub-optimal performance in the DAG file processing. If you are running at scale, it has the potential to affect Airflow performance and scalability as the amount of network communication and load on the metastore database increase. If you’re using an alternative secrets backend, such as AWS Secrets Manager, every DAG parse is a new request to that service, increasing costs.

With Amazon MWAA support for Apache Airflow version 2.7.2, you can use a secrets cache for variables and connections. Airflow will cache variables and connections locally so that they can be accessed faster during DAG parsing, without having to fetch them from the secrets backend, environment variables, or metadata database. The following diagram describes the process.

Enabling caching will help lower the DAG parsing time, especially if variables and connections are used in top-level code (which is not a best practice). With the introduction of a secrets cache, the frequency of API calls to the backend is reduced, which in turn lowers the overall cost associated with backend access. However, similar to other caching implementations, a secrets cache may serve outdated values until the time to live (TTL) expires.
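
The trade-off between fewer backend calls and possibly stale values can be seen in a small time-to-live cache. This is a generic sketch, not Airflow's implementation. The class TTLCache and the injected clock are illustrative assumptions that make the behavior easy to demonstrate without waiting.

```python
import time

class TTLCache:
    """Minimal time-to-live cache in front of a slower backend lookup."""

    def __init__(self, fetch, ttl_seconds, clock=time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._store = {}          # key -> (value, time it was cached)
        self.backend_calls = 0

    def get(self, key):
        now = self._clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            return hit[0]         # served from the cache, possibly stale
        self.backend_calls += 1
        value = self._fetch(key)
        self._store[key] = (value, now)
        return value

backend = {"env": "dev"}
now = [0.0]                        # fake clock so the example runs instantly
cache = TTLCache(backend.__getitem__, ttl_seconds=900, clock=lambda: now[0])

cache.get("env")                   # first read goes to the backend
backend["env"] = "prod"            # the value changes in the backend
stale = cache.get("env")           # still answered from the cache
now[0] = 901.0                     # move past the TTL
fresh = cache.get("env")           # cache entry expired, fetched again
print(stale, fresh, cache.backend_calls)
```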

When to use the secrets cache feature

You should consider using the secrets cache feature to improve performance and reliability, and to reduce the operating costs of your Airflow tasks. This is particularly useful if your DAG frequently retrieves variables or connections in the top-level Python code.

How to use the secrets cache feature on Amazon MWAA

To enable the secrets cache, you can set the secrets.use_cache environment configuration parameter to True. Once enabled, Airflow will automatically cache secrets when they are accessed. The cache is used only during DAG file parsing, and not during DAG runtime.

You can also control how long cached values are considered valid (the TTL) using the environment configuration parameter secrets.cache_ttl_seconds, which defaults to 900 seconds (15 minutes).
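
As a sketch, the two options could be applied to an existing environment with boto3 as follows. The helper names are hypothetical, and option values are passed as strings. It is an assumption worth verifying for your environment whether Airflow configuration options that are already set need to be included in the same call so they are not dropped.

```python
def secrets_cache_options(ttl_seconds=900):
    """Airflow configuration options that turn on the secrets cache."""
    return {
        "secrets.use_cache": "True",
        "secrets.cache_ttl_seconds": str(ttl_seconds),
    }

def enable_secrets_cache(environment_name, ttl_seconds=900):
    """Apply the options to an existing Amazon MWAA environment."""
    import boto3  # imported lazily so secrets_cache_options works without boto3 installed

    return boto3.client("mwaa").update_environment(
        Name=environment_name,
        AirflowConfigurationOptions=secrets_cache_options(ttl_seconds),
    )

print(secrets_cache_options(ttl_seconds=300))
```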

Running or failed filters and Cluster Activity page

Identifying DAGs in failed state can be challenging for large Airflow instances. You typically find yourself scrolling through pages searching for failures to address. With Apache Airflow version 2.7.2 environments in Amazon MWAA, you can now filter DAGs currently running and DAGs with failed DAG runs. As you can see in the following screenshot, two status tabs, Running and Failed, were added to the UI.

Another advantage of Amazon MWAA environments using Apache Airflow version 2.7.2 is the new Cluster Activity page for environment-level monitoring.

The Cluster Activity page gathers useful data to monitor your cluster’s live and historical metrics. In the top section of the page, you get live metrics on the number of DAGs ready to be scheduled, the top 5 longest running DAGs, slots used in different pools, and components health (meta database, scheduler, and triggerer). The following screenshot shows an example of this page.

The bottom section of the Cluster Activity page includes historical metrics of DAG runs and task instances states.

Set up a new Apache Airflow v2.7.2 environment in Amazon MWAA

Setting up a new Apache Airflow version 2.7.2 environment in Amazon MWAA not only provides new features, but also leverages Python 3.11 and the Amazon Linux 2023 (AL2023) base image, offering enhanced security, modern tooling, and support for the latest Python libraries and features. You can initiate the setup in your account and preferred Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Upon successful creation of an Apache Airflow version 2.7.2 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to this MWAA documentation. You can install additional packages using a requirements file. Beginning with Apache Airflow version 2.7.2, your requirements file must include a --constraint statement. If you do not provide a constraint, Amazon MWAA will specify one for you to ensure the packages listed in your requirements are compatible with the version of Apache Airflow you are using.
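
As an illustration, a requirements file with a constraint can be generated as follows. The sketch assumes pip's --constraint option and the constraint files published in the Apache Airflow GitHub repository for each Airflow and Python version pair. The helper names and the example package are illustrative choices.

```python
def constraints_url(airflow_version="2.7.2", python_version="3.11"):
    """URL of the Apache Airflow constraints file for a given version pair."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )

def render_requirements(packages, airflow_version="2.7.2", python_version="3.11"):
    """Return requirements.txt content with the constraint on the first line."""
    lines = [f'--constraint "{constraints_url(airflow_version, python_version)}"']
    lines.extend(packages)
    return "\n".join(lines) + "\n"

# Example package chosen for illustration only
print(render_requirements(["apache-airflow-providers-snowflake"]))
```

Leaving the package unpinned lets the constraints file decide which version is compatible with the environment.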

Upgrade from older versions of Apache Airflow to Apache Airflow v2.7.2

Take advantage of these latest capabilities by upgrading your older Apache Airflow v2.x-based environments to version 2.7.2 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.

Conclusion

In this post, we discussed deferrable operators along with some significant changes introduced in Apache Airflow version 2.7.2, such as the Cluster Activity page in the UI, the cache for variables and connections, and how you can get started using them in Amazon MWAA.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Manasi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the Financial Services Industry supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Use Snowflake with Amazon MWAA to orchestrate data pipelines

Post Syndicated from Payal Singh original https://aws.amazon.com/blogs/big-data/use-snowflake-with-amazon-mwaa-to-orchestrate-data-pipelines/

This blog post is co-written with James Sun from Snowflake.

Customers rely on data from different sources such as mobile applications, clickstream events from websites, historical data, and more to deduce meaningful patterns to optimize their products, services, and processes. With a data pipeline, which is a set of tasks used to automate the movement and transformation of data between different systems, you can reduce the time and effort needed to gain insights from the data. Apache Airflow and Snowflake have emerged as powerful technologies for data management and analysis.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed workflow orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud at scale. The Snowflake Data Cloud provides a single source of truth for all your data needs and allows your organizations to store, analyze, and share large amounts of data. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines.

In this post, we provide an overview of orchestrating your data pipeline using Snowflake operators in your Amazon MWAA environment. We define the steps needed to set up the integration between Amazon MWAA and Snowflake. The solution provides an end-to-end automated workflow that includes data ingestion, transformation, analytics, and consumption.

Overview of solution

The following diagram illustrates our solution architecture.

Solution Overview

The data used for transformation and analysis is based on the publicly available New York Citi Bike dataset. The data (zipped files), which includes rider demographics and trip data, is copied from the public Citi Bike Amazon Simple Storage Service (Amazon S3) bucket into your AWS account. Data is decompressed and stored in a different S3 bucket (transformed data can be stored in the same S3 bucket where data was ingested, but for simplicity, we’re using two separate S3 buckets). The transformed data is then made accessible to Snowflake for data analysis. The output of the queried data is published to Amazon Simple Notification Service (Amazon SNS) for consumption.

Amazon MWAA uses a directed acyclic graph (DAG) to run the workflows. In this post, we run three DAGs:

The following diagram illustrates this workflow.

DAG run workflow

See the GitHub repo for the DAGs and other files related to the post.

Note that in this post, we’re using a DAG to create a Snowflake connection, but you can also create the Snowflake connection using the Airflow UI or CLI.

Prerequisites

To deploy the solution, you should have a basic understanding of Snowflake and Amazon MWAA with the following prerequisites:

  • An AWS account in an AWS Region where Amazon MWAA is supported.
  • A Snowflake account with admin credentials. If you don’t have an account, sign up for a 30-day free trial. Select the Snowflake enterprise edition for the AWS Cloud platform.
  • Access to Amazon MWAA, Secrets Manager, and Amazon SNS.
  • In this post, we’re using two S3 buckets, called airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID. Amazon S3 supports global buckets, which means that each bucket name must be unique across all AWS accounts in all the Regions within a partition. If the S3 bucket name is already taken, choose a different S3 bucket name. Create the S3 buckets in your AWS account. We upload content to the S3 bucket later in the post. Replace ACCOUNT_ID with your own AWS account ID or any other unique identifier. The bucket details are as follows:
    • airflow-blog-bucket-ACCOUNT_ID – The top-level bucket for Amazon MWAA-related files.
    • airflow-blog-bucket-ACCOUNT_ID/requirements – The folder used for storing the requirements.txt file needed to deploy Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags – The folder used for storing the DAG files to run workflows in Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries – The folder used for storing the Snowflake SQL queries.
    • citibike-tripdata-destination-ACCOUNT_ID – The bucket used for storing the transformed dataset.

When implementing the solution in this post, replace references to airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID with the names of your own S3 buckets.
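The naming convention above can be sketched in a few lines of Python. This is a minimal illustration, not part of the post's repository: bucket_names and mwaa_paths are hypothetical helpers, and the regular expression checks only a subset of the S3 bucket naming rules (length, allowed characters, and first and last character).

```python
import re

# Subset of the S3 bucket naming rules: 3-63 characters, lowercase letters,
# digits, dots, and hyphens, starting and ending with a letter or digit.
_BUCKET_NAME = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def bucket_names(unique_suffix: str) -> dict:
    """Return the two bucket names used in this post for a given suffix."""
    names = {
        "airflow": f"airflow-blog-bucket-{unique_suffix}",
        "destination": f"citibike-tripdata-destination-{unique_suffix}",
    }
    for name in names.values():
        if not _BUCKET_NAME.match(name):
            raise ValueError(f"not a valid S3 bucket name: {name!r}")
    return names


def mwaa_paths(airflow_bucket: str) -> dict:
    """Return the S3 paths that the Amazon MWAA setup steps ask for."""
    return {
        "bucket": f"s3://{airflow_bucket}",
        "dags": f"s3://{airflow_bucket}/dags",
        "requirements": f"s3://{airflow_bucket}/requirements/requirements.txt",
    }


names = bucket_names("123456789012")  # placeholder account ID
paths = mwaa_paths(names["airflow"])
print(paths["requirements"])
```

Checking names before creating anything catches uppercase letters or underscores in the suffix, which S3 would reject.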

Set up the Amazon MWAA environment

First, you create an Amazon MWAA environment. Before deploying the environment, upload the requirements file to the airflow-blog-bucket-ACCOUNT_ID/requirements folder. The requirements file is based on Apache Airflow version 2.6.3 on Amazon MWAA. If you’re testing on a different version, update the requirements file accordingly.

Complete the following steps to set up the environment:

  1. On the Amazon MWAA console, choose Create environment.
  2. Provide a name of your choice for the environment.
  3. Choose Airflow version 2.6.3.
  4. For the S3 bucket, enter the path of your bucket (s3://airflow-blog-bucket-ACCOUNT_ID).
  5. For the DAGs folder, enter the DAGs folder path (s3://airflow-blog-bucket-ACCOUNT_ID/dags).
  6. For the requirements file, enter the requirements file path (s3://airflow-blog-bucket-ACCOUNT_ID/requirements/requirements.txt).
  7. Choose Next.
  8. Under Networking, choose your existing VPC or choose Create MWAA VPC.
  9. Under Web server access, choose Public network.
  10. Under Security groups, leave Create new security group selected.
  11. For the Environment class, Encryption, and Monitoring sections, leave all values as default.
  12. In the Airflow configuration options section, choose Add custom configuration value and configure two values:
    1. Set Configuration option to secrets.backend and Custom value to airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend.
    2. Set Configuration option to secrets.backend_kwargs and Custom value to {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}.
    Configuration options for secret manager
  13. In the Permissions section, leave the default settings and choose Create a new role.
  14. Choose Next.
  15. When the Amazon MWAA environment is available, assign S3 bucket permissions to the AWS Identity and Access Management (IAM) execution role (created during the Amazon MWAA install).

MWAA execution role
This will direct you to the created execution role on the IAM console.

For testing purposes, you can choose Add permissions and attach the managed AmazonS3FullAccess policy to the role instead of providing restricted access. For this post, we provide only the required access to the S3 buckets.

  1. On the drop-down menu, choose Create inline policy.
  2. For Select Service, choose S3.
  3. Under Access level, specify the following:
    1. Expand List level and select ListBucket.
    2. Expand Read level and select GetObject.
    3. Expand Write level and select PutObject.
  4. Under Resources, choose Add ARN.
  5. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID (use your own bucket).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID (use your own bucket).
    3. arn:aws:s3:::tripdata (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  6. Under Resources, choose Add ARN.
  7. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID/* (make sure to include the asterisk).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*.
    3. arn:aws:s3:::tripdata/* (this is the public S3 bucket where the Citi Bike dataset is stored, use the ARN as specified here).
  8. Choose Next.
  9. For Policy name, enter S3ReadWrite.
  10. Choose Create policy.
  11. Lastly, provide Amazon MWAA with permission to access Secrets Manager secret keys.

This step gives the execution role of your Amazon MWAA environment read access to the secret in Secrets Manager.

The execution role should have the policies MWAA-Execution-Policy*, S3ReadWrite, and SecretsManagerReadWrite attached to it.

MWAA execution role policies
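As a cross-check for the console steps above, the S3ReadWrite inline policy can be sketched as a JSON document generated from Python. This is one possible equivalent layout, not the exact document the console produces: the split into a bucket-level and an object-level statement is our assumption, and s3_read_write_policy is a hypothetical helper.

```python
import json


def s3_read_write_policy(airflow_bucket, destination_bucket, source_bucket="tripdata"):
    """Build a policy granting list, read, and write access to the three buckets."""
    buckets = [airflow_bucket, destination_bucket, source_bucket]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListBucket applies to the bucket itself (no trailing /*).
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{b}" for b in buckets],
            },
            {
                # GetObject and PutObject apply to the objects (trailing /*).
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": [f"arn:aws:s3:::{b}/*" for b in buckets],
            },
        ],
    }


policy = s3_read_write_policy(
    "airflow-blog-bucket-ACCOUNT_ID",
    "citibike-tripdata-destination-ACCOUNT_ID",
)
print(json.dumps(policy, indent=2))
```

The two statements mirror the two Add ARN steps: the first set of ARNs names buckets, the second names the objects inside them.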

When the Amazon MWAA environment is available, you can sign in to the Airflow UI from the Amazon MWAA console using the Open Airflow UI link.

Airflow UI access

Set up an SNS topic and subscription

Next, you create an SNS topic and add a subscription to the topic. Complete the following steps:

  1. On the Amazon SNS console, choose Topics from the navigation pane.
  2. Choose Create topic.
  3. For Topic type, choose Standard.
  4. For Name, enter mwaa_snowflake.
  5. Leave the rest as default.
  6. After you create the topic, navigate to the Subscriptions tab and choose Create subscription.
    SNS topic
  7. For Topic ARN, choose mwaa_snowflake.
  8. Set the protocol to Email.
  9. For Endpoint, enter your email ID (you will get a notification in your email to accept the subscription).

By default, only the topic owner can publish and subscribe to the topic, so you need to modify the Amazon MWAA execution role access policy to allow Amazon SNS access.

  1. On the IAM console, navigate to the execution role you created earlier.
  2. On the drop-down menu, choose Create inline policy.
    MWAA execution role SNS policy
  3. For Select service, choose SNS.
  4. Under Actions, expand Write access level and select Publish.
  5. Under Resources, choose Add ARN.
  6. On the Text tab, specify the ARN arn:aws:sns:<<region>>:<<your_account_ID>>:mwaa_snowflake.
  7. Choose Next.
  8. For Policy name, enter SNSPublishOnly.
  9. Choose Create policy.

Configure a Secrets Manager secret

Next, we set up Secrets Manager, which is a supported alternative secrets backend for Apache Airflow, to store the Snowflake connection information and credentials.

To create the connection string, the Snowflake host and account name are required. Log in to your Snowflake account, and under the Worksheets menu, choose the plus sign and select SQL worksheet. Using the worksheet, run the following SQL commands to find the host and account name.

Run the following query for the host name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'"','') AS HOST
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Run the following query for the account name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT';

Next, we configure the secret in Secrets Manager.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, choose Other type of secret.
  3. Under Key/Value pairs, choose the Plaintext tab.
  4. In the text field, enter the following code and modify the string to reflect your Snowflake account information:

{"host": "<<snowflake_host_name>>", "account":"<<snowflake_account_name>>","user":"<<snowflake_username>>","password":"<<snowflake_password>>","schema":"mwaa_schema","database":"mwaa_db","role":"accountadmin","warehouse":"dev_wh"}

For example:

{"host": "xxxxxx.snowflakecomputing.com", "account":"xxxxxx" ,"user":"xxxxx","password":"*****","schema":"mwaa_schema","database":"mwaa_db", "role":"accountadmin","warehouse":"dev_wh"}

The values for the database name, schema name, and role should be as mentioned earlier. The account, host, user, password, and warehouse can differ based on your setup.

Secret information

Choose Next.

  1. For Secret name, enter airflow/connections/snowflake_accountadmin.
  2. Leave all other values as default and choose Next.
  3. Choose Store.

Take note of the Region in which the secret was created under Secret ARN. We later define it as a variable in the Airflow UI.
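The relationship between the secret name and the connections_prefix configured earlier can be sketched as follows. This is a minimal illustration under two assumptions: that the secrets backend joins the prefix and the connection ID with a "/" separator, and that the secret uses exactly the keys shown in this post. The helpers secret_name_for and check_secret are hypothetical and only validate the plaintext before you paste it into the console.

```python
import json

CONNECTIONS_PREFIX = "airflow/connections"  # value set in secrets.backend_kwargs
REQUIRED_KEYS = {
    "host", "account", "user", "password",
    "schema", "database", "role", "warehouse",
}


def secret_name_for(conn_id, prefix=CONNECTIONS_PREFIX, sep="/"):
    """Return the secret name that corresponds to a connection ID."""
    return f"{prefix}{sep}{conn_id}"


def check_secret(plaintext):
    """Parse the secret plaintext and fail early if a key is missing."""
    secret = json.loads(plaintext)
    missing = REQUIRED_KEYS - secret.keys()
    if missing:
        raise ValueError(f"secret is missing keys: {sorted(missing)}")
    return secret


# Placeholder values only; never commit real credentials.
example = json.dumps({
    "host": "xxxxxx.snowflakecomputing.com", "account": "xxxxxx",
    "user": "xxxxx", "password": "*****",
    "schema": "mwaa_schema", "database": "mwaa_db",
    "role": "accountadmin", "warehouse": "dev_wh",
})
secret = check_secret(example)
print(secret_name_for("snowflake_accountadmin"))
```

A typo in a key name otherwise only surfaces when DAG1 runs, so validating the JSON up front saves a round trip.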

Configure Snowflake access permissions and IAM role

Next, log in to your Snowflake account. Ensure the account you are using has account administrator access. Create a SQL worksheet. In the worksheet, create a warehouse named dev_wh.

The following is an example SQL command:

CREATE OR REPLACE WAREHOUSE dev_wh 
 WITH WAREHOUSE_SIZE = 'xsmall' 
 AUTO_SUSPEND = 60 
 INITIALLY_SUSPENDED = true
 AUTO_RESUME = true
 MIN_CLUSTER_COUNT = 1
 MAX_CLUSTER_COUNT = 5;

For Snowflake to read data from and write data to an S3 bucket referenced in an external (S3 bucket) stage, a storage integration is required. Follow the steps defined in Option 1: Configuring a Snowflake Storage Integration to Access Amazon S3 (only perform Steps 1 and 2, as described in this section).

Configure access permissions for the S3 bucket

When creating the IAM policy, use the following sample policy document, which provides Snowflake with the required permissions to load or unload data using a single bucket and folder path. The bucket name used in this post is citibike-tripdata-destination-ACCOUNT_ID. You should modify it to reflect your bucket name.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID"
    }
  ]
}

Create the IAM role

Next, you create the IAM role to grant privileges on the S3 bucket containing your data files. Later steps in this post refer to this role as mysnowflakerole. After creation, record the Role ARN value located on the role summary page.

Snowflake IAM role

Configure variables

Lastly, configure the variables that will be accessed by the DAGs in Airflow. Log in to the Airflow UI and on the Admin menu, choose Variables and the plus sign.

Airflow variables

Add four variables with the following key/value pairs:

  • Key aws_role_arn with value <<snowflake_aws_role_arn>> (the ARN for role mysnowflakerole noted earlier)
  • Key destination_bucket with value <<bucket_name>> (for this post, the bucket used is citibike-tripdata-destination-ACCOUNT_ID)
  • Key target_sns_arn with value <<sns_Arn>> (the SNS topic in your account)
  • Key sec_key_region with value <<region_of_secret_deployment>> (the Region where the secret in Secrets Manager was created)

The following screenshot illustrates where to find the SNS topic ARN.

SNS topic ARN

The Airflow UI will now have the variables defined, which will be referred to by the DAGs.

Airflow variables list
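Because a mistyped ARN in these variables only fails when a DAG runs, a quick local check can help. The following is a sketch under stated assumptions: parse_arn and check_variables are hypothetical helpers, the account ID and Region are placeholders, and the checks cover only the general ARN layout, not whether the resources exist.

```python
def parse_arn(arn):
    """Split an ARN into its six colon-separated fields."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    keys = ("arn", "partition", "service", "region", "account", "resource")
    return dict(zip(keys, parts))


def check_variables(variables):
    """Return a list of problems found in the four Airflow variables."""
    required = ("aws_role_arn", "destination_bucket", "target_sns_arn", "sec_key_region")
    problems = [f"missing variable: {key}" for key in required if not variables.get(key)]
    if problems:
        return problems
    role = parse_arn(variables["aws_role_arn"])
    if role["service"] != "iam" or not role["resource"].startswith("role/"):
        problems.append("aws_role_arn is not an IAM role ARN")
    topic = parse_arn(variables["target_sns_arn"])
    if topic["service"] != "sns":
        problems.append("target_sns_arn is not an SNS topic ARN")
    return problems


example_variables = {
    "aws_role_arn": "arn:aws:iam::123456789012:role/mysnowflakerole",
    "destination_bucket": "citibike-tripdata-destination-123456789012",
    "target_sns_arn": "arn:aws:sns:us-east-1:123456789012:mwaa_snowflake",
    "sec_key_region": "us-east-1",
}
print(check_variables(example_variables))
```

An empty list means the values have the expected shape. Swapping the two ARNs by mistake, for example, is reported as two problems.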

Congratulations, you have completed all the configuration steps.

Run the DAG

Let’s look at how to run the DAGs. To recap:

  • DAG1 (create_snowflake_connection_blog.py) – Creates the Snowflake connection in Apache Airflow. This connection will be used to authenticate with Snowflake. The Snowflake connection string is stored in Secrets Manager, which is referenced in the DAG file.
  • DAG2 (create-snowflake_initial-setup_blog.py) – Creates the database, schema, storage integration, and stage in Snowflake.
  • DAG3 (run_mwaa_datapipeline_blog.py) – Runs the data pipeline, which will unzip files from the source public S3 bucket and copy them to the destination S3 bucket. The next task will create a table in Snowflake to store the data. Then the data from the destination S3 bucket will be copied into the table using a Snowflake stage. After the data is successfully copied, a view will be created in Snowflake, on top of which the SQL queries will be run.
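The decompression step at the start of DAG3 can be sketched as follows. This is a minimal illustration, not the code from the GitHub repo: it works on an in-memory archive and leaves out the S3 reads and writes, and unzip_members and the demo archive are hypothetical.

```python
import io
import zipfile


def unzip_members(zip_bytes):
    """Yield (name, data) for each regular file in an in-memory ZIP archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # skip directory entries, keep files only
            yield info.filename, archive.read(info)


def _demo_archive():
    """Build a tiny ZIP file that stands in for a Citi Bike download."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("trips.csv", "ride_id,start_station_name\n1,Example St\n")
    return buffer.getvalue()


extracted = dict(unzip_members(_demo_archive()))
for name, data in extracted.items():
    print(name, len(data), "bytes")
```

In the real pipeline, each extracted member would be written as an object to the destination bucket, where the Snowflake stage can read it.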

To run the DAGs, complete the following steps:

  1. Upload the DAGs to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags.
  2. Upload the SQL query files to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries.
  3. Log in to the Apache Airflow UI.
  4. Locate DAG1 (create_snowflake_connection_blog), un-pause it, and choose the play icon to run it.

You can view the run state of the DAG using the Grid or Graph view in the Airflow UI.

Dag1 run

After DAG1 runs, the Snowflake connection snowflake_conn_accountadmin is listed under Admin, Connections.

  1. Locate and run DAG2 (create-snowflake_initial-setup_blog).

Dag2 run

After DAG2 runs, the following objects are created in Snowflake:

  • The database mwaa_db
  • The schema mwaa_schema
  • The storage integration mwaa_citibike_storage_int
  • The stage mwaa_citibike_stg

Before running the final DAG, the trust relationship of the IAM role needs to be updated so that the Snowflake IAM user can assume it.

  1. Log in to your Snowflake account using your admin account credentials.
  2. Open your SQL worksheet created earlier and run the following command:
DESC INTEGRATION mwaa_citibike_storage_int;

mwaa_citibike_storage_int is the name of the integration created by DAG2 in the previous step.

From the output, record the values of the following two properties:

  • STORAGE_AWS_IAM_USER_ARN – The IAM user created for your Snowflake account.
  • STORAGE_AWS_EXTERNAL_ID – The external ID that is needed to establish a trust relationship.

Now we grant the Snowflake IAM user permissions to access bucket objects.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose the role mysnowflakerole.
  3. On the Trust relationships tab, choose Edit trust relationship.
  4. Modify the policy document with the DESC STORAGE INTEGRATION output values you recorded. For example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::5xxxxxxxx:user/mgm4-s-ssca0079"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "AWSPARTNER_SFCRole=4_vsarJrupIjjJh77J9Nxxxx/j98="
        }
      }
    }
  ]
}

The IAM user ARN and ExternalId will be different for your environment based on the output of the DESC STORAGE INTEGRATION query.

Trust relationship
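To avoid copy-and-paste mistakes, the trust policy can also be generated from the two recorded values. The following sketch assumes a hypothetical helper named snowflake_trust_policy; the ARN and external ID shown are placeholders, not real values.

```python
import json


def snowflake_trust_policy(iam_user_arn, external_id):
    """Build the trust policy that lets the Snowflake IAM user assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # STORAGE_AWS_IAM_USER_ARN from the DESC INTEGRATION output
                "Principal": {"AWS": iam_user_arn},
                "Action": "sts:AssumeRole",
                # STORAGE_AWS_EXTERNAL_ID from the DESC INTEGRATION output
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


trust_policy = snowflake_trust_policy(
    "arn:aws:iam::123456789012:user/example-snowflake-user",  # placeholder
    "EXAMPLE_EXTERNAL_ID",  # placeholder
)
print(json.dumps(trust_policy, indent=2))
```

The printed document can be pasted into the Edit trust relationship dialog for the role.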

  1. Locate and run the final DAG (run_mwaa_datapipeline_blog).

At the end of the DAG run, the data is ready for querying. In this example, the query (finding the top start and destination stations) is run as part of the DAG, and the output can be viewed on the XComs page of the Airflow UI.

Xcoms

In the DAG run, the output is also published to Amazon SNS and based on the subscription, an email notification is sent out with the query output.

Email

Another method to visualize the results is directly from the Snowflake console using the Snowflake worksheet. The following is an example query:

SELECT START_STATION_NAME,
COUNT(START_STATION_NAME) C FROM MWAA_DB.MWAA_SCHEMA.CITIBIKE_VW 
GROUP BY 
START_STATION_NAME ORDER BY C DESC LIMIT 10;

Snowflake visual

There are different ways to visualize the output based on your use case.
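To make clear what the example query computes, here is the same aggregation sketched in Python. The rows are made-up sample data, not Citi Bike results, and top_start_stations is a hypothetical helper. Like COUNT(START_STATION_NAME), it ignores rows where the station name is missing.

```python
from collections import Counter


def top_start_stations(rows, limit=10):
    """Count trips per start station and return the most frequent ones."""
    counts = Counter(
        row["start_station_name"]
        for row in rows
        if row.get("start_station_name") is not None
    )
    return counts.most_common(limit)


# Made-up sample rows for illustration only.
sample_rows = [
    {"start_station_name": "A St"},
    {"start_station_name": "B Ave"},
    {"start_station_name": "A St"},
    {"start_station_name": None},
    {"start_station_name": "C Blvd"},
    {"start_station_name": "B Ave"},
    {"start_station_name": "A St"},
]
print(top_start_stations(sample_rows, limit=2))
```

When two stations have the same count, the SQL query does not guarantee their relative order, whereas Counter.most_common keeps first-seen order.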

As we observed, DAG1 and DAG2 need to be run only one time to set up the Amazon MWAA connection and Snowflake objects. DAG3 can be scheduled to run every week or month. With this solution, the user examining the data doesn’t have to log in to either Amazon MWAA or Snowflake. You can have an automated workflow triggered on a schedule that will ingest the latest data from the Citi Bike dataset and provide the top start and destination stations for the given dataset.

Clean up

To avoid incurring future charges, delete the AWS resources (IAM users and roles, Secrets Manager secrets, Amazon MWAA environment, SNS topics and subscription, S3 buckets) and Snowflake resources (database, stage, storage integration, view, tables) created as part of this post.

Conclusion

In this post, we demonstrated how to set up an Amazon MWAA connection for authenticating to Snowflake as well as to AWS using AWS user credentials. We used a DAG to automate creating the Snowflake objects such as database, tables, and stage using SQL queries. We also orchestrated the data pipeline using Amazon MWAA, which ran tasks related to data transformation as well as Snowflake queries. We used Secrets Manager to store Snowflake connection information and credentials and Amazon SNS to publish the data output for end consumption.

With this solution, you have an automated end-to-end orchestration of your data pipeline encompassing ingesting, transformation, analysis, and data consumption.


About the authors

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.

James Sun is a Senior Partner Solutions Architect at Snowflake. He actively collaborates with strategic cloud partners like AWS, supporting product and service integrations, as well as the development of joint solutions. He has held senior technical positions at tech companies such as EMC, AWS, and MapR Technologies. With over 20 years of experience in storage and data analytics, he also holds a PhD from Stanford University.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Manuj Arora is a Sr. Solutions Architect for Strategic Accounts in AWS. He focuses on Migration and Modernization capabilities and offerings in AWS. Manuj has worked as a Partner Success Solutions Architect in AWS over the last 3 years and worked with partners like Snowflake to build solution blueprints that are leveraged by the customers. Outside of work, he enjoys traveling, playing tennis and exploring new places with family and friends.

Set up fine-grained permissions for your data pipeline using MWAA and EKS

Post Syndicated from Ulrich Hinze original https://aws.amazon.com/blogs/big-data/set-up-fine-grained-permissions-for-your-data-pipeline-using-mwaa-and-eks/

This is a guest blog post co-written with Patrick Oberherr from Contentful and Johannes Günther from Netlight Consulting.

This blog post shows how to improve security in a data pipeline architecture based on Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and Amazon Elastic Kubernetes Service (Amazon EKS) by setting up fine-grained permissions, using HashiCorp Terraform for infrastructure as code.

Many AWS customers use Amazon EKS to execute their data workloads. The advantages of Amazon EKS include different compute and storage options depending on workload needs, higher resource utilization by sharing underlying infrastructure, and a vibrant open-source community that provides purpose-built extensions. The Data on EKS project provides a series of templates and other resources to help customers get started on this journey. It includes a description of using Amazon MWAA as a job scheduler.

Contentful is an AWS customer and AWS Partner Network (APN) partner. Behind the scenes of their Software-as-a-Service (SaaS) product, the Contentful Composable Content Platform, Contentful uses insights from data to improve business decision-making and customer experience. Contentful engaged Netlight, an APN consulting partner, to help set up a data platform to gather these insights.

Most of Contentful’s application workloads run on Amazon EKS, and knowledge of this service and Kubernetes is widespread in the organization. That’s why Contentful’s data engineering team decided to run data pipelines on Amazon EKS as well. For job scheduling, they started with a self-operated Apache Airflow on an Amazon EKS cluster and later switched to Amazon MWAA to reduce engineering and operations overhead. The job execution remained on Amazon EKS.

Contentful runs a complex data pipeline using this infrastructure, including ingestion from multiple data sources and different transformation jobs, for example using dbt. The whole pipeline shares a single Amazon MWAA environment and a single Amazon EKS cluster. With a diverse set of workloads in a single environment, it is necessary to apply the principle of least privilege, ensuring that individual tasks or components have only the specific permissions they need to function.

By segmenting permissions according to roles and responsibilities, Contentful’s data engineering team was able to create a more robust and secure data processing environment, which is essential for maintaining the integrity and confidentiality of the data being handled.

In this blog post, we walk through setting up the infrastructure from scratch and deploying a sample application using Terraform, Contentful’s tool of choice for infrastructure as code.

Prerequisites

To follow along with this blog post, you need the latest version of the following tools installed:

Overview

In this blog post, you will create a sample application with the following infrastructure:

Architecture drawing of the sample application deployed in this blog post

The sample Airflow workflow lists objects in the source bucket, temporarily stores this list using Airflow XComs, and writes the list as a file to the destination bucket. This application is executed using Amazon EKS pods, scheduled by an Amazon MWAA environment. You deploy the EKS cluster and the MWAA environment into a virtual private cloud (VPC) and apply least-privilege permissions to the EKS pods using IAM roles for service accounts. The configuration bucket for Amazon MWAA contains runtime requirements, as well as the application code specifying an Airflow Directed Acyclic Graph (DAG).

Initialize the project and create buckets

Create a file main.tf with the following content in an empty directory:

locals {
  region = "us-east-1"
}

provider "aws" {
  region = local.region
}

resource "aws_s3_bucket" "source_bucket" {
  bucket_prefix = "source"
}

resource "aws_s3_object" "dummy_object" {
  bucket  = aws_s3_bucket.source_bucket.bucket
  key     = "dummy.txt"
  content = ""
}

resource "aws_ssm_parameter" "source_bucket" {
  name  = "mwaa_source_bucket"
  type  = "SecureString"
  value = aws_s3_bucket.source_bucket.bucket
}

resource "aws_s3_bucket" "destination_bucket" {
  bucket_prefix = "destination"
  force_destroy = true
}

resource "aws_ssm_parameter" "destination_bucket" {
  name  = "mwaa_destination_bucket"
  type  = "SecureString"
  value = aws_s3_bucket.destination_bucket.bucket
}

This file defines the Terraform AWS provider as well as the source and destination buckets, whose names are exported as AWS Systems Manager parameters. It also tells Terraform to upload an empty object named dummy.txt into the source bucket, which enables the Airflow sample application we will create later to receive a result when listing bucket content.

Initialize the Terraform project and download the module dependencies by issuing the following command:

terraform init

Create the infrastructure:

terraform apply

Terraform asks you to acknowledge changes to the environment and then starts deploying resources in AWS. Upon successful deployment, you should see the following success message:

Apply complete! Resources: 5 added, 0 changed, 0 destroyed.

Create VPC

Create a new file vpc.tf in the same directory as main.tf and insert the following:

data "aws_availability_zones" "available" {}

locals {
  cidr = "10.0.0.0/16"
  azs  = slice(data.aws_availability_zones.available.names, 0, 3)
}

module "vpc" {
  name               = "data-vpc"
  source             = "terraform-aws-modules/vpc/aws"
  version            = "~> 4.0"
  cidr               = local.cidr
  azs                = local.azs
  public_subnets     = [for k, v in local.azs : cidrsubnet(local.cidr, 8, k + 48)]
  private_subnets    = [for k, v in local.azs : cidrsubnet(local.cidr, 4, k)]
  enable_nat_gateway = true
}

This file defines the VPC, a virtual network, that will later host the Amazon EKS cluster and the Amazon MWAA environment. Note that we use an existing Terraform module for this, which wraps configuration of underlying network resources like subnets, route tables, and NAT gateways.
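To see which address ranges the two cidrsubnet expressions in vpc.tf produce, the calculation can be reproduced with Python's ipaddress module. This is a sketch for IPv4 only: the cidrsubnet function below is our own reimplementation of the Terraform function's arithmetic, and it uses three Availability Zones, as in the slice above.

```python
import ipaddress


def cidrsubnet(prefix: str, newbits: int, netnum: int) -> str:
    """Return subnet number netnum after extending the prefix by newbits bits."""
    network = ipaddress.ip_network(prefix)
    new_prefix = network.prefixlen + newbits
    if new_prefix > network.max_prefixlen:
        raise ValueError("not enough address bits for the requested subnet")
    if not 0 <= netnum < 2 ** newbits:
        raise ValueError("netnum does not fit in newbits")
    subnet_size = 2 ** (network.max_prefixlen - new_prefix)
    first_address = network.network_address + netnum * subnet_size
    return str(ipaddress.ip_network(f"{first_address}/{new_prefix}"))


cidr = "10.0.0.0/16"
az_indexes = range(3)  # one subnet of each kind per Availability Zone

public_subnets = [cidrsubnet(cidr, 8, k + 48) for k in az_indexes]
private_subnets = [cidrsubnet(cidr, 4, k) for k in az_indexes]

print(public_subnets)   # ['10.0.48.0/24', '10.0.49.0/24', '10.0.50.0/24']
print(private_subnets)  # ['10.0.0.0/20', '10.0.16.0/20', '10.0.32.0/20']
```

The three private /20 ranges end at 10.0.47.255, so starting the public /24 ranges at network number 48 places them directly after the private ranges without overlap.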

Download the VPC module:

terraform init

Deploy the new resources:

terraform apply

Note which resources are being created. By using the VPC module in our Terraform file, much of the underlying complexity is taken away when defining our infrastructure, but it’s still useful to know what exactly is being deployed.

Note that Terraform now handles resources we defined in both files, main.tf and vpc.tf, because Terraform includes all .tf files in the current working directory.

Create the Amazon MWAA environment

Create a new file mwaa.tf and insert the following content:

locals {
  requirements_filename = "requirements.txt"
  airflow_version       = "2.6.3"
  requirements_content  = <<EOT
apache-airflow[cncf.kubernetes]==${local.airflow_version}
EOT
}

module "mwaa" {
  source = "github.com/aws-ia/terraform-aws-mwaa?ref=1066050"

  name              = "mwaa"
  airflow_version   = local.airflow_version
  environment_class = "mw1.small"

  vpc_id             = module.vpc.vpc_id
  private_subnet_ids = slice(module.vpc.private_subnets, 0, 2)

  webserver_access_mode = "PUBLIC_ONLY"

  requirements_s3_path = local.requirements_filename
}

resource "aws_s3_object" "requirements" {
  bucket  = module.mwaa.aws_s3_bucket_name
  key     = local.requirements_filename
  content = local.requirements_content

  etag = md5(local.requirements_content)
}

Like before, we use an existing module to save configuration effort for the Amazon MWAA environment. The module also creates the configuration bucket, which we use to specify the runtime dependency of the application (the cncf.kubernetes extra of apache-airflow, which installs the apache-airflow-providers-cncf-kubernetes package) in the requirements.txt file. This package, in combination with the preinstalled package apache-airflow-providers-amazon, enables interaction with Amazon EKS.
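The etag argument on the requirements object is what makes Terraform re-upload the file when its content changes. The idea can be sketched in Python as follows; this assumes a single-part upload without SSE-KMS encryption, where the S3 ETag is the MD5 digest of the content, and md5_hex is a hypothetical helper.

```python
import hashlib

AIRFLOW_VERSION = "2.6.3"

# A Terraform heredoc keeps the newline at the end of its last line.
requirements_content = f"apache-airflow[cncf.kubernetes]=={AIRFLOW_VERSION}\n"


def md5_hex(text: str) -> str:
    """Return the hex MD5 digest of the text, as Terraform's md5() does."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


current_etag = md5_hex(requirements_content)
bumped_etag = md5_hex(requirements_content.replace("2.6.3", "2.7.2"))

# A different digest tells Terraform that the object must be replaced.
print(current_etag != bumped_etag)
```

Without the etag argument, Terraform would not notice a change to the file content as long as the bucket and key stay the same.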

Download the MWAA module:

terraform init

Deploy the new resources:

terraform apply

This operation takes 20–30 minutes to complete.

Create the Amazon EKS cluster

Create a file eks.tf with the following content:

module "cluster" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints?ref=8a06a6e"

  cluster_name    = "data-cluster"
  cluster_version = "1.27"

  vpc_id             = module.vpc.vpc_id
  private_subnet_ids = module.vpc.private_subnets
  enable_irsa        = true

  managed_node_groups = {
    node_group = {
      node_group_name = "node-group"
      desired_size    = 1
    }
  }
  application_teams = {
    mwaa = {}
  }

  map_roles = [{
    rolearn  = module.mwaa.mwaa_role_arn
    username = "mwaa-executor"
    groups   = []
  }]
}

data "aws_eks_cluster_auth" "this" {
  name = module.cluster.eks_cluster_id
}

provider "kubernetes" {
  host                   = module.cluster.eks_cluster_endpoint
  cluster_ca_certificate = base64decode(module.cluster.eks_cluster_certificate_authority_data)
  token                  = data.aws_eks_cluster_auth.this.token
}

resource "kubernetes_role" "mwaa_executor" {
  metadata {
    name      = "mwaa-executor"
    namespace = "mwaa"
  }

  rule {
    api_groups = [""]
    resources  = ["pods", "pods/log", "pods/exec"]
    verbs      = ["get", "list", "create", "patch", "delete"]
  }
}

resource "kubernetes_role_binding" "mwaa_executor" {
  metadata {
    name      = "mwaa-executor"
    namespace = "mwaa"
  }
  role_ref {
    api_group = "rbac.authorization.k8s.io"
    kind      = "Role"
    name      = kubernetes_role.mwaa_executor.metadata[0].name
  }
  subject {
    kind      = "User"
    name      = "mwaa-executor"
    api_group = "rbac.authorization.k8s.io"
  }
}

output "configure_kubectl" {
  description = "Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig"
  value       = "aws eks --region ${local.region} update-kubeconfig --name ${module.cluster.eks_cluster_id}"
}

To create the cluster itself, we take advantage of the Amazon EKS Blueprints for Terraform project. We also define a managed node group with one node as the target size. Note that in cases with fluctuating load, scaling your cluster with Karpenter instead of the managed node group approach shown above makes the cluster scale more flexibly. We used managed node groups primarily because of the ease of configuration.

We define the identity that the Amazon MWAA execution role assumes in Kubernetes using the map_roles variable. After configuring the Terraform Kubernetes provider, we give the Amazon MWAA execution role permissions to manage pods in the cluster.
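The effect of the kubernetes_role rule can be illustrated with a toy permission check. This is a deliberately simplified model of Kubernetes RBAC, written for illustration only: Rule and is_allowed are hypothetical, and real RBAC also supports wildcards and resource names, which are left out here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    api_groups: tuple
    resources: tuple
    verbs: tuple


# Mirrors the rule block of the mwaa-executor role in eks.tf.
MWAA_EXECUTOR_RULES = (
    Rule(
        api_groups=("",),  # the empty string is the core API group
        resources=("pods", "pods/log", "pods/exec"),
        verbs=("get", "list", "create", "patch", "delete"),
    ),
)


def is_allowed(rules, verb, resource, api_group=""):
    """Return True if any rule matches the API group, resource, and verb."""
    return any(
        api_group in rule.api_groups
        and resource in rule.resources
        and verb in rule.verbs
        for rule in rules
    )


print(is_allowed(MWAA_EXECUTOR_RULES, "create", "pods"))     # True
print(is_allowed(MWAA_EXECUTOR_RULES, "get", "secrets"))     # False
print(is_allowed(MWAA_EXECUTOR_RULES, "create", "deployments", api_group="apps"))  # False
```

Because the role and its binding are namespaced, these permissions apply only inside the mwaa namespace. The execution role can manage pods there, but cannot read secrets or touch other workloads.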

Download the EKS Blueprints for Terraform module:

terraform init

Deploy the new resources:

terraform apply

This operation takes about 12 minutes to complete.

Create IAM roles for service accounts

Create a file roles.tf with the following content:

data "aws_iam_policy_document" "source_bucket_reader" {
  statement {
    actions   = ["s3:ListBucket"]
    resources = [aws_s3_bucket.source_bucket.arn]
  }
  statement {
    actions   = ["ssm:GetParameter"]
    resources = [aws_ssm_parameter.source_bucket.arn]
  }
}

resource "aws_iam_policy" "source_bucket_reader" {
  name   = "source_bucket_reader"
  path   = "/"
  policy = data.aws_iam_policy_document.source_bucket_reader.json
}

module "irsa_source_bucket_reader" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/irsa"

  eks_cluster_id              = module.cluster.eks_cluster_id
  eks_oidc_provider_arn       = module.cluster.eks_oidc_provider_arn
  irsa_iam_policies           = [aws_iam_policy.source_bucket_reader.arn]
  kubernetes_service_account  = "source-bucket-reader-sa"
  kubernetes_namespace        = "mwaa"
  create_kubernetes_namespace = false
}

data "aws_iam_policy_document" "destination_bucket_writer" {
  statement {
    actions   = ["s3:PutObject"]
    resources = ["${aws_s3_bucket.destination_bucket.arn}/*"]
  }
  statement {
    actions   = ["ssm:GetParameter"]
    resources = [aws_ssm_parameter.destination_bucket.arn]
  }
}

resource "aws_iam_policy" "destination_bucket_writer" {
  name   = "irsa_destination_bucket_writer"
  policy = data.aws_iam_policy_document.destination_bucket_writer.json
}

module "irsa_destination_bucket_writer" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/irsa"

  eks_cluster_id              = module.cluster.eks_cluster_id
  eks_oidc_provider_arn       = module.cluster.eks_oidc_provider_arn
  irsa_iam_policies           = [aws_iam_policy.destination_bucket_writer.arn]
  kubernetes_service_account  = "destination-bucket-writer-sa"
  kubernetes_namespace        = "mwaa"
  create_kubernetes_namespace = false
}

This file defines two Kubernetes service accounts, source-bucket-reader-sa and destination-bucket-writer-sa, and their permissions against the AWS API, using IAM roles for service accounts (IRSA). Again, we use a module from the Amazon EKS Blueprints for Terraform project to simplify IRSA configuration. Note that both roles only get the minimum permissions that they need, defined using AWS IAM policies.
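Under the hood, IRSA works because each IAM role trusts the cluster's OIDC provider, but only for one specific service account. A typical trust policy of this kind can be sketched as follows. This is an assumption about the general shape of such a policy, not the exact document the Blueprints module generates; irsa_trust_policy is a hypothetical helper and the provider ARN is a placeholder.

```python
import json


def irsa_trust_policy(oidc_provider_arn, namespace, service_account):
    """Build a trust policy that limits a role to a single service account."""
    # The condition keys use the issuer host and path, which is the part of
    # the provider ARN after "oidc-provider/".
    issuer = oidc_provider_arn.split(":oidc-provider/", 1)[1]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Federated": oidc_provider_arn},
                "Action": "sts:AssumeRoleWithWebIdentity",
                "Condition": {
                    "StringEquals": {
                        f"{issuer}:sub": f"system:serviceaccount:{namespace}:{service_account}",
                        f"{issuer}:aud": "sts.amazonaws.com",
                    }
                },
            }
        ],
    }


provider_arn = (
    "arn:aws:iam::123456789012:oidc-provider/"
    "oidc.eks.us-east-1.amazonaws.com/id/EXAMPLE"  # placeholder
)
reader_trust = irsa_trust_policy(provider_arn, "mwaa", "source-bucket-reader-sa")
print(json.dumps(reader_trust, indent=2))
```

A pod that runs under destination-bucket-writer-sa presents a token with a different sub claim, so it cannot assume the reader role, and vice versa.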

Download the new module:

terraform init

Deploy the new resources:

terraform apply

Create the DAG

Create a file dag.py defining the Airflow DAG:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.eks import EksPodOperator

dag = DAG(
    "dag_with_fine_grained_permissions",
    description="DAG with fine-grained permissions",
    default_args={
        "cluster_name": "data-cluster",
        "namespace": "mwaa",
        "get_logs": True,
        "is_delete_operator_pod": True,
    },
    schedule="@hourly",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

read_bucket = EksPodOperator(
    task_id="read-bucket",
    pod_name="read-bucket",
    service_account_name="source-bucket-reader-sa",
    image="amazon/aws-cli:latest",
    cmds=[
        "sh",
        "-xc",
        "aws s3api list-objects --output json --bucket $(aws ssm get-parameter --name mwaa_source_bucket --with-decryption --query 'Parameter.Value' --output text)  > /airflow/xcom/return.json",
    ],
    do_xcom_push=True,
    dag=dag,
)

write_bucket = EksPodOperator(
    task_id="write-bucket",
    pod_name="write-bucket",
    service_account_name="destination-bucket-writer-sa",
    image="amazon/aws-cli:latest",
    cmds=[
        "sh",
        "-xc",
        "echo '{{ task_instance.xcom_pull('read-bucket')|tojson }}' > list.json; aws s3 cp list.json s3://$(aws ssm get-parameter --name mwaa_destination_bucket  --with-decryption --query 'Parameter.Value' --output text)",
    ],
    dag=dag,
)

read_bucket >> write_bucket

The DAG runs on an hourly schedule and has two tasks that run one after the other: read_bucket, with service account source-bucket-reader-sa, and write_bucket, with service account destination-bucket-writer-sa. Both use the EksPodOperator, which schedules the tasks on Amazon EKS and runs commands in the AWS CLI Docker image. The first task lists the files in the source bucket and writes the list to Airflow XCom. The second task reads the list from XCom and stores it in the destination bucket. Note that the service_account_name parameter determines what each task is permitted to do.
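For readers who find the shell one-liners hard to follow, here is a rough Python sketch of the same two steps. It is not the code the DAG runs. The functions take client objects as arguments, so you could pass boto3.client("ssm") and boto3.client("s3") in a real environment. The FakeSSM and FakeS3 classes, the bucket names, and the function names are hypothetical stand-ins that let the sketch run without AWS access.

```python
import json


def list_source_objects(ssm, s3, parameter_name="mwaa_source_bucket"):
    """Mirror of the read-bucket task: resolve the bucket name, then list the bucket."""
    bucket = ssm.get_parameter(Name=parameter_name, WithDecryption=True)["Parameter"]["Value"]
    return s3.list_objects(Bucket=bucket)


def write_listing(ssm, s3, listing, parameter_name="mwaa_destination_bucket"):
    """Mirror of the write-bucket task: store the listing as list.json."""
    bucket = ssm.get_parameter(Name=parameter_name, WithDecryption=True)["Parameter"]["Value"]
    # default=str because real boto3 responses contain datetime values.
    body = json.dumps(listing, default=str).encode("utf-8")
    s3.put_object(Bucket=bucket, Key="list.json", Body=body)
    return bucket


class FakeSSM:
    """In-memory stand-in for the SSM client (illustration only)."""

    def __init__(self, parameters):
        self._parameters = parameters

    def get_parameter(self, Name, WithDecryption=False):
        return {"Parameter": {"Name": Name, "Value": self._parameters[Name]}}


class FakeS3:
    """In-memory stand-in for the S3 client (illustration only)."""

    def __init__(self, buckets):
        self.buckets = buckets  # bucket name -> {key: bytes}

    def list_objects(self, Bucket):
        objects = sorted(self.buckets[Bucket].items())
        return {"Name": Bucket, "Contents": [{"Key": k, "Size": len(v)} for k, v in objects]}

    def put_object(self, Bucket, Key, Body):
        self.buckets[Bucket][Key] = Body
        return {}


ssm = FakeSSM({
    "mwaa_source_bucket": "source-example",
    "mwaa_destination_bucket": "destination-example",
})
s3 = FakeS3({"source-example": {"data.csv": b"a,b\n"}, "destination-example": {}})

listing = list_source_objects(ssm, s3)  # in the DAG, this value travels through XCom
write_listing(ssm, s3, listing)
stored = json.loads(s3.buckets["destination-example"]["list.json"])
print(stored)
```

In the DAG, each function would run in its own pod under its own service account, which is why the listing has to be passed between them through XCom rather than through a shared variable.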

Create a file dag.tf to upload the DAG code to the Amazon MWAA configuration bucket:

locals {
  dag_filename = "dag.py"
}

resource "aws_s3_object" "dag" {
  bucket = module.mwaa.aws_s3_bucket_name
  key    = "dags/${local.dag_filename}"
  source = local.dag_filename

  etag = filemd5(local.dag_filename)
}
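
The etag argument makes Terraform upload the object again whenever the content of dag.py changes, because filemd5 returns a different value for different file content. As a minimal sketch of that idea, the following Python snippet computes the same kind of hex MD5 digest for a temporary file before and after a change. The function name file_md5 and the file contents are illustrative assumptions.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path):
    """Hex MD5 digest of a file's contents, comparable to Terraform's filemd5()."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # Read in chunks so large files do not have to fit in memory.
        for chunk in iter(lambda: handle.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as workdir:
    dag_path = Path(workdir) / "dag.py"
    dag_path.write_bytes(b"hello")
    before = file_md5(dag_path)
    dag_path.write_bytes(b"hello, world")  # simulate editing the DAG file
    after = file_md5(dag_path)

print(before != after)  # the digest changes when the content changes
```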

Deploy the changes:

terraform apply

The Amazon MWAA environment automatically imports the file from the S3 bucket.

Run the DAG

In your browser, navigate to the Amazon MWAA console and select your environment. In the top right-hand corner, select Open Airflow UI. You should see the following:

Screenshot of the MWAA user interface

To trigger the DAG, in the Actions column, select the play symbol and then select Trigger DAG. Click on the DAG name to explore the DAG run and its results.

Navigate to the Amazon S3 console and choose the bucket starting with “destination”. It should contain a file list.json recently created by the write_bucket task. Download the file to explore its content, a JSON list with a single entry.

Clean up

The resources you created in this walkthrough incur AWS costs. To delete the created resources, issue the following command:

terraform destroy

Approve the changes in the Terraform CLI dialog.

Conclusion

In this blog post, you learned how to improve the security of your data pipeline running on Amazon MWAA and Amazon EKS by narrowing the permissions of each individual task.

To dive deeper, use the working example created in this walkthrough to explore the topic further: What happens if you remove the service_account_name parameter from an Airflow task? What happens if you exchange the service account names in the two tasks?
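
One way to reason about the second question before trying it is a toy permission model. The sketch below is deliberately simplified and is not how IAM evaluates policies. The writer permissions follow the policy shown earlier in this walkthrough; the reader permissions (s3:ListBucket and ssm:GetParameter on the source side) and the symbolic resource names are assumptions made for illustration.

```python
# Toy model: each service account maps to the (action, resource) pairs it may use.
PERMISSIONS = {
    "source-bucket-reader-sa": {  # assumed permissions, for illustration
        ("ssm:GetParameter", "source-bucket-parameter"),
        ("s3:ListBucket", "source-bucket"),
    },
    "destination-bucket-writer-sa": {
        ("ssm:GetParameter", "destination-bucket-parameter"),
        ("s3:PutObject", "destination-bucket"),
    },
}

# The calls each task makes, in order.
TASK_CALLS = {
    "read-bucket": [
        ("ssm:GetParameter", "source-bucket-parameter"),
        ("s3:ListBucket", "source-bucket"),
    ],
    "write-bucket": [
        ("ssm:GetParameter", "destination-bucket-parameter"),
        ("s3:PutObject", "destination-bucket"),
    ],
}


def first_denied_call(task, service_account):
    """Return the first call the service account may not make, or None if all are allowed."""
    allowed = PERMISSIONS[service_account]
    for call in TASK_CALLS[task]:
        if call not in allowed:
            return call
    return None


# Intended pairing: every call is allowed.
print(first_denied_call("read-bucket", "source-bucket-reader-sa"))
# Swapped pairing: the task is stopped at its first call.
print(first_denied_call("read-bucket", "destination-bucket-writer-sa"))
```

In this model, a task with the wrong service account fails at its very first AWS call. Running the real DAG is still the only way to confirm the actual behavior in your environment.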

For simplicity, in this walkthrough we used a flat file structure with Terraform and Python files inside a single directory. We did not adhere to the standard module structure proposed by Terraform, which is generally recommended. In a real-life project, splitting up the project into multiple Terraform projects or modules may also increase flexibility, speed, and independence between teams owning different parts of the infrastructure.

Lastly, make sure to study the Data on EKS documentation, which provides other valuable resources for running your data pipeline on Amazon EKS, as well as the Amazon MWAA and Apache Airflow documentation for implementing your own use cases. Specifically, have a look at this sample implementation of a Terraform module for Amazon MWAA and Amazon EKS, which contains a more mature approach to Amazon EKS configuration and node automatic scaling, as well as networking.

If you have any questions, you can start a new thread on AWS re:Post or reach out to AWS Support.


About the Authors

Ulrich Hinze is a Solutions Architect at AWS. He partners with software companies to architect and implement cloud-based solutions on AWS. Before joining AWS, he worked for AWS customers and partners in software engineering, consulting, and architecture roles for 8+ years.

Patrick Oberherr is a Staff Data Engineer at Contentful with 4+ years of working with AWS and 10+ years in the Data field. At Contentful, he is responsible for infrastructure and operations of the data stack, which is hosted on AWS.

Johannes Günther is a cloud & data consultant at Netlight with 5+ years of working with AWS. He has helped clients across various industries design sustainable cloud platforms and is AWS certified.

AWS Weekly Roundup – Amazon MWAA, EMR Studio, Generative AI, and More – August 14, 2023

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-amazon-mwaa-emr-studio-generative-ai-and-more-august-14-2023/

While I enjoyed a few days off in California to get a dose of vitamin sea, a lot has happened in the AWS universe. Let’s take a look together!

Last Week’s Launches
Here are some launches that got my attention:

Amazon MWAA now supports Apache Airflow version 2.6 – Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud. Apache Airflow version 2.6 introduces important security updates and bug fixes that enhance the security and reliability of your workflows. If you’re currently running Apache Airflow version 2.x, you can now seamlessly upgrade to version 2.6.3. Check out this AWS Big Data Blog post to learn more.

Amazon EMR Studio adds support for AWS Lake Formation fine-grained access control – Amazon EMR Studio is a web-based integrated development environment (IDE) for fully managed Jupyter notebooks that run on Amazon EMR clusters. When you connect to EMR clusters from EMR Studio workspaces, you can now choose the AWS Identity and Access Management (IAM) role that you want to connect with. Apache Spark interactive notebooks will access only the data and resources permitted by policies attached to this runtime IAM role. When data is accessed from data lakes managed with AWS Lake Formation, you can enforce table and column-level access using policies attached to this runtime role. For more details, have a look at the Amazon EMR documentation.

AWS Security Hub launches 12 new security controls – AWS Security Hub is a cloud security posture management (CSPM) service that performs security best practice checks, aggregates alerts, and enables automated remediation. With the newly released controls, Security Hub now supports three additional AWS services: Amazon Athena, Amazon DocumentDB (with MongoDB compatibility), and Amazon Neptune. Security Hub has also added an additional control against Amazon Relational Database Service (Amazon RDS). AWS Security Hub now offers 276 controls. You can find more information in the AWS Security Hub documentation.

Additional AWS services available in the AWS Israel (Tel Aviv) Region – The AWS Israel (Tel Aviv) Region opened on August 1, 2023. This past week, AWS Service Catalog, Amazon SageMaker, Amazon EFS, and Amazon Kinesis Data Analytics were added to the list of available services in the Israel (Tel Aviv) Region. Check the AWS Regional Services List for the most up-to-date availability information.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional blog posts and news items that you might find interesting:

AWS recognized as a Leader in 2023 Gartner Magic Quadrant for Contact Center as a Service with Amazon Connect – AWS was named a Leader for the first time since Amazon Connect, our flexible, AI-powered cloud contact center, was launched in 2017. Read the full story here. 

Generate creative advertising using generative AI – This AWS Machine Learning Blog post shows how to generate captivating and innovative advertisements at scale using generative AI. It discusses the technique of inpainting and how to seamlessly create image backgrounds, produce visually stunning and engaging content, and reduce unwanted image artifacts.

AWS open-source news and updates – My colleague Ricardo writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

Build On Generative AI – Your favorite weekly Twitch show about all things generative AI is back for season 2 today! Every Monday, 9:00 US PT, my colleagues Emily and Darko look at new technical and scientific patterns on AWS, inviting guest speakers to demo their work and show us how they built something new to improve the state of generative AI.

In today’s episode, Emily and Darko discussed the latest models Llama 2 and Falcon, and explored them in retrieval-augmented generation design patterns. You can watch the video here. Check out show notes and the full list of episodes on community.aws.

AWS NLP Conference 2023 – Join this in-person event on September 13–14 in London to hear about the latest trends, ground-breaking research, and innovative applications that leverage natural language processing (NLP) capabilities on AWS. This year, the conference will primarily focus on large language models (LLMs), as they form the backbone of many generative AI applications and use cases. Register here.

AWS Global Summits – The 2023 AWS Summits season is coming to an end with the last two in-person events in Mexico City (August 30) and Johannesburg (September 26).

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: West Africa (August 19), Taiwan (August 26), Aotearoa (September 6), Lebanon (September 9), and Munich (September 14).

AWS re:Invent (November 27 – December 1) – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Registration is now open.

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

— Antje
