All posts by Michael Greenshtein

Monitoring Apache Iceberg metadata layer using AWS Lambda, AWS Glue, and AWS CloudWatch

Post Syndicated from Michael Greenshtein original https://aws.amazon.com/blogs/big-data/monitoring-apache-iceberg-metadata-layer-using-aws-lambda-aws-glue-and-aws-cloudwatch/

In the era of big data, data lakes have emerged as a cornerstone for storing vast amounts of raw data in its native format. They support structured, semi-structured, and unstructured data, offering a flexible and scalable environment for data ingestion from multiple sources. Data lakes provide a unified repository for organizations to store and use large volumes of data. This enables more informed decision-making and innovative insights through various analytics and machine learning applications.

Despite their advantages, traditional data lake architectures often grapple with challenges such as understanding deviations from the most optimal state of the table over time, identifying issues in data pipelines, and monitoring a large number of tables. As data volumes grow, the complexity of maintaining operational excellence also increases. Monitoring and tracking issues in the data management lifecycle are essential for achieving operational excellence in data lakes.

This is where Apache Iceberg comes into play, offering a new approach to data lake management. Apache Iceberg is an open table format designed specifically to improve the performance, reliability, and scalability of data lakes. It addresses many of the shortcomings of traditional data lakes by providing features such as ACID transactions, schema evolution, row-level updates and deletes, and time travel.

In this blog post, we’ll discuss how the metadata layer of Apache Iceberg can be used to make data lakes more efficient. You will learn about an open-source solution that can collect important metrics from the Iceberg metadata layer. Based on collected metrics, we will provide recommendations on how to improve the efficiency of Iceberg tables. Additionally, you will learn how to use Amazon CloudWatch anomaly detection feature to detect ingestion issues.

Deep dive into Iceberg’s Metadata layer

Before diving into a solution, let’s understand how the Apache Iceberg metadata layer works. The Iceberg metadata layer provides an open specification instructing integrated big data engines such as Spark or Trino how to run read and write operations and how to resolve concurrency issues. It’s crucial for maintaining inter-operability between different engines. It stores detailed information about tables such as schema, partitioning, and file organization in versioned JSON and Avro files. This ensures that each change is tracked and reversible, enhancing data governance and auditability.

Apache Iceberg metadata layer architecture diagram

History and versioning: Iceberg’s versioning feature captures every change in table metadata as immutable snapshots, facilitating data integrity, historical views, and rollbacks.

File organization and snapshot management: Metadata closely manages data files, detailing file paths, formats, and partitions, supporting multiple file formats like Parquet, Avro, and ORC. This organization helps with efficient data retrieval through predicate pushdown, minimizing unnecessary data scans. Snapshot management allows concurrent data operations without interference, maintaining data consistency across transactions.

In addition to its core metadata management capabilities, Apache Iceberg also provides specialized metadata tables—snapshots, files, and partitions—that provide deeper insights and control over data management processes. These tables are dynamically generated and provide a live view of the metadata for query purposes, facilitating advanced data operations:

  • Snapshots table: This table lists all snapshots of a table, including snapshot IDs, timestamps, and operation types. It enables users to track changes over time and manage version history effectively.
  • Files table: The files table provides detailed information on each file in the table, including file paths, sizes, and partition values. It is essential for optimizing read and write performance.
  • Partitions table: This table shows how data is partitioned across different files and provides statistics for each partition, which is crucial for understanding and optimizing data distribution.

Metadata tables enhance Iceberg’s functionality by making metadata queries straightforward and efficient. Using these tables, data teams can gain precise control over data snapshots, file management, and partition strategies, further improving data system reliability and performance.

Before you get started

The next section describes a packaged open source solution using Apache Iceberg’s metadata layer and AWS services to enhance monitoring across your Iceberg tables.

Before we deep dive into the suggested solution, let’s mention Iceberg MetricsReporter, which is a native way to emit metrics for Apache Iceberg. It supports two types of reports: one for commits and one for scans. The default output is log based. It produces log files as a result of commit or scan operations. To submit metrics to CloudWatch or any other monitoring tool, users need to create and configure a custom MetricsReporter implementation. MetricsReporter is supported in Apache Iceberg v1.1.0 and later versions, and customers who want to use it must enable it through Spark configuration on their existing pipelines.

The following is deployed independently and doesn’t require any configuration changes to existing data pipelines. It can immediately start monitoring all the tables within the AWS account and AWS Region where it’s deployed. This solution introduces an additional latency of metrics arrival between 20 and 80 seconds compared to MetricsReporter but offers seamless integration without the need for custom configurations or changes to current workflows.

Solution overview

This solution is specifically designed for customers who run Apache Iceberg on Amazon Simple Storage Service (Amazon S3) and use AWS Glue as their data catalog.

Solution architecture diagram

Key features

This solution uses an AWS Lambda deployment package to collect metrics from Apache Iceberg tables. The metrics are then submitted to CloudWatch where you can create metrics visualizations to help recognize trends and anomalies over time.

The solution is designed to be lightweight, focusing on collecting metrics directly from the Iceberg metadata layer without scanning the actual data layer. This approach significantly reduces the compute capacity required, making it efficient and cost-effective. Key features of the solution include:

  • Time-series metrics collection: The solution monitors Iceberg tables continuously to identify trends and detect anomalies in data ingestion rates, partition skewness, and more.
  • Event-driven architecture: The solution uses Amazon EventBridge to launch a Lambda function when the state of an AWS Glue Data Catalog table changes. This ensures real-time metrics collection every time a transaction is committed to an Iceberg table.
  • Efficient data retrieval: Incorporates minimal compute resources by utilizing AWS Glue interactive sessions and the pyiceberg library to directly access Iceberg metadata tables such as snapshots, partitions, and files.

Metrics tracked

As of the blog release date, the solution collects over 25 metrics. These metrics are categorized into several groups:

  • Snapshot metrics: Include total and changes in data files, delete files, records added or removed, and size changes.
  • Partition and file metrics: Aggregated and per-partition metrics like average, maximum, minimum record counts and file sizes, which help in understanding data distribution and help optimizing storage.

To see the complete list of metrics, go to the GitHub repository.

Visualizing data with CloudWatch dashboards

The solution also provides a sample CloudWatch dashboard to visualize the collected metrics. Metrics visualization is important for real-time monitoring and detecting operational issues. The provided helper script simplifies the set up and deployment of the dashboard.

Amazon CloudWatch dashboard

You can go to the GitHub repository to learn more about how to deploy the solution in your AWS account.

What are the vital metrics for Apache Iceberg tables?

This section discusses specific metrics from Iceberg’s metadata and explains why they’re important for monitoring data quality and system performance. The metrics are broken down into three parts: insight, challenge, and action. This provides a clear path for practical application. In this section, we provide only a subset of the available metrics that the solution can collect, for a complete list, see the solution Github page.

1. snapshot.added_data_files, snapshot.added_records

  • Metric insight: The number of data files and number of records added to the table during the last transaction. The ingestion rate measures the speed at which new data is added to the data lake. This metric helps identify bottlenecks or inefficiencies in data pipelines, guiding capacity planning and scalability decisions.
  • Challenge: A sudden drop in the ingestion rate can indicate failures in data ingestion pipelines, source system outages, configuration errors or traffic spikes.
  • Action: Teams need to establish real-time monitoring and alert systems to detect drops in ingestion rates promptly, allowing quick investigations and resolutions.

2. files.avg_record_count, files.avg_file_size

  • Metric insight: These metrics provide insights into the distribution and storage efficiency of the table. Small file sizes might suggest excessive fragmentation.
  • Challenge: Excessively small file sizes can indicate inefficient data storage leading to increased read operations and higher I/O costs.
  • Action: Implementing regular data compaction processes helps consolidate small files, optimizing storage and enhancing content delivery speeds as demonstrated by a streaming service. Data Catalog offers automatic compaction of Apache Iceberg tables. To learn more about compacting Apache Iceberg tables, see Enable compaction in Working with tables on the AWS Glue console.

3. partitions.skew_record_count, partitions.skew_file_count

  • Metric insight: The metrics indicate the asymmetry of the data distribution across the available table partitions. A skewness value of zero, or very close to zero, suggests that the data is balanced. Positive or negative skewness values might indicate a problem.
  • Challenge: Imbalances in data distribution across partitions can lead to inefficiencies and slow query responses.
  • Action: Regularly analyze data distribution metrics to adjust partitioning configuration. Apache Iceberg allows you to transform partitions dynamically, which enables optimization of table partitioning as query patterns or data volumes change, without impacting your existing data.

4. snapshot.deleted_records, snapshot.total_delete_files, snapshot.added_position_deletes

  • Metric insight: Deletion metrics in Apache Iceberg provide important information on the volume and nature of data deletions within a table. These metrics help track how often data is removed or updated, which is essential for managing data lifecycle and compliance with data retention policies.
  • Challenge: High values in these metrics can indicate excessive deletions or updates, which might lead to fragmentation and decreased query performance.
  • Action: To address these challenges, run compaction periodically to ensure deleted rows do not persist in new files. Regularly review and adjust data retention policies and consider expiring old snapshots to keep only necessary amount of data files. You can run compaction operation on specific partitions using Amazon Athena Optimize

Effective monitoring is essential for making informed decisions about necessary maintenance actions for Apache Iceberg tables. Determining the right timing for these actions is crucial. Implementing timely preventative maintenance ensures high operational efficiency of the data lake and helps to address potential issues before they become significant problems.

Using Amazon CloudWatch for anomaly detection and alerts

This section assumes that you have completed the solution setup and collected operational metrics from your Apache Iceberg tables into Amazon CloudWatch.

Now you can start setting up some alerts and detect anomalies.

We guide you on setting up the anomaly detection and configuring alerts in CloudWatch to monitor the snapshot.added_records metric, which indicates the ingestion rate of data written into an Apache Iceberg table.

Set up anomaly detection

CloudWatch anomaly detection applies machine learning algorithms to continuously analyze system metrics, determine normal baselines, and identify items that are outside of the established patterns. Here is how you configure it:

Amazon CloudWatch anomaly detection screenshot

  1. Select Metrics: In the AWS Management Console for Cloudwatch, go to the Metrics  tab and search for and select snapshot.added_records.
  2. Create anomaly detection models: Choose the Graphed metrics tab and click the Pulse icon to enable anomaly detection.
  3. Set Sensitivity: The second parameter of the ANOMALY_DETECTION_BAND (m1, 5) is to adjust the sensitivity of the anomaly detection. The goal is to balance detecting real issues and reducing false positives.

Configure alerts

After the anomaly detection model is set up, set up an alert to notify operations teams about potential issues:

  1. Create alarm: Choose the bell icon under Actions on the same Graphed metrics tab.
  2. Alarm settings: Set the alarm to notify the operations team when the snapshot.added_records metric is outside the anomaly detection band for two consecutive periods. This helps reduce the risk of false alerts.
  3. Alarm actions: Configure CloudWatch to send an alarm email to the operations team. In addition to sending emails, CloudWatch alarm actions can automatically launch remediation processes, such as scaling operations or initiating data compaction.

Best practices

  • Regularly review and adjust models: As data patterns evolve, periodically review and adjust anomaly detection models and alarm settings to remain effective.
  • Comprehensive coverage: Ensure that all critical aspects of the data pipeline are monitored, not just a few metrics.
  • Documentation and communication: Maintain clear documentation of what each metric and alarm represent and ensure that your operations team understands the monitoring set up and response procedures. Set up the alerting mechanisms to send notifications through appropriate channels such as email, corporate messenger, or telephone to ensure your operations team stays informed and can quickly address the issues.
  • Create playbooks and automate remediation tasks: Establish detailed playbooks that describe step-by-step responses for common scenarios identified by alerts. Additionally, automate remediation tasks where possible to speed up response times and reduce the manual burden on teams. This ensures consistent and effective responses to all incidents.

CloudWatch anomaly detection and alerting features help organizations proactively manage their data lakes. This ensures data integrity, reduces downtime, and maintains high data quality. As a result, it enhances operational efficiency and supports robust data governance.

Conclusion

In this blog post, we explored Apache Iceberg’s transformative impact on data lake management. Apache Iceberg addresses the challenges of big data with features like ACID transactions, schema evolution, and snapshot isolation, enhancing data reliability, query performance, and scalability.

We delved into Iceberg’s metadata layer and related metadata tables such as snapshots, files, and partitions that allow easy access to crucial information about the current state of the table. These metadata tables facilitate the extraction of performance-related data, enabling teams to monitor and optimize the data lake’s efficiency.

Finally, we showed you a practical solution for monitoring Apache Iceberg tables using Lambda, AWS Glue, and CloudWatch. This solution uses Iceberg’s metadata layer and CloudWatch monitoring capabilities to provide a proactive operational framework. This framework detects trends and anomalies, ensuring robust data lake management.


About the Author

AvatarMichael Greenshtein is a Senior Analytics Specialist at Amazon Web Services. He is an experienced data professional with over 8 years in cloud computing and data management. Michael is passionate about open-source technology and Apache Iceberg.

Combine AWS Glue and Amazon MWAA to build advanced VPC selection and failover strategies

Post Syndicated from Michael Greenshtein original https://aws.amazon.com/blogs/big-data/combine-aws-glue-and-amazon-mwaa-to-build-advanced-vpc-selection-and-failover-strategies/

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue customers often have to meet strict security requirements, which sometimes involve locking down the network connectivity allowed to the job, or running inside a specific VPC to access another service. To run inside the VPC, the jobs needs to be assigned to a single subnet, but the most suitable subnet can change over time (for instance, based on the usage and availability), so you may prefer to make that decision at runtime, based on your own strategy.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is an AWS service to run managed Airflow workflows, which allow writing custom logic to coordinate how tasks such as AWS Glue jobs run.

In this post, we show how to run an AWS Glue job as part of an Airflow workflow, with dynamic configurable selection of the VPC subnet assigned to the job at runtime.

Solution overview

To run inside a VPC, an AWS Glue job needs to be assigned at least a connection that includes network configuration. Any connection allows specifying a VPC, subnet, and security group, but for simplicity, this post uses connections of type: NETWORK, which just defines the network configuration and doesn’t involve external systems.

If the job has a fixed subnet assigned by a single connection, in case of a service outage on the Availability Zones or if the subnet isn’t available for other reasons, the job can’t run. Furthermore, each node (driver or worker) in an AWS Glue job requires an IP address assigned from the subnet. When running many large jobs concurrently, this could lead to an IP address shortage and the job running with fewer nodes than intended or not running at all.

AWS Glue extract, transform, and load (ETL) jobs allow multiple connections to be specified with multiple network configurations. However, the job will always try to use the connections’ network configuration in the order listed and pick the first one that passes the health checks and has at least two IP addresses to get the job started, which might not be the optimal option.

With this solution, you can enhance and customize that behavior by reordering the connections dynamically and defining the selection priority. If a retry is needed, the connections are reprioritized again based on the strategy, because the conditions might have changed since the last run.

As a result, it helps prevent the job from failing to run or running under capacity due to subnet IP address shortage or even an outage, while meeting the network security and connectivity requirements.

The following diagram illustrates the solution architecture.

Prerequisites

To follow the steps of the post, you need a user that can log in to the AWS Management Console and has permission to access Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), and AWS Glue. The AWS Region where you choose to deploy the solution needs the capacity to create a VPC and two elastic IP addresses. The default Regional quota for both types of resources is five, so you might need to request an increase via the console.

You also need an AWS Identity and Access Management (IAM) role suitable to run AWS Glue jobs if you don’t have one already. For instructions, refer to Create an IAM role for AWS Glue.

Deploy an Airflow environment and VPC

First, you’ll deploy a new Airflow environment, including the creation of a new VPC with two public subnets and two private ones. This is because Amazon MWAA requires Availability Zone failure tolerance, so it needs to run on two subnets on two different Availability Zones in the Region. The public subnets are used so the NAT Gateway can provide internet access for the private subnets.

Complete the following steps:

  1. Create an AWS CloudFormation template in your computer by copying the template from the following quick start guide into a local text file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and choose the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE.

The resource that will take most of time is the Airflow environment. While it’s being created, you can continue with the following steps, until you are required to open the Airflow UI.

  1. On the stack’s Resources tab, note the IDs for the VPC and two private subnets (PrivateSubnet1 and PrivateSubnet2), to use in the next step.

Create AWS Glue connections

The CloudFormation template deploys two private subnets. In this step, you create an AWS Glue connection to each one so AWS Glue jobs can run in them. Amazon MWAA recently added the capacity to run the Airflow cluster on shared VPCs, which reduces cost and simplifies network management. For more information, refer to Introducing shared VPC support on Amazon MWAA.

Complete the following steps to create the connections:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Choose Network as the data source.
  4. Choose the VPC and private subnet (PrivateSubnet1) created by the CloudFormation stack.
  5. Use the default security group.
  6. Choose Next.
  7. For the connection name, enter MWAA-Glue-Blog-Subnet1.
  8. Review the details and complete the creation.
  9. Repeat these steps using PrivateSubnet2 and name the connection MWAA-Glue-Blog-Subnet2.

Create the AWS Glue job

Now you create the AWS Glue job that will be triggered later by the Airflow workflow. The job uses the connections created in the previous section, but instead of assigning them directly on the job, as you would normally do, in this scenario you leave the job connections list empty and let the workflow decide which one to use at runtime.

The job script in this case is not significant and is just intended to demonstrate the job ran in one of the subnets, depending on the connection.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose Script editor.
  2. Leave the default options (Spark engine and Start fresh) and choose Create script.
  3. Replace the placeholder script with the following Python code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Rename the job to AirflowBlogJob.
  5. On the Job details tab, for IAM Role, choose any role and enter 2 for the number of workers (just for frugality).
  6. Save these changes so the job is created.

Grant AWS Glue permissions to the Airflow environment role

The role created for Airflow by the CloudFormation template provides the basic permissions to run workflows but not to interact with other services such as AWS Glue. In a production project, you would define your own templates with these additional permissions, but in this post, for simplicity, you add the additional permissions as an inline policy. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Locate the role created by the template; it will start with the name you assigned to the CloudFormation stack and then -MwaaExecutionRole-.
  3. On the role details page, on the Add permissions menu, choose Create inline policy.
  4. Switch from Visual to JSON mode and enter the following JSON on the textbox. It assumes that the AWS Glue role you have follows the convention of starting with AWSGlueServiceRole. For enhanced security, you can replace the wildcard resource on the ec2:DescribeSubnets permission with the ARNs of the two private subnets from the CloudFormation stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Choose Next.
  6. Enter GlueRelatedPermissions as the policy name and complete the creation.

In this example, we use an ETL script job; for a visual job, because it generates the script automatically on save, the Airflow role would need permission to write to the configured script path on Amazon Simple Storage Service (Amazon S3).

Create the Airflow DAG

An Airflow workflow is based on a Directed Acyclic Graph (DAG), which is defined by a Python file that programmatically specifies the different tasks involved and its interdependencies. Complete the following scripts to create the DAG:

  1. Create a local file named glue_job_dag.py using a text editor.

In each of the following steps, we provide a code snippet to enter into the file and an explanation of what is does.

  1. The following snippet adds the required Python modules imports. The modules are already installed on Airflow; if that weren’t the case, you would need to use a requirements.txt file to indicate to Airflow which modules to install. It also defines the Boto3 clients that the code will use later. By default, they will use the same role and Region as Airflow, that’s why you set up before the role with the additional permissions required.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. The following snippet adds three functions to implement the connection order strategy, which defines how to reorder the connections given to establish their priority. This is just an example; you can build your custom code to implement your own logic, as per your needs. The code first checks the IPs available on each connection subnet and separates the ones that have enough IPs available to run the job at full capacity and those that could be used because they have at least two IPs available, which is the minimum a job needs to start. If the strategy is set to random, it will randomize the order within each of the connection groups previously described and add any other connections. If the strategy is capacity, it will order them from most IPs free to fewest.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. The following code creates the DAG itself with the run job task, which updates the job with the connection order defined by the strategy, runs it, and waits for the results. The job name, connections, and strategy come from Airflow variables, so it can be easily configured and updated. It has two retries with exponential backoff configured, so if the tasks fails, it will repeat the full task including the connection selection. Maybe now the best choice is another connection, or the subnet previously picked randomly is in an Availability Zone that is currently suffering an outage, and by picking a different one, it can recover.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
    

Create the Airflow workflow

Now you create a workflow that invokes the AWS Glue job you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack and then -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file glue_job_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes since you deployed the CloudFormation stack.

  1. Choose the environment link on the table to see the environment details.

It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  1. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to log you in.

If there’s any issue with the DAG file you created, it will display an error on top of the page indicating the lines affected. In that case, review the steps and upload again. After a few seconds, it will parse it and update or remove the error banner.

  1. On the Admin menu, choose Variables.
  2. Add three variables with the following keys and values:
    1. Key glue_job_dag.glue_connections with value MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Key glue_job_dag.glue_job_name with value AirflowBlogJob.
    3. Key glue_job_dag.strategy with value capacity.

Run the job with a dynamic subnet assignment

Now you’re ready to run the workflow and see the strategy dynamically reordering the connections.

  1. On the Airflow UI, choose DAGs, and on the row glue_job_dag, choose the play icon.
  2. On the Browse menu, choose Task instances.
  3. On the instances table, scroll right to display the Log Url and choose the icon on it to open the log.

The log will update as the task runs; you can locate the line starting with “Running Glue job with the connection order:” and the previous lines showing details of the connection IPs and the category assigned. If an error occurs, you’ll see the details in this log.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose the job AirflowBlogJob.
  2. On the Runs tab, choose the run instance, then the Output logs link, which will open a new tab.
  3. On the new tab, use the log stream link to open it.

It will display the IP that the driver was assigned and which subnet it belongs to, which should match the connection indicated by Airflow (if the log is not displayed, choose Resume so it gets updated as soon as it’s available).

  1. On the Airflow UI, edit the Airflow variable glue_job_dag.strategy to set it to random.
  2. Run the DAG multiple times and see how the ordering changes.

Clean up

If you no longer need the deployment, delete the resources to avoid any further charges:

  1. Delete the Python script you uploaded, so the S3 bucket can be automatically deleted in the next step.
  2. Delete the CloudFormation stack.
  3. Delete the AWS Glue job.
  4. Delete the script that the job saved in Amazon S3.
  5. Delete the connections you created as part of this post.

Conclusion

In this post, we showed how AWS Glue and Amazon MWAA can work together to build more advanced custom workflows, while minimizing the operational and management overhead. This solution gives you more control about how your AWS Glue job runs to meet special operational, network, or security requirements.

You can deploy your own Amazon MWAA environment in multiple ways, such as with the template used in this post, on the Amazon MWAA console, or using the AWS CLI. You can also implement your own strategies to orchestrate AWS Glue jobs, based on your network architecture and requirements (for instance, to run the job closer to the data when possible).


About the authors

Michael Greenshtein is an Analytics Specialist Solutions Architect for the Public Sector.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.