Unlock real-time data insights with schema evolution using Amazon MSK Serverless, Iceberg, and AWS Glue streaming

Post Syndicated from Nitin Kumar original https://aws.amazon.com/blogs/big-data/unlock-real-time-data-insights-with-schema-evolution-using-amazon-msk-serverless-iceberg-and-aws-glue-streaming/

Efficient real-time synchronization of data within data lakes presents challenges. Any data inaccuracies or latency issues can significantly compromise analytical insights and subsequent business strategies. Organizations increasingly require synchronized data in near real time to extract actionable intelligence and respond promptly to evolving market dynamics. Additionally, scalability remains a concern for data lake implementations, which must accommodate expanding volumes of streaming data and maintain optimal performance without incurring high operational costs.

Schema evolution is the process of modifying the structure (schema) of a data table to accommodate changes in the data over time, such as adding or removing columns, without disrupting ongoing operations or requiring a complete data rewrite. Schema evolution is vital in streaming data environments for several reasons. Unlike batch processing, streaming pipelines operate continuously, ingesting data in real time from sources that are actively serving production applications. Source systems naturally evolve over time as businesses add new features, refine data models, or respond to changing requirements. Without proper schema evolution capabilities, even minor changes to source schemas can force streaming pipeline shutdowns, requiring developers to manually reconcile schema differences and rebuild tables.

Such disruptions reduce the core value proposition of streaming architectures—continuous, low-latency data processing. Organizations can maintain uninterrupted data flows and keep source systems evolving independently by using the seamless schema evolution provided by Apache Iceberg. This reduces operational friction and maintains the availability of real-time analytics and applications even as underlying data structures change.

Apache Iceberg is an open table format, delivering essential capabilities for streaming workloads, including robust schema evolution support. This critical feature enables table schemas to adapt dynamically as source database structures evolve, maintaining operational continuity. Consequently, when database columns undergo additions, removals, or modifications, the data lake accommodates these changes seamlessly without requiring manual intervention or risking data inconsistencies.

Our comprehensive solution showcases an end-to-end real-time CDC pipeline that enables immediate processing of data modifications from Amazon Relational Database Service (Amazon RDS) for MySQL, streaming altered records directly to AWS Glue streaming jobs using Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless. These jobs continually process incoming changes and update Iceberg tables on Amazon Simple Storage Service (Amazon S3) so that the data lake reflects the current state of the operational database environment in real time. By using Apache Iceberg’s comprehensive schema evolution support, our ETL pipeline automatically adapts to database schema modifications, providing data lake consistency and currentness without manual intervention. This approach combines complete process control with instantaneous analytics on operational data, eliminating traditional latency, and future-proofs the solution to address evolving organizational data needs. The architecture’s inherent flexibility facilitates adaptation to diverse use cases requiring immediate data insights.

Solution overview

To effectively address streaming challenges, we propose an architecture using Amazon MSK Serverless, a comprehensive managed Apache Kafka service that autonomously provisions and scales computational and storage resources. This solution offers a frictionless mechanism for ingesting and processing streaming data without the complexity of capacity management. Our implementation uses Amazon MSK Connect with the Debezium MySQL connector to capture and stream database modifications in real time. Rather than employing traditional batch processing methodologies, we implement an AWS Glue streaming job that directly consumes data from Kafka topics, processes CDC events as they occur, and writes transformed data to Apache Iceberg tables on Amazon S3.
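
To make the CDC events concrete, the following minimal sketch shows how a consumer might interpret a Debezium change event. It assumes the default Debezium envelope (before, after, and op fields, wrapped in payload when the JSON converter includes schemas). The sample events and the helper name apply_debezium_event are illustrative and are not taken from the AWS Glue job in this post.

```python
import json

# Debezium operation codes: c = create, u = update, d = delete, r = snapshot read
UPSERT_OPS = {"c", "u", "r"}

def apply_debezium_event(raw_value, state, key_field="id"):
    """Apply one Debezium change event (a JSON string) to an in-memory table.

    `state` maps primary key -> row dict. Hypothetical helper for illustration.
    """
    if raw_value is None:
        # Tombstone record that Debezium can emit after a delete; nothing to apply.
        return state
    event = json.loads(raw_value)
    # With schemas enabled the change data sits under "payload"; otherwise at top level.
    payload = event.get("payload", event)
    op = payload["op"]
    if op in UPSERT_OPS:
        row = payload["after"]
        state[row[key_field]] = row
    elif op == "d":
        row = payload["before"]
        state.pop(row[key_field], None)
    return state

# Illustrative events (values are made up for this sketch)
insert_event = json.dumps({"payload": {
    "before": None,
    "after": {"id": 31, "name": "Customer Name 31", "mktsegment": "Market segment 31"},
    "op": "c",
}})
delete_event = json.dumps({"payload": {
    "before": {"id": 31, "name": "Customer Name 31", "mktsegment": "Market segment 31"},
    "after": None,
    "op": "d",
}})

table = {}
apply_debezium_event(insert_event, table)
rows_after_insert = len(table)
apply_debezium_event(delete_event, table)
rows_after_delete = len(table)
```

The same three branches (upsert, delete, ignore) are what a streaming job has to express when it merges change events into a target table.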

The workflow consists of the following:

  1. Data flows from Amazon RDS through Amazon MSK Connect using the Debezium MySQL connector to Amazon MSK Serverless. This represents a CDC pipeline that captures database changes from the relational database and streams them to Kafka.
  2. From Amazon MSK Serverless, the data moves to an AWS Glue job, which processes it and stores it in Amazon S3 as Iceberg tables. The AWS Glue job interacts with the AWS Glue Data Catalog to maintain metadata about the datasets.
  3. Analyze the data using Amazon Athena, a serverless interactive query service that can query the Iceberg table created in the Data Catalog. This allows for interactive data analysis without managing infrastructure.

The following diagram illustrates the architecture that we implement through this post. Each number corresponds to the preceding list and shows major components that you implement.

Prerequisites

Before getting started, make sure you have the following:

  • An active AWS account with billing enabled
  • An AWS Identity and Access Management (IAM) user with permissions to create and manage the resources used in this post: a virtual private cloud (VPC), subnets, security groups, IAM roles, a NAT gateway, an internet gateway, an Amazon Elastic Compute Cloud (Amazon EC2) client, MSK Serverless, an MSK connector and its plugin, an AWS Glue job, and S3 buckets.
  • Sufficient VPC capacity in your chosen AWS Region.

For this post, we create the solution resources in the US East (N. Virginia) – us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configuring CDC and processing using AWS CloudFormation

In this post, you use the CloudFormation template vpc-msk-mskconnect-rds-client-gluejob.yaml. This template sets up the streaming CDC pipeline resources such as a VPC, subnet, security group, IAM roles, NAT, internet gateway, EC2 client, MSK Serverless, MSK Connect, Amazon RDS, S3 buckets, and AWS Glue job.

To create the solution resources for the CDC pipeline, complete the following steps:

  1. Launch the stack using the CloudFormation template vpc-msk-mskconnect-rds-client-gluejob.yaml.
  2. Provide the parameter values as listed in the following table.

    Parameter | Description | Sample value
    EnvironmentName | An environment name that is prefixed to resource names. | msk-iceberg-cdc-pipeline
    DatabasePassword | Database admin account password. | ****
    InstanceType | MSK client EC2 instance type. | t2.micro
    LatestAmiId | Latest AMI ID of Amazon Linux 2023 for the EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-x86_64
    VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16
    PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24
    PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24
    PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24
    PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24
    NumberOfWorkers | Number of workers for the AWS Glue streaming job. | 3
    GlueWorkerType | Worker type for the AWS Glue streaming job. | G.1X
    GlueDatabaseName | Name of the AWS Glue Data Catalog database. | glue_cdc_blogdb
    GlueTableName | Name of the AWS Glue Data Catalog table. | iceberg_cdc_tbl
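
If you prefer to launch the stack from a script rather than the console, the following sketch shows one way to pass the same parameters with boto3. The template URL, the stack name, the DB_PASSWORD environment variable, and the CAPABILITY_NAMED_IAM capability are assumptions for illustration; check them against the template before use.

```python
import os

def build_stack_parameters(values):
    """Convert a plain dict into the Parameters list that CloudFormation expects."""
    return [{"ParameterKey": k, "ParameterValue": str(v)} for k, v in values.items()]

def launch_stack(stack_name, template_url, values, region="us-east-1"):
    """Create the stack and block until creation completes (hypothetical helper)."""
    import boto3  # imported lazily so build_stack_parameters works without AWS access

    cfn = boto3.client("cloudformation", region_name=region)
    response = cfn.create_stack(
        StackName=stack_name,
        TemplateURL=template_url,  # assumption: the template is hosted in S3
        Parameters=build_stack_parameters(values),
        Capabilities=["CAPABILITY_NAMED_IAM"],  # assumption: the template creates IAM roles
    )
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)
    return response["StackId"]

# Sample values from the table above; the password is read from the environment
# instead of being hard-coded.
sample_values = {
    "EnvironmentName": "msk-iceberg-cdc-pipeline",
    "DatabasePassword": os.environ.get("DB_PASSWORD", ""),
    "InstanceType": "t2.micro",
    "NumberOfWorkers": 3,
    "GlueWorkerType": "G.1X",
    "GlueDatabaseName": "glue_cdc_blogdb",
    "GlueTableName": "iceberg_cdc_tbl",
}
stack_parameters = build_stack_parameters(sample_values)
```

Parameters omitted from the dictionary fall back to the template defaults, which is why the CIDR ranges and AMI parameter are left out here.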

The stack creation process can take approximately 25 minutes to complete. You can check the Outputs tab for the stack after the stack is created, as shown in the following screenshot.

Following the successful deployment of the CloudFormation stack, you now have a fully operational Amazon RDS database environment. The database instance contains the salesdb database with the customer table populated with 30 data records.

These records have been streamed to the Kafka topic through the Debezium MySQL connector implementation, establishing a reliable CDC pipeline. With this foundation in place, proceed to the next phase of the data architecture: near real-time data processing using the AWS Glue streaming job.

Run the AWS Glue streaming job

To transfer the data load from the Kafka topic (created by the Debezium MySQL connector for database table customer) to the Iceberg table, run the AWS Glue streaming job configured by the CloudFormation setup. This process will migrate all existing customer data from the source database table to the Iceberg table. Complete the following steps:

  1. On the CloudFormation console, choose the stack vpc-msk-mskconnect-rds-client-gluejob.yaml
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is IcebergCDC-msk-iceberg-cdc-pipeline.
  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue job named IcebergCDC-msk-iceberg-cdc-pipeline.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job. On the Runs tab, confirm that the job run started without failure.

Wait approximately 2 minutes before continuing. This pause allows the job run to process the records from the Kafka topic (initial load) and create the Iceberg table.
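
The console steps above can also be scripted. The following is a minimal sketch, assuming a boto3 Glue client and the job name shown in the stack outputs; the helper name start_and_watch and the polling intervals are illustrative. Because a streaming job normally stays in the RUNNING state, the helper returns after the wait window instead of waiting for completion.

```python
import time

# Job run states after which no further progress is expected
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}

def start_and_watch(glue, job_name, wait_seconds=120, poll_seconds=15):
    """Start a Glue job run and report its state after at most `wait_seconds`."""
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    deadline = time.monotonic() + wait_seconds
    while True:
        run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        # Stop polling when the run has ended or the wait window has elapsed.
        if state in TERMINAL_STATES or time.monotonic() >= deadline:
            return run_id, state
        time.sleep(poll_seconds)

# Example usage (requires AWS credentials, so it is left commented out):
# import boto3
# glue = boto3.client("glue", region_name="us-east-1")
# print(start_and_watch(glue, "IcebergCDC-msk-iceberg-cdc-pipeline"))
```

A returned state of RUNNING after the wait window suggests that the initial load is in progress or complete; FAILED or ERROR means you should check the job logs before continuing.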

Query the Iceberg table using Athena

After the AWS Glue streaming job has successfully started and the Iceberg table has been created in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blogdb.
  4. To validate the data, enter the following query to preview the data and find the total count:
    SELECT id, name, mktsegment FROM "glue_cdc_blogdb"."iceberg_cdc_tbl" order by id desc limit 40;
    
    SELECT count(*) as total_rows FROM "glue_cdc_blogdb"."iceberg_cdc_tbl";

    The following screenshot shows the output of the example query.

After performing the preceding steps, you’ve established a complete near real-time data processing pipeline by running an AWS Glue streaming job that transfers data from Kafka topics to an Apache Iceberg table, then verified the successful data migration by querying the results through Amazon Athena.
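
The same validation can be run programmatically. The sketch below assumes a boto3 Athena client and an S3 location for query results that you supply; the helper name run_athena_query is hypothetical, and only the first page of results is read.

```python
import time

COUNT_SQL = 'SELECT count(*) AS total_rows FROM "glue_cdc_blogdb"."iceberg_cdc_tbl"'

def run_athena_query(athena, sql, database, output_location, poll_seconds=2):
    """Run a query, wait for it to finish, and return the data rows as lists of strings."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)

    if status["State"] != "SUCCEEDED":
        reason = status.get("StateChangeReason", "no reason reported")
        raise RuntimeError(f"Query {query_id} ended in state {status['State']}: {reason}")

    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # For SELECT queries the first row holds the column headers, so skip it.
    return [[col.get("VarCharValue") for col in row["Data"]] for row in rows[1:]]

# Example usage (requires AWS credentials; the bucket name is a placeholder):
# import boto3
# athena = boto3.client("athena", region_name="us-east-1")
# print(run_athena_query(athena, COUNT_SQL, "glue_cdc_blogdb", "s3://your-results-bucket/athena/"))
```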

Upload incremental (CDC) data for further processing

Now that you’ve completed the initial full data load, it’s time to focus on the dynamic aspects of the data pipeline. In this section, we explore how the system handles ongoing data modifications such as insertions, updates, and deletions in the Amazon RDS for MySQL database. The Debezium MySQL connector captures each modification event, transforming database changes into a continuous stream of data. Working in tandem with the AWS Glue streaming job, this architecture is designed to promptly process and propagate every change in the source database through the data pipeline. Let’s see this real-time data synchronization mechanism in action. Follow these steps:

  1. On the Amazon EC2 console, locate the EC2 instance named KafkaClientInstance that the CloudFormation template created.
  2. Connect to the EC2 instance using Session Manager, a capability of AWS Systems Manager. Select the instance named KafkaClientInstance and then choose Connect.
  3. Enter the following commands to insert the data into the RDS table. Use the same database password you entered when you created the CloudFormation stack.
    sudo su - ec2-user
    RDS_AURORA_ENDPOINT=$(aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address')
    mysql -f -u master -h "$RDS_AURORA_ENDPOINT" --password

  4. Now perform insert, update, and delete operations on the customer table.
    use salesdb;
    
    INSERT INTO customer VALUES(31, 'Customer Name 31', 'Market segment 31');
    INSERT INTO customer VALUES(32, 'Customer Name 32', 'Market segment 32');
    
    UPDATE customer SET name='Customer Name update 29', mktsegment='Market segment update 29' WHERE id = 29;
    UPDATE customer SET name='Customer Name update 30', mktsegment='Market segment update 30' WHERE id = 30;
    
    DELETE FROM customer WHERE id = 27;
    DELETE FROM customer WHERE id = 28;
    

  5. Validate the data to verify the insert, update, and delete records in the Iceberg table from Athena, as shown in the following screenshot.

After performing the preceding steps, you’ve learned how our CDC pipeline handles ongoing data modifications by performing insertions, updates, and deletions in the MySQL database and verifying how these changes are automatically captured by Debezium MySQL connector, streamed through Kafka, and reflected in the Iceberg table in near real time.
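
To know what to expect in Athena after these statements, you can replay them against a small in-memory model of the table. The following sketch assumes the initial load contains IDs 1 through 30; the initial names and market segments are placeholders, because the post doesn’t list them.

```python
# Initial load: 30 rows with ids 1..30 (placeholder values for illustration)
table = {
    i: {"id": i, "name": f"Customer Name {i}", "mktsegment": f"Market segment {i}"}
    for i in range(1, 31)
}

# The same changes as the SQL statements above, in execution order
changes = [
    ("insert", {"id": 31, "name": "Customer Name 31", "mktsegment": "Market segment 31"}),
    ("insert", {"id": 32, "name": "Customer Name 32", "mktsegment": "Market segment 32"}),
    ("update", {"id": 29, "name": "Customer Name update 29", "mktsegment": "Market segment update 29"}),
    ("update", {"id": 30, "name": "Customer Name update 30", "mktsegment": "Market segment update 30"}),
    ("delete", {"id": 27}),
    ("delete", {"id": 28}),
]

def apply_change(table, op, row):
    """Apply one change to the in-memory table, keyed by id."""
    if op == "delete":
        table.pop(row["id"], None)
    else:
        # Inserts and updates both end up as "latest version of the row wins".
        table[row["id"]] = {**table.get(row["id"], {}), **row}
    return table

for op, row in changes:
    apply_change(table, op, row)

expected_count = len(table)  # 30 + 2 inserts - 2 deletes
```

Under these assumptions the row count stays at 30, IDs 27 and 28 are gone, IDs 31 and 32 are present, and rows 29 and 30 carry the updated values.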

Schema evolution: Adding new columns to the Iceberg table

The schema evolution mechanism in this implementation provides an automated approach to detecting and adding new columns from incoming data to existing Iceberg tables. Although Iceberg inherently supports robust schema evolution capabilities (including adding, dropping, and renaming columns, updating types, and reordering), this code specifically automates the column addition process for streaming environments. This automation uses Iceberg’s underlying schema evolution capabilities, which guarantee correctness through unique column IDs that ensure new columns never read existing values from another column. By handling column additions programmatically, the system reduces operational overhead in streaming pipelines where manual schema management would create bottlenecks. However, dropping and renaming columns, updating types, and reordering still require manual intervention.
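
The role of column IDs can be illustrated with a deliberately simplified toy model. This is not Iceberg’s implementation; it only shows why resolving columns by ID rather than by name prevents a newly added column from picking up old values. All names and values below are made up for the sketch.

```python
# Toy model: a data file stores values keyed by field ID, not by column name.
data_file_row = {1: 29, 2: "Customer Name 29", 3: "Market segment 29"}

# Schema at write time: column name -> field ID
schema_v1 = {"id": 1, "name": 2, "mktsegment": 3}

# Later, "mktsegment" is dropped and a new column with the same name is added.
# The new column receives a fresh ID (4) instead of reusing ID 3.
schema_v2 = {"id": 1, "name": 2, "mktsegment": 4}

def read_row(row_by_field_id, schema):
    """Project a stored row through a schema; IDs missing from the file read as None."""
    return {name: row_by_field_id.get(field_id) for name, field_id in schema.items()}

old_view = read_row(data_file_row, schema_v1)
new_view = read_row(data_file_row, schema_v2)
```

In this model, old_view still returns the original market segment, while new_view returns None for the re-added column, because ID 4 never existed in the older data file.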

When new data arrives through Kafka streams, the handle_schema_evolution() function orchestrates a four-step process to ensure seamless table schema updates.

  1. It analyzes the incoming batch DataFrame to infer its schema structure, cataloging all column names and their corresponding data types.
  2. It retrieves the existing Iceberg table’s schema from the AWS Glue catalog to establish a baseline for comparison.
  3. The system then uses the compare_schemas() method to compare the batch schema with the existing table schema.
    1. If the incoming frame contains fewer columns than the catalog table, no action is taken.
    2. It identifies any columns present in the incoming data that don’t exist in the current table structure and returns them as a list of new columns to add.
    3. New columns are appended at the end of the table schema.
    4. Type evolution isn’t handled; a type mismatch only logs a warning. If you need it, you can add that logic at the comment # Handle type evolution in the compare_schemas() method.
    5. If columns are dropped in the source table, the job doesn’t drop them from the destination table. If your use case requires it, drop them manually using ALTER TABLE ... DROP COLUMN.
    6. Renaming columns isn’t supported. To rename a column, evolve the schema manually using ALTER TABLE … RENAME COLUMN.
  4. Finally, if new columns are discovered, the function executes ALTER TABLE … ADD COLUMN statements to evolve the Iceberg table schema, adding the new columns with their appropriate data types.

For column additions, this approach removes the need for manual schema management and prevents the data pipeline failures that would typically occur when encountering unexpected fields in streaming data. The implementation also logs each schema evolution event and re-raises errors from the ALTER TABLE step, which makes it particularly valuable for environments where data structures frequently change.

def infer_schema_from_batch(batch_df):
    """
    Infer schema from the batch DataFrame
    Returns a dictionary with column names and their inferred types
    """
    schema_dict = {}
    for field in batch_df.schema.fields:
        schema_dict[field.name] = field.dataType
    return schema_dict

def get_existing_table_schema(spark, table_identifier):
    """
    Read the existing table schema from the Iceberg table
    Returns a dictionary with column names and their types
    """
    try:
        existing_df = spark.table(table_identifier)
        schema_dict = {}
        for field in existing_df.schema.fields:
            schema_dict[field.name] = field.dataType
        return schema_dict
    except Exception as e:
        print(f"Error reading existing table schema: {e}")
        return {}

def compare_schemas(batch_schema, existing_schema):
    """
    Compare batch schema with existing table schema
    Returns a list of new columns that need to be added
    """
    new_columns = []
    
    for col_name, col_type in batch_schema.items():
        if col_name not in existing_schema:
            new_columns.append((col_name, col_type))
        elif existing_schema[col_name] != col_type:
            # Handle type evolution if needed
            print(f"Warning: Column {col_name} type mismatch - existing: {existing_schema[col_name]}, new: {col_type}")
    
    return new_columns

def spark_type_to_sql_string(spark_type):
    """
    Convert Spark DataType to SQL string representation for ALTER TABLE
    """
    type_mapping = {
        'IntegerType': 'INT',
        'LongType': 'BIGINT',
        'StringType': 'STRING',
        'BooleanType': 'BOOLEAN',
        'DoubleType': 'DOUBLE',
        'FloatType': 'FLOAT',
        'TimestampType': 'TIMESTAMP',
        'DateType': 'DATE'
    }
    
    type_name = type(spark_type).__name__
    return type_mapping.get(type_name, 'STRING')

def evolve_table_schema(spark, table_identifier, new_columns):
    """
    Alter the Iceberg table to add new columns
    """
    if not new_columns:
        return
    
    try:
        for col_name, col_type in new_columns:
            sql_type = spark_type_to_sql_string(col_type)
            alter_sql = f"ALTER TABLE {table_identifier} ADD COLUMN {col_name} {sql_type}"
            print(f"Executing schema evolution: {alter_sql}")
            spark.sql(alter_sql)
            print(f"Successfully added column {col_name} with type {sql_type}")
    except Exception as e:
        print(f"Error during schema evolution: {e}")
        raise  # re-raise with the original traceback

def handle_schema_evolution(spark, batch_df, table_identifier):
    """
    schema evolution steps
    1. Infer schema from batch DataFrame
    2. Read existing table schema
    3. Compare schemas and identify new columns
    4. Alter table if schema evolved
    """
    # Step 1: Infer schema from batch DataFrame
    batch_schema = infer_schema_from_batch(batch_df)
    print(f"Batch schema: {batch_schema}")
    
    # Step 2: Read existing table schema
    existing_schema = get_existing_table_schema(spark, table_identifier)
    print(f"Existing table schema: {existing_schema}")
    
    # Step 3: Compare schemas
    new_columns = compare_schemas(batch_schema, existing_schema)
    
    # Step 4: Evolve schema if needed
    if new_columns:
        print(f"Schema evolution detected. New columns: {new_columns}")
        evolve_table_schema(spark, table_identifier, new_columns)
        return True
    else:
        print("No schema evolution needed")
        return False
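
To see how the comparison step behaves without a Spark session, the following sketch uses a hypothetical diff_schemas helper, a variant of the comparison above that takes plain type strings instead of Spark DataType objects. Besides new columns, it also reports the two cases the job leaves to manual handling: columns missing from the batch and type mismatches. The sample schemas are illustrative.

```python
def diff_schemas(batch_schema, table_schema):
    """Compare two {column name: type} dicts and classify the differences."""
    added = [(name, col_type) for name, col_type in batch_schema.items()
             if name not in table_schema]
    missing_in_batch = [name for name in table_schema if name not in batch_schema]
    type_changes = [(name, table_schema[name], col_type)
                    for name, col_type in batch_schema.items()
                    if name in table_schema and table_schema[name] != col_type]
    return {
        "added": added,                        # handled automatically (ADD COLUMN)
        "missing_in_batch": missing_in_batch,  # left untouched; drop manually if needed
        "type_changes": type_changes,          # only logged; evolve manually if needed
    }

# Illustrative schemas: the batch carries a new "status" column and a wider id type
table_schema = {"id": "int", "name": "string", "mktsegment": "string"}
batch_schema = {"id": "bigint", "name": "string", "mktsegment": "string", "status": "string"}

report = diff_schemas(batch_schema, table_schema)
```

Here report["added"] contains only the status column, which is the single change the streaming job would apply on its own.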

In this section, we demonstrate how our system handles structural changes to the underlying data model by adding a new status column to the customer table and populating it with default values. Our architecture is designed to seamlessly propagate these schema modifications throughout the pipeline so that downstream analytics and processing capabilities remain uninterrupted while accommodating the enhanced data model. This flexibility is essential for maintaining a responsive, business-aligned data infrastructure that can evolve alongside changing organizational needs.

  1. Add a new status column to the customer table and populate it with the default value Green.
    use salesdb;
    
    ALTER TABLE customer ADD COLUMN status VARCHAR(20) NOT NULL;
    
    UPDATE customer SET status = 'Green';
    

  2. Use the Athena console to validate the data and schema evolution, as shown in the following screenshot.

When schema evolution occurs in an Iceberg table, the metadata.json file is updated to track the change. When the job detected the new column, it ran the following query to evolve the schema of the Iceberg table.

ALTER TABLE glue_catalog.glue_cdc_blogdb.iceberg_cdc_tbl ADD COLUMN status string

We checked the metadata.json file in the Iceberg table location in Amazon S3, and the following screenshot shows how the schema evolved.
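
If you download the metadata.json file, you can inspect the schema history with a few lines of Python. The sketch below uses an abbreviated, illustrative document that follows the field names of the Iceberg table specification (schemas, schema-id, current-schema-id, and per-field id); it is not the actual file produced in this post, and the column types are assumptions.

```python
import json

# Abbreviated, illustrative metadata; a real file contains many more properties.
sample_metadata = json.loads("""
{
  "format-version": 2,
  "current-schema-id": 1,
  "last-column-id": 4,
  "schemas": [
    {"type": "struct", "schema-id": 0, "fields": [
      {"id": 1, "name": "id", "required": false, "type": "int"},
      {"id": 2, "name": "name", "required": false, "type": "string"},
      {"id": 3, "name": "mktsegment", "required": false, "type": "string"}
    ]},
    {"type": "struct", "schema-id": 1, "fields": [
      {"id": 1, "name": "id", "required": false, "type": "int"},
      {"id": 2, "name": "name", "required": false, "type": "string"},
      {"id": 3, "name": "mktsegment", "required": false, "type": "string"},
      {"id": 4, "name": "status", "required": false, "type": "string"}
    ]}
  ]
}
""")

def schema_by_id(metadata, schema_id):
    """Return the schema entry with the given schema-id."""
    return next(s for s in metadata["schemas"] if s["schema-id"] == schema_id)

def field_names(schema):
    return [field["name"] for field in schema["fields"]]

current = schema_by_id(sample_metadata, sample_metadata["current-schema-id"])
previous = schema_by_id(sample_metadata, 0)

# Columns present in the current schema but not in the original one
added = [name for name in field_names(current) if name not in field_names(previous)]
```

The earlier schema stays in the schemas list, and the new column gets the next unused field ID, which is how older data files remain readable after the change.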

This section showed how our implementation handles schema evolution by automatically detecting and adding new columns from incoming data streams to existing Iceberg tables. The four-step process analyzes incoming data schemas, compares them with the existing table structure, identifies new columns, and executes the necessary ALTER TABLE statements without manual intervention, though dropping, renaming, and retyping columns still require manual handling.

Clean up

To clean up your resources, complete the following steps:

  1. Stop the running AWS Glue streaming job:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Search for the AWS Glue job named IcebergCDC-msk-iceberg-cdc-pipeline.
    3. Choose the job name to open its details page.
    4. On the Runs tab, select the running job run and choose Stop job run. Confirm that the job stopped successfully.
  2. Remove the AWS Glue database and table:
    1. On the AWS Glue console, choose Tables in the navigation pane, select iceberg_cdc_tbl, and choose Delete.
    2. Choose Databases in the navigation pane, select glue_cdc_blogdb, and choose Delete.
  3. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client-gluejob.yaml.

Conclusion

This post showcases a solution that businesses can use to access near real-time data insights without the traditional delays between data creation and analysis. By combining Amazon MSK Serverless, the Debezium MySQL connector, AWS Glue streaming, and Apache Iceberg tables, the architecture captures database changes as they occur and makes them available for analytics through Amazon Athena in near real time. A standout feature is the system’s ability to adapt automatically when new columns are added to the source database, without disrupting operations or requiring manual intervention. This reduces the technical complexity typically associated with real-time data pipelines and provides business users with current information for decision-making, bridging the gap between operational databases and analytical systems in a cost-effective, scalable way.


About the Authors

Nitin Kumar

Nitin is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. In his free time, he likes to watch movies and spend time with his family.

Shubham Purwar

Shubham is an Analytics Specialist Solutions Architect at AWS. He helps organizations unlock the full potential of their data by designing and implementing scalable, secure, and high-performance analytics solutions on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Noritaka Sekiyama

Noritaka is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan, and is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Migrate Delta tables from Azure Data Lake Storage to Amazon S3 using AWS Glue

Post Syndicated from Nitin Kumar original https://aws.amazon.com/blogs/big-data/migrate-delta-tables-from-azure-data-lake-storage-to-amazon-s3-using-aws-glue/

Organizations are increasingly using a multi-cloud strategy to run their production workloads. We often see requests from customers who have started their data journey by building data lakes on Microsoft Azure, to extend access to the data to AWS services. Customers want to use a variety of AWS analytics, data, AI, and machine learning (ML) services like AWS Glue, Amazon Redshift, and Amazon SageMaker to build more cost-efficient, performant data solutions harnessing the strength of individual cloud service providers for their business use cases.

In such scenarios, data engineers face challenges in connecting to and extracting data from storage containers on Microsoft Azure. Customers typically use Azure Data Lake Storage Gen2 (ADLS Gen2) as their data lake storage medium, store the data in open table formats like Delta tables, and want to use AWS analytics services like AWS Glue to read those Delta tables. AWS Glue, with its ability to process data using Apache Spark and connect to various data sources, is a suitable solution for addressing the challenges of accessing data across multiple cloud environments.

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, and combine data for analytics, ML, and application development. AWS Glue custom connectors allow you to discover and integrate additional data sources, such as software as a service (SaaS) applications and your custom data sources. With just a few clicks, you can search for and subscribe to connectors from AWS Marketplace and begin your data preparation workflow in minutes.

In this post, we explain how you can extract data from ADLS Gen2 using the Azure Data Lake Storage Connector for AWS Glue. We specifically demonstrate how to import data stored in Delta tables in ADLS Gen2. We provide step-by-step guidance on how to configure the connector, author an AWS Glue ETL (extract, transform, and load) script, and load the extracted data into Amazon Simple Storage Service (Amazon S3).

Azure Data Lake Storage Connector for AWS Glue

The Azure Data Lake Storage Connector for AWS Glue simplifies the process of connecting AWS Glue jobs to extract data from ADLS Gen2. It uses Hadoop’s FileSystem interface and the ADLS Gen2 connector for Hadoop. The Azure Data Lake Storage Connector for AWS Glue also includes the hadoop-azure module, which lets you run Apache Hadoop or Apache Spark jobs directly with data in ADLS. When the connector is added to the AWS Glue environment, AWS Glue loads the library from the Amazon Elastic Container Registry (Amazon ECR) repository during initialization (as a connector). When AWS Glue has internet access, the Spark job in AWS Glue can read from and write to ADLS.

With the availability of the Azure Data Lake Storage Connector for AWS Glue in AWS Marketplace, an AWS Glue connection makes sure you have the required packages to use in your AWS Glue job.

For this post, we use the Shared Key authentication method.

Solution overview

In this post, our objective is to migrate a product table named sample_delta_table, which currently resides in ADLS Gen2, to Amazon S3. To accomplish this, we use AWS Glue, the Azure Data Lake Storage Connector for AWS Glue, and AWS Secrets Manager to securely store the Azure shared key. We use an AWS Glue serverless ETL job, configured with the connector, to establish a connection to ADLS using shared key authentication over the public internet. After the table is migrated to Amazon S3, we use Amazon Athena to query the Delta Lake table.

The following architecture diagram illustrates how AWS Glue facilitates data ingestion from ADLS.

Prerequisites

You need the following prerequisites:

Configure your ADLS Gen2 account in Secrets Manager

Complete the following steps to create a secret in Secrets Manager to store the ADLS credentials:

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, select Other type of secret.
  3. Enter the key accountName for the ADLS Gen2 storage account name.
  4. Enter the key accountKey for the ADLS Gen2 storage account key.
  5. Enter the key container for the ADLS Gen2 container.
  6. Leave the rest of the options as default and choose Next.

  7. Enter a name for the secret (for example, adlstorage_credentials).
  8. Choose Next.
  9. Complete the rest of the steps to store the secret.
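
The same secret can be created from a script. The following sketch assumes boto3 with credentials that allow secretsmanager:CreateSecret; the helper names are hypothetical, and the placeholder values must be replaced with your own. The key names match the console steps above.

```python
import json

def build_adls_secret(account_name, account_key, container):
    """Build the JSON secret string with the three keys used in this post."""
    return json.dumps({
        "accountName": account_name,
        "accountKey": account_key,
        "container": container,
    })

def store_adls_secret(secret_name, secret_string):
    """Create the secret in Secrets Manager and return its ARN (hypothetical helper)."""
    import boto3  # imported lazily so build_adls_secret works without AWS access

    sm = boto3.client("secretsmanager")
    return sm.create_secret(Name=secret_name, SecretString=secret_string)["ARN"]

# Placeholder values for illustration only; never commit real keys to source control.
secret_string = build_adls_secret("your-storage-account", "your-account-key", "your-container")

# Example usage (requires AWS credentials, so it is left commented out):
# print(store_adls_secret("adlstorage_credentials", secret_string))
```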

Subscribe to the Azure Data Lake Storage Connector for AWS Glue

The Azure Data Lake Storage Connector for AWS Glue simplifies the process of connecting AWS Glue jobs to extract data from ADLS Gen2. The connector is available as an AWS Marketplace offering.

Complete the following steps to subscribe to the connector:

  1. Log in to your AWS account with the necessary permissions.
  2. Navigate to the AWS Marketplace page for the Azure Data Lake Storage Connector for AWS Glue.
  3. Choose Continue to Subscribe.
  4. Choose Continue to Configuration after reading the EULA.

  5. For Fulfillment option, choose Glue 4.0.
  6. For Software version, choose the latest software version.
  7. Choose Continue to Launch.

Create a custom connection in AWS Glue

After you’re subscribed to the connector, complete the following steps to create an AWS Glue connection based on it. This connection will be added to the AWS Glue job to make sure the connector is available and the data store connection information is accessible to establish a network pathway.

To create the AWS Glue connection, you need to activate the Azure Data Lake Storage Connector for AWS Glue on the AWS Glue Studio console. After you choose Continue to Launch in the previous steps, you’re redirected to the connector landing page.

  1. In the Configuration details section, choose Usage instructions.
  2. Choose Activate the Glue connector from AWS Glue Studio.

The AWS Glue Studio console allows the option to either activate the connector or activate it and create the connection in one step. For this post, we choose the second option.

  3. For Connector, confirm Azure ADLS Connector for AWS Glue 4.0 is selected.
  4. For Name, enter a name for the connection (for example, AzureADLSStorageGen2Connection).
  5. Enter an optional description.
  6. Choose Create connection and activate connector.

The connection is now ready for use. The connector and connection information is visible on the Data connections page of the AWS Glue console.


Read Delta tables from ADLS Gen2 using the connector in an AWS Glue ETL job

Complete the following steps to create an AWS Glue job and configure the AWS Glue connection and job parameter options:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Author code with a script editor and choose Script editor.
  3. Choose Create script and go to the Job details section.
  4. Update the settings for Name and IAM role.
  5. Under Advanced properties, add the AWS Glue connection AzureADLSStorageGen2Connection created in previous steps.
  6. For Job parameters, add the key --datalake-formats with the value delta.
  7. Use the following script to read the Delta table from ADLS. Provide the path to the Delta table files in your Azure storage account container and the S3 path where the Delta files should be written.
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import boto3
import json

spark = SparkSession.builder.getOrCreate()

sm = boto3.client('secretsmanager')
response = sm.get_secret_value(SecretId="adlstorage_credentials")
value = json.loads(response['SecretString'])
account_name_sparkconfig = f"fs.azure.account.key.{value['accountName']}.dfs.core.windows.net"
account_name = value['accountName']
account_key = value['accountKey']
container_name = value['container']
path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/path-to-delta-table-files/"
s3DeltaTablePath="s3://yourdatalakebucketname/deltatablepath/"

# Method: Shared Key  
spark.conf.set(account_name_sparkconfig, account_key)

# Read delta table from ADLS gen2 storage
df = spark.read.format("delta").load(path)

# Write delta table to S3 path.
if DeltaTable.isDeltaTable(spark,s3DeltaTablePath):
    s3deltaTable = DeltaTable.forPath(spark,s3DeltaTablePath)
    print("Merge to existing s3 delta table")
    (s3deltaTable.alias("target")
        .merge(df.alias("source"), "target.product_id = source.product_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    print("Create delta table to S3.")
    df.write.format("delta").save(s3DeltaTablePath)
  8. Choose Run to start the job.
  9. On the Runs tab, confirm the job ran successfully.
  10. On the Amazon S3 console, verify the delta files in the S3 bucket (Delta table path).
  11. Create a database and table in Athena to query the migrated Delta table in Amazon S3.

You can accomplish this step using an AWS Glue crawler. The crawler crawls the Delta table stored in Amazon S3 and creates the necessary metadata in the AWS Glue Data Catalog, which Athena then uses to query the Delta table. For more information, see Crawl Delta Lake tables using AWS Glue crawlers. Alternatively, create the database and table manually by running the following statements in Athena:

CREATE DATABASE deltadb;
CREATE EXTERNAL TABLE deltadb.sample_delta_table
LOCATION 's3://yourdatalakebucketname/deltatablepath/'
TBLPROPERTIES ('table_type'='DELTA');

  12. Query the Delta table:

SELECT * FROM "deltadb"."sample_delta_table" limit 10;
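
For the crawler option mentioned in step 11, the following is a minimal sketch of how the crawler could be defined with the AWS SDK for Python (Boto3) instead of the console. The function names, the crawler name, and the IAM role ARN are hypothetical and not part of the original walkthrough; the database name and S3 path reuse the placeholders from this post. The sketch assumes you want a native Delta table in the Data Catalog rather than a symlink manifest.

```python
def build_delta_crawler_request(crawler_name, role_arn, database_name, delta_table_path):
    """Return keyword arguments for the AWS Glue CreateCrawler API.

    All argument values are placeholders supplied by the caller.
    """
    return {
        "Name": crawler_name,
        "Role": role_arn,
        "DatabaseName": database_name,
        "Targets": {
            "DeltaTargets": [
                {
                    # S3 path that contains the _delta_log folder
                    "DeltaTables": [delta_table_path],
                    # Assumption: no symlink manifest files are needed
                    "WriteManifest": False,
                    # Assumption: register the table as a native Delta table
                    "CreateNativeDeltaTable": True,
                }
            ]
        },
    }


def create_and_start_delta_crawler(glue_client, request):
    """Create the crawler described by `request`, then start it."""
    glue_client.create_crawler(**request)
    glue_client.start_crawler(Name=request["Name"])
```

To run it, pass a Glue client such as `boto3.client("glue")` together with a request built for `deltadb` and `s3://yourdatalakebucketname/deltatablepath/`. The role you supply must allow AWS Glue to read the S3 location.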

By following the steps outlined in the post, you have successfully migrated a Delta table from ADLS Gen2 to Amazon S3 using an AWS Glue ETL job.
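
If you prefer to script the job setup rather than use the console, the job settings from the preceding steps map onto the AWS Glue CreateJob API roughly as follows. This is a sketch under stated assumptions: the function name, job name, role ARN, and script location are hypothetical, and the worker type and worker count are borrowed from the notebook settings later in this post rather than from the job steps.

```python
def build_glue_job_request(job_name, role_arn, script_location,
                           connection_name="AzureADLSStorageGen2Connection"):
    """Return keyword arguments for the AWS Glue CreateJob API.

    `script_location` is the S3 URI of the uploaded job script (placeholder).
    """
    return {
        "Name": job_name,
        "Role": role_arn,
        "GlueVersion": "4.0",
        "Command": {
            "Name": "glueetl",  # Spark ETL job
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        # Attach the connection created from the Marketplace connector
        "Connections": {"Connections": [connection_name]},
        # Job parameter that enables Delta Lake support
        "DefaultArguments": {"--datalake-formats": "delta"},
        # Assumption: same sizing as the notebook session in this post
        "WorkerType": "G.1X",
        "NumberOfWorkers": 5,
    }
```

With a Boto3 Glue client, the request can be passed as `glue.create_job(**build_glue_job_request(...))`.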

Read the Delta table in an AWS Glue notebook

The following are optional steps if you want to read the Delta table from ADLS Gen2 in an AWS Glue notebook:

  1. Create a notebook and run the following code in the first notebook cell to configure the AWS Glue connection and --datalake-formats in an interactive session:
%idle_timeout 30
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%connections AzureADLSStorageGen2Connection
%%configure
{
   "--datalake-formats": "delta"
}

  2. Run the following code in a new cell to read the Delta table stored in ADLS Gen2. Replace the placeholder path with the location of your Delta table files in the Azure storage account container, and replace the S3 path with the location where the Delta files should be written.
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import boto3
import json

spark = SparkSession.builder.getOrCreate()

sm = boto3.client('secretsmanager')
response = sm.get_secret_value(SecretId="adlstorage_credentials")
value = json.loads(response['SecretString'])
account_name_sparkconfig = f"fs.azure.account.key.{value['accountName']}.dfs.core.windows.net"
account_name = value['accountName']
account_key = value['accountKey']
container_name = value['container']
path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/path-to-delta-table-files/"
s3DeltaTablePath="s3://yourdatalakebucketname/deltatablepath/"

# Method: Shared Key  
spark.conf.set(account_name_sparkconfig, account_key)

# Read delta table from ADLS gen2 storage
df = spark.read.format("delta").load(path)

# Write delta table to S3 path.
if DeltaTable.isDeltaTable(spark,s3DeltaTablePath):
    s3deltaTable = DeltaTable.forPath(spark,s3DeltaTablePath)
    print("Merge to existing s3 delta table")
    (s3deltaTable.alias("target")
        .merge(df.alias("source"), "target.product_id = source.product_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    print("Create delta table to S3.")
    df.write.format("delta").save(s3DeltaTablePath)
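
Both the job script and the notebook cell assume that the adlstorage_credentials secret is a JSON document with the keys accountName, accountKey, and container. The following sketch shows one way to check those keys before handing the values to Spark, so that a malformed secret fails early with a clear message. The function name and the returned dictionary layout are hypothetical and only for illustration.

```python
import json

REQUIRED_KEYS = ("accountName", "accountKey", "container")


def parse_adls_secret(secret_string, table_dir="path-to-delta-table-files/"):
    """Validate the secret JSON and derive the Spark settings used in the script."""
    value = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if not value.get(key)]
    if missing:
        raise KeyError("Secret is missing required keys: " + ", ".join(missing))

    host = f"{value['accountName']}.dfs.core.windows.net"
    return {
        # Spark configuration key for shared key authentication
        "spark_conf_key": f"fs.azure.account.key.{host}",
        "account_key": value["accountKey"],
        # ABFSS URI of the Delta table in the storage container
        "path": f"abfss://{value['container']}@{host}/{table_dir.lstrip('/')}",
    }
```

In the script, the result would be used as `spark.conf.set(cfg["spark_conf_key"], cfg["account_key"])` followed by `spark.read.format("delta").load(cfg["path"])`, where `cfg = parse_adls_secret(response["SecretString"])`.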

Clean up

To clean up your resources, complete the following steps:

  1. Remove the AWS Glue job, database, table, and connection:
    1. On the AWS Glue console, choose Tables in the navigation pane, select sample_delta_table, and choose Delete.
    2. Choose Databases in the navigation pane, select deltadb, and choose Delete.
    3. Choose Connections in the navigation pane, select AzureADLSStorageGen2Connection, and on the Actions menu, choose Delete.
  2. On the Secrets Manager console, choose Secrets in the navigation pane, select adlstorage_credentials, and on the Actions menu, choose Delete secret.
  3. If you are no longer going to use this connector, you can cancel the subscription to the connector:
    1. On the AWS Marketplace console, choose Manage subscriptions.
    2. Select the subscription for the product that you want to cancel, and on the Actions menu, choose Cancel subscription.
    3. Read the information provided and select the acknowledgement check box.
    4. Choose Yes, cancel subscription.
  4. On the Amazon S3 console, delete the data in the S3 bucket that you used in the previous steps. 

You can also use the AWS Command Line Interface (AWS CLI) to remove the AWS Glue job, connection, table, and database, and the Secrets Manager secret, with the following commands:

aws glue delete-job --job-name <your_job_name>
aws glue delete-connection --connection-name <your_connection_name>
aws secretsmanager delete-secret --secret-id <your_secretsmanager_id>
aws glue delete-table --database-name deltadb --name sample_delta_table
aws glue delete-database --name deltadb
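
The same cleanup can be sketched with the AWS SDK for Python (Boto3). The function name is hypothetical, and the job, connection, and secret names are placeholders that you supply; the database and table defaults match the names used in this post. As with the CLI command, deleting the secret without extra options schedules it for deletion after a recovery window instead of removing it immediately.

```python
def cleanup_resources(glue_client, secrets_client, job_name, connection_name,
                      secret_id, database_name="deltadb",
                      table_name="sample_delta_table"):
    """Delete the Glue job, connection, table, and database, then the secret."""
    glue_client.delete_job(JobName=job_name)
    glue_client.delete_connection(ConnectionName=connection_name)
    # Delete the table before the database that contains it
    glue_client.delete_table(DatabaseName=database_name, Name=table_name)
    glue_client.delete_database(Name=database_name)
    secrets_client.delete_secret(SecretId=secret_id)
```

Call it with `boto3.client("glue")` and `boto3.client("secretsmanager")` as the two clients.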

Conclusion

In this post, we demonstrated a real-world example of migrating a Delta table from Azure Data Lake Storage Gen2 to Amazon S3 using AWS Glue. We used an AWS Glue serverless ETL job, configured with an AWS Marketplace connector, to connect to ADLS Gen2 using shared key authentication over the public internet. We used Secrets Manager to store the shared key securely and retrieved it from within the AWS Glue ETL job. Lastly, we provided guidance on querying the Delta Lake table from Athena.

Try out the solution for your own use case, and let us know your feedback and questions in the comments.


About the Authors

Nitin Kumar is a Cloud Engineer (ETL) at Amazon Web Services, specialized in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru, specialized in AWS Glue and Amazon Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Pramod Kumar P is a Solutions Architect at Amazon Web Services. With 19 years of technology experience and close to a decade of designing and architecting connectivity solutions (IoT) on AWS, he guides customers to build solutions with the right architectural tenets to meet their business outcomes.

Madhavi Watve is a Senior Solutions Architect at Amazon Web Services, providing help and guidance to a broad range of customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. She brings over 20 years of technology experience in software development and architecture and is a data analytics specialist.

Swathi S is a Technical Account Manager with the Enterprise Support team in Amazon Web Services. She has over 6 years of experience with AWS on big data technologies and specializes in analytics frameworks. She is passionate about helping AWS customers navigate the cloud space and enjoys assisting with design and optimization of analytics workloads on AWS.