All posts by Noritaka Sekiyama

Enhance monitoring and debugging for AWS Glue jobs using new job observability metrics: Part 2

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-enhance-monitoring-and-debugging-for-aws-glue-jobs-using-new-job-observability-metrics/

Monitoring data pipelines in real time is critical for catching issues early and minimizing disruptions. AWS Glue has made this more straightforward with the launch of AWS Glue job observability metrics, which provide valuable insights into your data integration pipelines built on AWS Glue. However, you might need to track key performance indicators across multiple jobs. In this case, a dashboard that can visualize the same metrics with the ability to drill down into individual issues is an effective solution to monitor at scale.

This post walks through how to integrate AWS Glue job observability metrics with Grafana using Amazon Managed Grafana. We discuss the types of metrics and charts available to surface key insights, along with two use cases on monitoring error classes and throughput of your AWS Glue jobs.

Solution overview

Grafana is an open source visualization tool that allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. With Grafana, you can create, explore, and share visually rich, data-driven dashboards. The new AWS Glue job observability metrics can be effortlessly integrated with Grafana for real-time monitoring purposes. Metrics like worker utilization, skewness, I/O rate, and errors are captured and visualized in easy-to-read Grafana dashboards. The integration with Grafana provides a flexible way to build custom views of pipeline health tailored to your needs. Observability metrics open up monitoring capabilities for AWS Glue that weren’t possible before. Companies relying on AWS Glue for critical data integration pipelines can have greater confidence that their pipelines are running efficiently.

AWS Glue job observability metrics are emitted as Amazon CloudWatch metrics. You can provision and manage Amazon Managed Grafana, and configure the CloudWatch plugin for the given metrics. The following diagram illustrates the solution architecture.
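Before building the dashboard, you can confirm that the observability metrics are arriving in the custom Glue namespace. The following is a minimal sketch using the AWS SDK for Python (Boto3); it assumes default credentials and Region and simply lists the available metrics and their dimensions.

import boto3

# List the AWS Glue observability metrics available in the custom "Glue"
# namespace, along with their dimensions.
cloudwatch = boto3.client("cloudwatch")
paginator = cloudwatch.get_paginator("list_metrics")
for page in paginator.paginate(Namespace="Glue"):
    for metric in page["Metrics"]:
        dimensions = {d["Name"]: d["Value"] for d in metric["Dimensions"]}
        print(metric["MetricName"], dimensions)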

Implement the solution

Complete the following steps to set up the solution:

  1. Set up an Amazon Managed Grafana workspace.
  2. Sign in to your workspace.
  3. Choose Administration.
  4. Choose Add new data source.
  5. Choose CloudWatch.
  6. For Default Region, select your preferred AWS Region.
  7. For Namespaces of Custom Metrics, enter Glue.
  8. Choose Save & test.

Now the CloudWatch data source has been registered.

  1. Copy the data source ID from the URL https://g-XXXXXXXXXX.grafana-workspace.<region>.amazonaws.com/datasources/edit/<data-source-ID>/.

The next step is to prepare the JSON template file.

  1. Download the Grafana template.
  2. Replace <data-source-id> in the JSON file with your Grafana data source ID.
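If you prefer to script the replacement in step 2, the following minimal sketch performs the substitution; the file name glue-observability-dashboard.json and the data source ID value are placeholders.

# Replace the <data-source-id> placeholder in the downloaded Grafana template
# with your CloudWatch data source ID.
data_source_id = "your-grafana-data-source-id"  # placeholder
template_path = "glue-observability-dashboard.json"  # placeholder file name

with open(template_path) as f:
    template = f.read()

with open(template_path, "w") as f:
    f.write(template.replace("<data-source-id>", data_source_id))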

Lastly, configure the dashboard.

  1. On the Grafana console, choose Dashboards.
  2. Choose Import on the New menu.
  3. Upload your JSON file, and choose Import.

The Grafana dashboard visualizes AWS Glue observability metrics, as shown in the following screenshots.

The sample dashboard has the following charts:

  • [Reliability] Job Run Errors Breakdown
  • [Throughput] Bytes Read & Write
  • [Throughput] Records Read & Write
  • [Resource Utilization] Worker Utilization
  • [Job Performance] Skewness
  • [Resource Utilization] Disk Used (%)
  • [Resource Utilization] Disk Available (GB)
  • [Executor OOM] OOM Error Count
  • [Executor OOM] Heap Memory Used (%)
  • [Driver OOM] OOM Error Count
  • [Driver OOM] Heap Memory Used (%)

Analyze the causes of job failures

Let’s try analyzing the causes of job run failures of the job iot_data_processing.

First, look at the pie chart [Reliability] Job Run Errors Breakdown. This pie chart quickly identifies which errors are most common.

Then filter with the job name iot_data_processing to see the common errors for this job.

We can observe that the majority (75%) of failures were due to glue.error.DISK_NO_SPACE_ERROR.

Next, look at the line chart [Resource Utilization] Disk Used (%) to understand the disk usage during the job runs. For this job, the green line shows the driver’s disk usage, and the yellow line shows the average of the executors’ disk usage.

We can observe that there were three times when 100% of disk was used in executors.

Next, look at the line chart [Throughput] Records Read & Write to see whether the data volume changed and whether it impacted disk usage.

The chart shows that around four billion records were read at the beginning of this range; however, around 63 billion records were read at the peak. This means that the incoming data volume increased significantly, which caused a local disk space shortage in the worker nodes. For such cases, you can increase the number of workers, enable auto scaling, or choose larger worker types.

After implementing those suggestions, we can see lower disk usage and a successful job run.

(Optional) Configure cross-account setup

We can optionally configure a cross-account setup. Cross-account metrics depend on CloudWatch cross-account observability. In this setup, we expect the following environment:

  • AWS accounts are not managed in AWS Organizations
  • You have two accounts: one account is used as the monitoring account where Grafana is located, and the other account is used as the source account where the AWS Glue-based data integration pipeline is located

To configure a cross-account setup for this environment, complete the following steps for each account.

Monitoring account

Complete the following steps to configure your monitoring account:

  1. Sign in to the AWS Management Console using the account you will use for monitoring.
  2. On the CloudWatch console, choose Settings in the navigation pane.
  3. Under Monitoring account configuration, choose Configure.
  4. For Select data, choose Metrics.
  5. For List source accounts, enter the AWS account ID of the source account that this monitoring account will view.
  6. For Define a label to identify your source account, choose Account name.
  7. Choose Configure.

Now the account is successfully configured as a monitoring account.

  1. Under Monitoring account configuration, choose Resources to link accounts.
  2. Choose Any account to get a URL for setting up individual accounts as source accounts.
  3. Choose Copy URL.

You will use the copied URL from the source account in the next steps.

Source account

Complete the following steps to configure your source account:

  1. Sign in to the console using your source account.
  2. Open the URL that you copied from the monitoring account.

You can see the CloudWatch settings page, with some information filled in.

  1. For Select data, choose Metrics.
  2. Do not change the ARN in Enter monitoring account configuration ARN.
  3. The Define a label to identify your source account section is pre-filled with the label choice from the monitoring account. Optionally, choose Edit to change it.
  4. Choose Link.
  5. Enter Confirm in the box and choose Confirm.

Now your source account has been configured to link to the monitoring account. The metrics emitted in the source account will show on the Grafana dashboard in the monitoring account.
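If you prefer to automate this cross-account setup, the CloudWatch Observability Access Manager (OAM) APIs provide the same wiring. The following is a minimal sketch using Boto3; the account ID, sink name, and label template are placeholders, and each part would run with credentials for the respective account.

import json
import boto3

# Monitoring account: create a sink and allow the source account to link its metrics.
oam_monitoring = boto3.client("oam")  # credentials for the monitoring account
sink = oam_monitoring.create_sink(Name="glue-observability-sink")
oam_monitoring.put_sink_policy(
    SinkIdentifier=sink["Arn"],
    Policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": "111122223333"},  # source account ID (placeholder)
            "Action": ["oam:CreateLink", "oam:UpdateLink"],
            "Resource": "*",
        }],
    }),
)

# Source account: link to the monitoring account's sink for CloudWatch metrics.
oam_source = boto3.client("oam")  # credentials for the source account
oam_source.create_link(
    LabelTemplate="$AccountName",
    ResourceTypes=["AWS::CloudWatch::Metric"],
    SinkIdentifier=sink["Arn"],
)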

To learn more, see CloudWatch cross-account observability.

Considerations

The following are some considerations when using this solution:

  • The Grafana integration is designed for real-time monitoring. If you have a basic understanding of your jobs, it will be straightforward for you to monitor performance, errors, and more on the Grafana dashboard.
  • Amazon Managed Grafana depends on AWS IAM Identity Center. This means you need to manage single sign-on (SSO) users separately, not just AWS Identity and Access Management (IAM) users and roles. It also requires another sign-in step from the AWS console. The Amazon Managed Grafana pricing model depends on an active user license per workspace. More users can cause more charges.
  • Graph lines are visualized per job. If you want to see the lines across all the jobs, you can choose ALL in the control.

Conclusion

AWS Glue job observability metrics offer a powerful new capability for monitoring data pipeline performance in real time. By streaming key metrics to CloudWatch and visualizing them in Grafana, you gain more fine-grained visibility that wasn’t possible before. This post showed how straightforward it is to enable observability metrics and integrate the data with Grafana using Amazon Managed Grafana. We explored the different metrics available and how to build customized Grafana dashboards to surface actionable insights.

Observability is now an essential part of robust data orchestration on AWS. With the ability to monitor data integration trends in real time, you can optimize costs, performance, and reliability.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Xiaoxi Liu is a Software Development Engineer on the AWS Glue team. Her passion is building scalable distributed systems for efficiently managing big data on the cloud, and her concentrations are distributed system, big data, and cloud computing.

Akira Ajisaka is a Senior Software Development Engineer on the AWS Glue team. He likes open source software and distributed systems. In his spare time, he enjoys playing arcade games.

Shenoda Guirguis is a Senior Software Development Engineer on the AWS Glue team. His passion is in building scalable and distributed data infrastructure and processing systems. When he gets a chance, Shenoda enjoys reading and playing soccer.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18-year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple to use interfaces to efficiently manage and transform petabytes of data seamlessly across data lakes on Amazon S3, databases and data-warehouses on cloud.

New Amazon CloudWatch log class to cost-effectively scale your AWS Glue workloads

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/new-amazon-cloudwatch-log-class-to-cost-effectively-scale-your-aws-glue-workloads/

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

One of the most common questions we get from customers is how to effectively optimize costs on AWS Glue. Over the years, we have built multiple features and tools to help customers manage their AWS Glue costs. For example, AWS Glue Auto Scaling and AWS Glue Flex can help you reduce the compute cost associated with processing your data. AWS Glue interactive sessions and notebooks can help you reduce the cost of developing your ETL jobs. For more information about cost-saving best practices, refer to Monitor and optimize cost on AWS Glue for Apache Spark. Additionally, to understand data transfer costs, refer to the Cost Optimization Pillar defined in AWS Well-Architected Framework. For data storage, you can apply general best practices defined for each data source. For a cost optimization strategy using Amazon Simple Storage Service (Amazon S3), refer to Optimizing storage costs using Amazon S3.

In this post, we tackle the remaining piece—the cost of logs written by AWS Glue.

Before we get into the cost analysis of logs, let’s understand the reasons to enable logging for your AWS Glue job and the current options available. When you start an AWS Glue job, it sends real-time logging information to Amazon CloudWatch (every 5 seconds and before each executor stops) while the Spark application is running. You can view the logs on the AWS Glue console or the CloudWatch console dashboard. These logs provide you with insights into your job runs and help you optimize and troubleshoot your AWS Glue jobs. AWS Glue offers a variety of filters and settings to reduce the verbosity of your logs. As the number of job runs increases, so does the volume of logs generated.

To optimize CloudWatch Logs costs, AWS recently announced a new log class for infrequently accessed logs called Amazon CloudWatch Logs Infrequent Access (Logs IA). This new log class offers a tailored set of capabilities at a lower cost for infrequently accessed logs, enabling you to consolidate all your logs in one place in a cost-effective manner. This class provides a more cost-effective option for ingesting logs that only need to be accessed occasionally for auditing or debugging purposes.

In this post, we explain what the Logs IA class is, how it can help reduce costs compared to the standard log class, and how to configure your AWS Glue resources to use this new log class. By routing logs to Logs IA, you can achieve significant savings in your CloudWatch Logs spend without sacrificing access to important debugging information when you need it.

CloudWatch log groups used by AWS Glue job continuous logging

When continuous logging is enabled, AWS Glue for Apache Spark writes Spark driver/executor logs and progress bar information into the following log group:

/aws-glue/jobs/logs-v2

If a security configuration is enabled for CloudWatch logs, AWS Glue for Apache Spark will create a log group named as follows for continuous logs:

<Log-Group-Name>-<Security-Configuration-Name>

The default and custom log groups will be as follows:

  • The default continuous log group will be /aws-glue/jobs/logs-v2-<Security-Configuration-Name>
  • The custom continuous log group will be <custom-log-group-name>-<Security-Configuration-Name>

You can provide a custom log group name through the job parameter --continuous-log-logGroup.

Getting started with the new Infrequent Access log class for AWS Glue workloads

To gain the benefits from Logs IA for your AWS Glue workloads, you need to complete the following two steps:

  1. Create a new log group using the new Logs IA class.
  2. Configure your AWS Glue job to point to the new log group.

Complete the following steps to create a new log group using the new Infrequent Access log class:

  1. On the CloudWatch console, choose Log groups under Logs in the navigation pane.
  2. Choose Create log group.
  3. For Log group name, enter /aws-glue/jobs/logs-v2-infrequent-access.
  4. For Log class, choose Infrequent Access.
  5. Choose Create.
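If you prefer to create the log group programmatically, the following minimal sketch calls the CloudWatch Logs CreateLogGroup API with the Infrequent Access log class; it assumes default credentials and Region.

import boto3

# Create a log group that uses the Infrequent Access log class.
logs = boto3.client("logs")
logs.create_log_group(
    logGroupName="/aws-glue/jobs/logs-v2-infrequent-access",
    logGroupClass="INFREQUENT_ACCESS",
)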

Complete the following steps to configure your AWS Glue job to point to the new log group:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose your job.
  3. On the Job details tab, choose Add new parameter under Job parameters.
  4. For Key, enter --continuous-log-logGroup.
  5. For Value, enter /aws-glue/jobs/logs-v2-infrequent-access.
  6. Choose Save.
  7. Choose Run to trigger the job.

New log events are written into the new log group.
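You can also set the parameter when you start the job from the SDK. The following minimal sketch overrides the continuous logging log group for a single run; the job name is a placeholder, and to make the change permanent you would set the same key in the job’s DefaultArguments instead.

import boto3

# Start a job run that writes continuous logs to the Infrequent Access log group.
glue = boto3.client("glue")
glue.start_job_run(
    JobName="your-glue-job",  # placeholder
    Arguments={"--continuous-log-logGroup": "/aws-glue/jobs/logs-v2-infrequent-access"},
)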

View the logs with the Infrequent Access log class

Now you’re ready to view the logs with the Infrequent Access log class. Open the log group /aws-glue/jobs/logs-v2-infrequent-access on the CloudWatch console.

When you choose one of the log streams, you will notice that it redirects you to the CloudWatch console Logs Insight page with a pre-configured default command and your log stream selected by default. By choosing Run query, you can view the actual log events on the Logs Insights page.
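You can also query the log group from the SDK through CloudWatch Logs Insights. The following is a minimal sketch that runs a simple query against the new log group and prints the results; the time range and query string are examples.

import time
import boto3

logs = boto3.client("logs")

# Run a Logs Insights query over the last hour of the Infrequent Access log group.
query = logs.start_query(
    logGroupName="/aws-glue/jobs/logs-v2-infrequent-access",
    startTime=int(time.time()) - 3600,
    endTime=int(time.time()),
    queryString="fields @timestamp, @message | sort @timestamp desc | limit 20",
)

# Poll until the query finishes, then print each result row.
results = logs.get_query_results(queryId=query["queryId"])
while results["status"] in ("Scheduled", "Running"):
    time.sleep(1)
    results = logs.get_query_results(queryId=query["queryId"])

for row in results["results"]:
    print({field["field"]: field["value"] for field in row})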

Considerations

Keep in mind the following considerations:

  • You cannot change the log class of a log group after it’s created. You need to create a new log group to configure the Infrequent Access class.
  • The Logs IA class offers a subset of CloudWatch Logs capabilities, including managed ingestion, storage, cross-account log analytics, and encryption with a lower ingestion price per GB. For example, you can’t view log events through the standard CloudWatch Logs console. To learn more about the features offered across both log classes, refer to Log Classes.

Conclusion

This post provided step-by-step instructions to guide you through enabling Logs IA for your AWS Glue job logs. If your AWS Glue ETL jobs generate large volumes of log data that makes it a challenge as you scale your applications, the best practices demonstrated in this post can help you cost-effectively scale while centralizing all your logs in CloudWatch Logs. Start using the Infrequent Access class with your AWS Glue workloads today and enjoy the cost benefits.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Abeetha Bala is a Senior Product Manager for Amazon CloudWatch, primarily focused on logs. Being customer obsessed, she solves observability challenges through innovative and cost-effective ways.

Kinshuk Pahare is a leader in AWS Glue’s product management team. He drives efforts on the platform, developer experience, and big data processing frameworks like Apache Spark, Ray, and Python Shell.

Build and manage your modern data stack using dbt and AWS Glue through dbt-glue, the new “trusted” dbt adapter

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/build-and-manage-your-modern-data-stack-using-dbt-and-aws-glue-through-dbt-glue-the-new-trusted-dbt-adapter/

dbt is an open source, SQL-first templating engine that allows you to write repeatable and extensible data transforms in Python and SQL. dbt focuses on the transform layer of extract, load, transform (ELT) or extract, transform, load (ETL) processes across data warehouses and databases through specific engine adapters to achieve extract and load functionality. It enables data engineers, data scientists, and analytics engineers to define the business logic with SQL select statements and eliminates the need to write boilerplate data manipulation language (DML) and data definition language (DDL) expressions. dbt lets data engineers quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, continuous integration and continuous delivery (CI/CD), and documentation.

dbt is predominantly used by customers of data warehouses (such as Amazon Redshift) who are looking to keep their data transform logic separate from storage and engine. We have seen strong customer demand to expand its scope to cloud-based data lakes because data lakes are increasingly the enterprise solution for large-scale data initiatives due to their power and capabilities.

In 2022, AWS published a dbt adapter called dbt-glue—the open source, battle-tested dbt AWS Glue adapter that allows data engineers to use dbt for cloud-based data lakes along with data warehouses and databases, paying for just the compute they need. The dbt-glue adapter democratized access for dbt users to data lakes, and enabled many users to effortlessly run their transformation workloads on the cloud with the serverless data integration capability of AWS Glue. Since the launch of the adapter, AWS has continued investing in dbt-glue to cover more requirements.

Today, we are pleased to announce that the dbt-glue adapter is now a trusted adapter based on our strategic collaboration with dbt Labs. Trusted adapters are adapters not maintained by dbt Labs, but that dbt Labs is comfortable recommending to users for use in production.

The key capabilities of the dbt-glue adapter are as follows:

  • Runs SQL as Spark SQL on AWS Glue interactive sessions
  • Manages table definitions on the AWS Glue Data Catalog
  • Supports open table formats such as Apache Hudi, Delta Lake, and Apache Iceberg
  • Supports AWS Lake Formation permissions for fine-grained access control

In addition to those capabilities, the dbt-glue adapter is designed to optimize resource utilization with several techniques on top of AWS Glue interactive sessions.

This post demonstrates how the dbt-glue adapter helps your workload, and how you can build a modern data stack using dbt and AWS Glue using the dbt-glue adapter.

Common use cases

One common use case for using dbt-glue is if a central analytics team at a large corporation is responsible for monitoring operational efficiency. They ingest application logs into raw Parquet tables in an Amazon Simple Storage Service (Amazon S3) data lake. Additionally, they extract organized data from operational systems capturing the company’s organizational structure and costs of diverse operational components that they stored in the raw zone using Iceberg tables to maintain the original schema, facilitating easy access to the data. The team uses dbt-glue to build a transformed gold model optimized for business intelligence (BI). The gold model joins the technical logs with billing data and organizes the metrics per business unit. The gold model uses Iceberg’s ability to support data warehouse-style modeling needed for performant BI analytics in a data lake. The combination of Iceberg and dbt-glue allows the team to efficiently build a data model that’s ready to be consumed.

Another common use case is when an analytics team in a company that has an S3 data lake creates a new data product in order to enrich its existing data from its data lake with medical data. Let’s say that this company is located in Europe and the data product must comply with the GDPR. For this, the company uses Iceberg to meet needs such as the right to be forgotten and the deletion of data. The company uses dbt to model its data product on its existing data lake due to its compatibility with AWS Glue and Iceberg and the simplicity that the dbt-glue adapter brings to the use of this storage format.

How dbt and dbt-glue work

The following are key dbt features:

  • Project – A dbt project enforces a top-level structure on the staging, models, permissions, and adapters. A project can be checked into a GitHub repo for version control.
  • SQL – dbt relies on SQL select statements for defining data transformation logic. Instead of raw SQL, dbt offers templatized SQL (using Jinja) that allows code modularity. Instead of having to copy/paste SQL in multiple places, data engineers can define modular transforms and call those from other places within the project. Having a modular pipeline helps data engineers collaborate on the same project.
  • Models – dbt models are primarily written as a SELECT statement and saved as a .sql file. Data engineers define dbt models for their data representations. To learn more, refer to About dbt models.
  • Materializations – Materializations are strategies for persisting dbt models in a warehouse. There are five types of materializations built into dbt: table, view, incremental, ephemeral, and materialized view. To learn more, refer to Materializations and Incremental models.
  • Data lineage – dbt tracks data lineage, allowing you to understand the origin of data and how it flows through different transformations. dbt also supports impact analysis, which helps identify the downstream effects of changes.

The high-level data flow is as follows:

  1. Data engineers ingest data from data sources to raw tables and define table definitions for the raw tables.
  2. Data engineers write dbt models with templatized SQL.
  3. The dbt adapter converts dbt models to SQL statements compatible with the target data warehouse.
  4. The data warehouse runs the SQL statements to create intermediate tables or final tables, views, or materialized views.

The following diagram illustrates the architecture.

dbt-glue works with the following steps:

  1. The dbt-glue adapter converts dbt models to SQL statements compatible with Spark SQL.
  2. AWS Glue interactive sessions run the SQL statements to create intermediate tables or final tables, views, or materialized views.
  3. dbt-glue supports csv, parquet, hudi, delta, and iceberg as file formats.
  4. On the dbt-glue adapter, table and incremental are commonly used for materializations at the destination. There are three strategies for incremental materialization. The merge strategy requires hudi, delta, or iceberg. With the other two strategies, append and insert_overwrite, you can use csv, parquet, hudi, delta, or iceberg.

The following diagram illustrates this architecture.

Example use case

In this post, we use the data from the New York City Taxi Records dataset. This dataset is available in the Registry of Open Data on AWS (RODA), which is a repository containing public datasets from AWS resources. The raw Parquet table records in this dataset stores trip records.

The objective is to create the following three tables, which contain metrics based on the raw table:

  • silver_avg_metrics – Basic metrics based on NYC Taxi Open Data for the year 2016
  • gold_passengers_metrics – Metrics per passenger based on the silver metrics table
  • gold_cost_metrics – Metrics per cost based on the silver metrics table

The final goal is to create two well-designed gold tables that store already aggregated results in Iceberg format for ad hoc queries through Amazon Athena.

Prerequisites

The instructions require the following prerequisites:

  • An AWS Identity and Access Management (IAM) role with all the mandatory permissions to run an AWS Glue interactive session and the dbt-glue adapter
  • An AWS Glue database and table to store the metadata related to the NYC taxi records dataset
  • An S3 bucket to use as output and store the processed data
  • An Athena configuration (a workgroup and S3 bucket to store the output) to explore the dataset
  • An AWS Lambda function (created as an AWS CloudFormation custom resource) that updates all the partitions in the AWS Glue table

With these prerequisites, we simulate the situation that data engineers have already ingested data from data sources to raw tables, and defined table definitions for the raw tables.

For ease of use, we prepared a CloudFormation template. This template deploys all the required infrastructure. To create these resources, choose Launch Stack in the us-east-1 Region, and follow the instructions:

Install dbt, the dbt CLI, and the dbt adapter

The dbt CLI is a command line interface for running dbt projects. It’s free to use and available as an open source project. Install dbt and the dbt CLI with the following code:

$ pip3 install --no-cache-dir dbt-core

For more information, refer to How to install dbt, What is dbt?, and Viewpoint.

Install the dbt adapter with the following code:

$ pip3 install --no-cache-dir dbt-glue

Create a dbt project

Complete the following steps to create a dbt project:

  1. Run the dbt init command to create and initialize a new empty dbt project:
    $ dbt init

  2. For the project name, enter dbt_glue_demo.
  3. For the database, choose glue.

Now the empty project has been created. The directory structure is shown as follows:

$ cd dbt_glue_demo 
$ tree .
.
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
│   └── example
│       ├── my_first_dbt_model.sql
│       ├── my_second_dbt_model.sql
│       └── schema.yml
├── seeds
├── snapshots
└── tests

Create a source

The next step is to create a source table definition. We add models/source_tables.yml with the following contents:

version: 2

sources:
  - name: data_source
    schema: nyctaxi

    tables:
      - name: records

This source definition corresponds to the AWS Glue table nyctaxi.records, which we created in the CloudFormation stack.

Create models

In this step, we create a dbt model that represents the average values for trip duration, passenger count, trip distance, and total amount of charges. Complete the following steps:

  1. Create the models/silver/ directory.
  2. Create the file models/silver/silver_avg_metrics.sql with the following contents:
    WITH source_avg as ( 
        SELECT avg((CAST(dropoff_datetime as LONG) - CAST(pickup_datetime as LONG))/60) as avg_duration 
        , avg(passenger_count) as avg_passenger_count 
        , avg(trip_distance) as avg_trip_distance 
        , avg(total_amount) as avg_total_amount
        , year
        , month 
        , type
        FROM {{ source('data_source', 'records') }} 
        WHERE year = "2016"
        AND dropoff_datetime is not null 
        GROUP BY year, month, type
    ) 
    SELECT *
    FROM source_avg

  3. Create the file models/silver/schema.yml with the following contents:
    version: 2
    
    models:
      - name: silver_avg_metrics
        description: This table has basic metrics based on NYC Taxi Open Data for the year 2016
    
        columns:
          - name: avg_duration
            description: The average duration of a NYC Taxi trip
    
          - name: avg_passenger_count
            description: The average number of passengers per NYC Taxi trip
    
          - name: avg_trip_distance
            description: The average NYC Taxi trip distance
    
          - name: avg_total_amount
            description: The average amount of a NYC Taxi trip
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip 
    
          - name: type
            description: The type of the NYC Taxi 

  4. Create the models/gold/ directory.
  5. Create the file models/gold/gold_cost_metrics.sql with the following contents:
    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=["year", "month", "type"],
        file_format='iceberg',
        iceberg_expire_snapshots='False',
        table_properties={'format-version': '2'}
    ) }}
    SELECT (avg_total_amount/avg_trip_distance) as avg_cost_per_distance
    , (avg_total_amount/avg_duration) as avg_cost_per_minute
    , year
    , month 
    , type 
    FROM {{ ref('silver_avg_metrics') }}

  6. Create the file models/gold/gold_passengers_metrics.sql with the following contents:
    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=["year", "month", "type"],
        file_format='iceberg',
        iceberg_expire_snapshots='False',
        table_properties={'format-version': '2'}
    ) }}
    SELECT (avg_total_amount/avg_passenger_count) as avg_cost_per_passenger
    , (avg_duration/avg_passenger_count) as avg_duration_per_passenger
    , (avg_trip_distance/avg_passenger_count) as avg_trip_distance_per_passenger
    , year
    , month 
    , type 
    FROM {{ ref('silver_avg_metrics') }}

  7. Create the file models/gold/schema.yml with the following contents:
    version: 2
    
    models:
      - name: gold_cost_metrics
        description: This table has metrics per cost based on NYC Taxi Open Data
    
        columns:
          - name: avg_cost_per_distance
            description: The average cost per distance of a NYC Taxi trip
    
          - name: avg_cost_per_minute
            description: The average cost per minute of a NYC Taxi trip
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip
    
          - name: type
            description: The type of the NYC Taxi
    
      - name: gold_passengers_metrics
        description: This table has metrics per passenger based on NYC Taxi Open Data
    
        columns:
          - name: avg_cost_per_passenger
            description: The average cost per passenger for a NYC Taxi trip
    
          - name: avg_duration_per_passenger
            description: The average duration per passenger of a NYC Taxi trip
    
          - name: avg_trip_distance_per_passenger
            description: The average NYC Taxi trip distance per passenger
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip 
    
          - name: type
            description: The type of the NYC Taxi

  8. Remove the models/example/ folder, because it’s just an example created in the dbt init command.

Configure the dbt project

dbt_project.yml is a key configuration file for dbt projects. It contains the following code:

models:
  dbt_glue_demo:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

We configure dbt_project.yml to replace the preceding code with the following:

models:
  dbt_glue_demo:
    silver:
      +materialized: table

This is because we want to materialize the models under silver as Parquet tables.

Configure a dbt profile

A dbt profile is a configuration that specifies how to connect to a particular database. The profiles are defined in the profiles.yml file within a dbt project.

Complete the following steps to configure a dbt profile:

  1. Create the profiles directory.
  2. Create the file profiles/profiles.yml with the following contents:
    dbt_glue_demo:
      target: dev
      outputs:
        dev:
          type: glue
          query-comment: demo-nyctaxi
          role_arn: "{{ env_var('DBT_ROLE_ARN') }}"
          region: us-east-1
          workers: 5
          worker_type: G.1X
          schema: "dbt_glue_demo_nyc_metrics"
          database: "dbt_glue_demo_nyc_metrics"
          session_provisioning_timeout_in_seconds: 120
          location: "{{ env_var('DBT_S3_LOCATION') }}"

  3. Create the profiles/iceberg/ directory.
  4. Create the file profiles/iceberg/profiles.yml with the following contents:
    dbt_glue_demo:
      target: dev
      outputs:
        dev:
          type: glue
          query-comment: demo-nyctaxi
          role_arn: "{{ env_var('DBT_ROLE_ARN') }}"
          region: us-east-1
          workers: 5
          worker_type: G.1X
          schema: "dbt_glue_demo_nyc_metrics"
          database: "dbt_glue_demo_nyc_metrics"
          session_provisioning_timeout_in_seconds: 120
          location: "{{ env_var('DBT_S3_LOCATION') }}"
          datalake_formats: "iceberg"
          conf: --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse="{{ env_var('DBT_S3_LOCATION') }}warehouse/" --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

The last two lines are added for setting Iceberg configurations on AWS Glue interactive sessions.

Run the dbt project

Now it’s time to run the dbt project. Complete the following steps:

  1. To run the dbt project, you need to be in the project folder:
    $ cd dbt_glue_demo

  2. The project requires you to set environment variables in order to run on the AWS account:
    $ export DBT_ROLE_ARN="arn:aws:iam::$(aws sts get-caller-identity --query "Account" --output text):role/GlueInteractiveSessionRole"
    $ export DBT_S3_LOCATION="s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1"

  3. Make sure the profile is set up correctly from the command line:
    $ dbt debug --profiles-dir profiles
    ...
    05:34:22 Connection test: [OK connection ok]
    05:34:22 All checks passed!

If you see any failures, check if you provided the correct IAM role ARN and S3 location in Step 2.

  1. Run the models with the following code:
    $ dbt run -m silver --profiles-dir profiles
    $ dbt run -m gold --profiles-dir profiles/iceberg/

Now the tables are successfully created in the AWS Glue Data Catalog, and the data is materialized in the Amazon S3 location.

You can verify those tables by opening the AWS Glue console, choosing Databases in the navigation pane, and opening dbt_glue_demo_nyc_metrics.
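If you prefer to verify the results from the SDK instead of the console, the following minimal sketch lists the tables that the dbt run registered in the Data Catalog; it assumes default credentials and the us-east-1 Region used in this post.

import boto3

# List the tables created by dbt in the target database.
glue = boto3.client("glue", region_name="us-east-1")
paginator = glue.get_paginator("get_tables")
for page in paginator.paginate(DatabaseName="dbt_glue_demo_nyc_metrics"):
    for table in page["TableList"]:
        print(table["Name"])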

Query materialized tables through Athena

Let’s query the target table using Athena to verify the materialized tables. Complete the following steps:

  1. On the Athena console, switch the workgroup to athena-dbt-glue-aws-blog.
  2. If the workgroup athena-dbt-glue-aws-blog settings dialog box appears, choose Acknowledge.
  3. Use the following query to explore the metrics created by the dbt project:
    SELECT cm.avg_cost_per_minute
        , cm.avg_cost_per_distance
        , pm.avg_cost_per_passenger
        , cm.year
        , cm.month
        , cm.type
    FROM "dbt_glue_demo_nyc_metrics"."gold_passengers_metrics" pm
    LEFT JOIN "dbt_glue_demo_nyc_metrics"."gold_cost_metrics" cm
        ON cm.type = pm.type
        AND cm.year = pm.year
        AND cm.month = pm.month
    WHERE cm.type = 'yellow'
        AND cm.year = '2016'
        AND cm.month = '6'

The following screenshot shows the results of this query.

Review dbt documentation

Complete the following steps to review your documentation:

  1. Generate the following documentation for the project:
    $ dbt docs generate --profiles-dir profiles/iceberg
    11:41:51  Running with dbt=1.7.1
    11:41:51  Registered adapter: glue=1.7.1
    11:41:51  Unable to do partial parsing because profile has changed
    11:41:52  Found 3 models, 1 source, 0 exposures, 0 metrics, 478 macros, 0 groups, 0 semantic models
    11:41:52  
    11:41:53  Concurrency: 1 threads (target='dev')
    11:41:53  
    11:41:53  Building catalog
    11:43:32  Catalog written to /Users/username/Documents/workspace/dbt_glue_demo/target/catalog.json

  2. Run the following command to open the documentation on your browser:
    $ dbt docs serve --profiles-dir profiles/iceberg

  3. In the navigation pane, choose gold_cost_metrics under dbt_glue_demo/models/gold.

You can see the detailed view of the model gold_cost_metrics, as shown in the following screenshot.

  1. To see the lineage graph, choose the circle icon at the bottom right.

Clean up

To clean up your environment, complete the following steps:

  1. Delete the database created by dbt:
    $ aws glue delete-database --name dbt_glue_demo_nyc_metrics

  2. Delete all generated data:
    $ aws s3 rm s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive
    $ aws s3 rm s3://aws-athena-dbt-glue-query-results-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive

  3. Delete the CloudFormation stack:
    $ aws cloudformation delete-stack --stack-name dbt-demo

Conclusion

This post demonstrated how the dbt-glue adapter helps your workload, and how you can build a modern data stack using dbt and AWS Glue using the dbt-glue adapter. You learned the end-to-end operations and data flow for data engineers to build and manage a data stack using dbt and the dbt-glue adapter. To report issues or request a feature enhancement, feel free to open an issue on GitHub.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team at Amazon Web Services. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Benjamin Menuet is a Senior Data Architect on the AWS Professional Services team at Amazon Web Services. He helps customers develop data and analytics solutions to accelerate their business outcomes. Outside of work, Benjamin is a trail runner and has finished some iconic races like the UTMB.

Akira Ajisaka is a Senior Software Development Engineer on the AWS Glue team. He likes open source software and distributed systems. In his spare time, he enjoys playing arcade games.

Kinshuk Pahare is a Principal Product Manager on the AWS Glue team at Amazon Web Services.

Jason Ganz is the manager of the Developer Experience (DX) team at dbt Labs.

Introducing Apache Hudi support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-apache-hudi-support-with-aws-glue-crawlers/

Apache Hudi is an open table format that brings database and data warehouse capabilities to data lakes. Apache Hudi helps data engineers manage complex challenges, such as managing continuously evolving datasets with transactions while maintaining query performance. Data engineers use Apache Hudi for streaming workloads as well as to create efficient incremental data pipelines. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering and compaction optimizations, and concurrency control, all while keeping your data in open source file formats. Hudi’s advanced performance optimizations make analytical workloads faster with any of the popular query engines including Apache Spark, Presto, Trino, Hive, and so on.

Many AWS customers adopted Apache Hudi on their data lakes built on top of Amazon S3 using AWS Glue, a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. AWS Glue Crawler is a component of AWS Glue, which allows you to create table metadata from data content automatically without requiring manual definition of the metadata.

AWS Glue crawlers now support Apache Hudi tables, simplifying the adoption of the AWS Glue Data Catalog as the catalog for Hudi tables. One typical use case is to register Hudi tables that do not yet have catalog table definitions. Another typical use case is migration from other Hudi catalogs, such as Hive metastore. When migrating from other Hudi catalogs, you can create and schedule an AWS Glue crawler and provide one or more Amazon S3 paths where the Hudi table files are located. You have the option to provide the maximum depth of Amazon S3 paths that the AWS Glue crawler can traverse. With each run, AWS Glue crawlers will extract schema and partition information and update the AWS Glue Data Catalog with the schema and partition changes. AWS Glue crawlers also update the latest metadata file location in the AWS Glue Data Catalog, which AWS analytical engines can directly use.

With this launch, you can create and schedule an AWS Glue crawler to register Hudi tables in AWS Glue Data Catalog. You can then provide one or multiple Amazon S3 paths where the Hudi tables are located. You have the option to provide the maximum depth of Amazon S3 paths that crawlers can traverse. With each crawler run, the crawler inspects each of the S3 paths and catalogs the schema information, such as new tables, deletes, and updates to schemas in the AWS Glue Data Catalog. Crawlers inspect partition information and add newly added partitions to AWS Glue Data Catalog. Crawlers also update the latest metadata file location in the AWS Glue Data Catalog that AWS analytical engines can directly use.
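As an alternative to the console steps later in this post, you can also define a Hudi crawler through the CreateCrawler API. The following is a minimal sketch using Boto3; the role ARN, bucket name, and traversal depth are placeholders.

import boto3

# Create a crawler with a Hudi target pointing at the table path on Amazon S3.
glue = boto3.client("glue")
glue.create_crawler(
    Name="hudi_cow_crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",  # placeholder
    DatabaseName="hudi_crawler_blog",
    Targets={
        "HudiTargets": [
            {
                "Paths": ["s3://your_s3_bucket/data/sample_hudi_cow_table/"],
                "MaximumTraversalDepth": 10,  # placeholder depth
            }
        ]
    },
)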

This post demonstrates how this new capability to crawl Hudi tables works.

How AWS Glue crawler works with Hudi tables

Hudi tables have two categories, with specific implications for each:

  • Copy on write (CoW) – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write.
  • Merge on read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.

With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that change less frequently.

Hudi provides three query types for accessing the data:

  • Snapshot queries – Queries that see the latest snapshot of the table as of a given commit or compaction action. For MoR tables, snapshot queries expose the most recent state of the table by merging the base and delta files of the latest file slice at the time of the query.
  • Incremental queries – Queries only see new data written to the table, since a given commit or compaction. This effectively provides change streams to enable incremental data pipelines.
  • Read optimized queries – For MoR tables, queries see the latest data compacted. For CoW tables, queries see the latest data committed.

For copy-on-write tables, crawlers create a single table in the AWS Glue Data Catalog with the ReadOptimized Serde  org.apache.hudi.hadoop.HoodieParquetInputFormat.

For merge-on-read tables, crawlers create two tables in AWS Glue Data Catalog for the same table location:

  • A table with suffix _ro, which uses the ReadOptimized Serde org.apache.hudi.hadoop.HoodieParquetInputFormat
  • A table with suffix _rt, which uses the RealTime Serde allowing for Snapshot queries: org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat

During each crawl, for each Hudi path provided, crawlers make an Amazon S3 list API call, filter based on the .hoodie folders, and find the most recent metadata file under that Hudi table metadata folder.

Crawl a Hudi CoW table using AWS Glue crawler

In this section, let’s go through how to crawl a Hudi CoW using AWS Glue crawlers.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue if you do not have it. You need s3:GetObject for s3://your_s3_bucket/data/sample_hudi_cow_table/.
  4. Run the following command to copy the sample Hudi table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/hudi-crawler/product_cow/ s3://your_s3_bucket/data/sample_hudi_cow_table/

These instructions guide you through copying sample data, but you can create any Hudi table easily using AWS Glue. Learn more in Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 2: AWS Glue Studio Visual Editor.

Create a Hudi crawler

In this instruction, create the crawler through the console. Complete the following steps to create a Hudi crawler:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Create crawler.
  3. For Name, enter hudi_cow_crawler. Choose Next.
  4. Under Data source configuration,  choose Add data source.
    1. For Data source, choose Hudi.
    2. For Include hudi table paths, enter s3://your_s3_bucket/data/sample_hudi_cow_table/. (Replace your_s3_bucket with your S3 bucket name.)
    3. Choose Add Hudi data source.
  5. Choose Next.
  6. For Existing IAM role, choose your IAM role, then choose Next.
  7. For Target database, choose Add database, then the Add database dialog appears. For Database name, enter hudi_crawler_blog, then choose Create. Choose Next.
  8. Choose Create crawler.

Now a new Hudi crawler has been successfully created. The crawler can be triggered to run through the console, or through the SDK or AWS CLI using the StartCrawler API. It can also be scheduled through the console to run at specific times. In this instruction, run the crawler through the console.

  1. Choose Run crawler.
  2. Wait for the crawler to complete.
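If you want to trigger the crawler from the SDK instead, the following minimal sketch starts it and waits until it returns to the READY state.

import time
import boto3

glue = boto3.client("glue")

# Start the crawler and poll until the crawl finishes.
glue.start_crawler(Name="hudi_cow_crawler")
while glue.get_crawler(Name="hudi_cow_crawler")["Crawler"]["State"] != "READY":
    time.sleep(30)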

After the crawler has run, you can see the Hudi table definition in the AWS Glue console:

You have successfully crawled the Hudi CoW table with data on Amazon S3 and created an AWS Glue Data Catalog table with the schema populated. After you create the table definition on AWS Glue Data Catalog, AWS analytics services such as Amazon Athena are able to query the Hudi table.

Complete the following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "hudi_crawler_blog"."sample_hudi_cow_table" limit 10;

The following screenshot shows our output:

Crawl a Hudi MoR table using AWS Glue crawler with AWS Lake Formation data permissions

In this section, let’s go through how to crawl a Hudi MoR table using AWS Glue. This time, you use AWS Lake Formation data permission for crawling Amazon S3 data sources instead of IAM and Amazon S3 permission. This is optional, but it simplifies permission configurations when your data lake is managed by AWS Lake Formation permissions.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue if you do not have it. You need lakeformation:GetDataAccess. But you do not need s3:GetObject for s3://your_s3_bucket/data/sample_hudi_mor_table/ because we use Lake Formation data permission to access the files.
  4. Run the following command to copy the sample Hudi table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/hudi-crawler/product_mor/ s3://your_s3_bucket/data/sample_hudi_mor_table/

In addition to the preceding steps, complete the following steps to update the AWS Glue Data Catalog settings to use Lake Formation permissions to control catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as a data lake administrator.
    1. If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
  2. Under Administration, choose Data catalog settings.
  3. For Default permissions for newly created databases and tables, deselect Use only IAM access control for new databases and Use only IAM access control for new tables in new databases.
  4. For Cross account version setting, choose Version 3.
  5. Choose Save.

The next step is to register your S3 bucket in Lake Formation data lake locations:

  1. On the Lake Formation console, choose Data lake locations, and choose Register location.
  2. For Amazon S3 path, enter s3://your_s3_bucket/. (Replace your_s3_bucket with your S3 bucket name.)
  3. Choose Register location.

Then, grant the AWS Glue crawler role access to the data location so that the crawler can use Lake Formation permissions to access the data and create tables in the location:

  1. On the Lake Formation console, choose Data locations and choose Grant.
  2. For IAM users and roles, select the IAM role you used for the crawler.
  3. For Storage location, enter s3://your_s3_bucket/data/. (Replace your_s3_bucket with your S3 bucket name.)
  4. Choose Grant.

Then, grant the crawler role permission to create tables under the database hudi_crawler_blog:

  1. On the Lake Formation console, choose Data lake permissions.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles, and choose the crawler role.
  4. For LF tags or catalog resources, choose Named data catalog resources.
  5. For Database, choose the database hudi_crawler_blog.
  6. Under Database permissions, select Create table.
  7. Choose Grant.
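The same grants can be issued through the Lake Formation API. The following is a minimal sketch using Boto3; the crawler role ARN and bucket name are placeholders.

import boto3

lakeformation = boto3.client("lakeformation")
crawler_role_arn = "arn:aws:iam::123456789012:role/GlueCrawlerRole"  # placeholder

# Grant data location access on the S3 prefix registered with Lake Formation.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": crawler_role_arn},
    Resource={"DataLocation": {"ResourceArn": "arn:aws:s3:::your_s3_bucket/data"}},
    Permissions=["DATA_LOCATION_ACCESS"],
)

# Grant permission to create tables in the target database.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": crawler_role_arn},
    Resource={"Database": {"Name": "hudi_crawler_blog"}},
    Permissions=["CREATE_TABLE"],
)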

Create a Hudi crawler with Lake Formation data permissions

Complete the following steps to create a Hudi crawler:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Create crawler.
  3. For Name, enter hudi_mor_crawler. Choose Next.
  4. Under Data source configuration,  choose Add data source.
    1. For Data source, choose Hudi.
    2. For Include hudi table paths, enter s3://your_s3_bucket/data/sample_hudi_mor_table/. (Replace your_s3_bucket with your S3 bucket name.)
    3. Choose Add Hudi data source.
  5. Choose Next.
  6. For Existing IAM role, choose your IAM role.
  7. Under Lake Formation configuration – optional, select Use Lake Formation credentials for crawling S3 data source.
  8. Choose Next.
  9. For Target database, choose hudi_crawler_blog. Choose Next.
  10. Choose Create crawler.

Now a new Hudi crawler has been successfully created. The crawler uses Lake Formation credentials for crawling Amazon S3 files. Let’s run the new crawler:

  1. Choose Run crawler.
  2. Wait for the crawler to complete.

After the crawler has run, you can see two tables of the Hudi table definition in the AWS Glue console:

  • sample_hudi_mor_table_ro (read optimized table)
  • sample_hudi_mor_table_rt (real time table)

You registered the data lake bucket with Lake Formation and enabled crawling access to the data lake using Lake Formation permissions. You have successfully crawled the Hudi MoR table with data on Amazon S3 and created an AWS Glue Data Catalog table with the schema populated. After you create the table definitions on AWS Glue Data Catalog, AWS analytics services such as Amazon Athena are able to query the Hudi table.

Complete the following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_rt" limit 10;

The following screenshot shows our output:

  1. Run the following query.
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_ro" limit 10;

The following screenshot shows our output:

Fine-grained access control using AWS Lake Formation permissions

To apply fine-grained access control on the Hudi table, you can benefit from AWS Lake Formation permissions. Lake Formation permissions allow you to restrict access to specific tables, columns, or rows and then query the Hudi tables through Amazon Athena with fine-grained access control. Let’s configure Lake Formation permission for the Hudi MoR table.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Complete the previous section Crawl a Hudi MoR table using AWS Glue crawler with AWS Lake Formation data permissions.
  2. Create an IAM user DataAnalyst, who has AWS managed policy AmazonAthenaFullAccess.

Create a Lake Formation data cell filter

Let’s first set up a filter for the MoR read optimized table.

  1. Sign in to the Lake Formation console as a data lake administrator.
  2. Choose Data filters.
  3. Choose Create new filter.
  4. For Data filter name, enter exclude_product_price.
  5. For Target database, choose the database hudi_crawler_blog.
  6. For Target table, choose the table sample_hudi_mor_table_ro.
  7. For Column-level access, select Exclude columns, and choose the column price.
  8. For Row filter expression, enter true.
  9. Choose Create filter.
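You can also create the same data cell filter through the Lake Formation API. The following minimal sketch uses Boto3; the account ID is a placeholder for your AWS account ID, which serves as the table catalog ID.

import boto3

# Create a data cell filter that hides the price column and keeps all rows.
lakeformation = boto3.client("lakeformation")
lakeformation.create_data_cells_filter(
    TableData={
        "TableCatalogId": "123456789012",  # placeholder AWS account ID
        "DatabaseName": "hudi_crawler_blog",
        "TableName": "sample_hudi_mor_table_ro",
        "Name": "exclude_product_price",
        "RowFilter": {"FilterExpression": "true"},
        "ColumnWildcard": {"ExcludedColumnNames": ["price"]},
    }
)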

Grant Lake Formation permissions to the DataAnalyst user

Complete the following steps to grant Lake Formation permissions to the DataAnalyst user:

  1. On the Lake Formation console, choose Data lake permissions.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles, and choose the user DataAnalyst.
  4. For LF tags or catalog resources, choose Named data catalog resources.
  5. For Database, choose the database hudi_crawler_blog.
  6. For Table – optional, choose the table sample_hudi_mor_table_ro.
  7. For Data filters – optional, select exclude_product_price.
  8. For Data filter permissions, select Select.
  9. Choose Grant.

You granted Lake Formation permission on the database hudi_crawler_blog and the table sample_hudi_mor_table_ro, excluding the column price to the DataAnalyst user. Now let’s validate user access to the data using Athena.

  1. Sign in to the Athena console as a DataAnalyst user.
  2. On the query editor, run the following query:
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_ro" limit 10;

The following screenshot shows our output:

Now you validated that the column price is not shown, but the other columns product_id, product_name, update_at, and category are shown.

Clean up

To avoid unwanted charges to your AWS account, delete the following AWS resources:

  1. Delete AWS Glue database hudi_crawler_blog.
  2. Delete AWS Glue crawlers hudi_cow_crawler and hudi_mor_crawler.
  3. Delete Amazon S3 files under s3://your_s3_bucket/data/sample_hudi_cow_table/ and s3://your_s3_bucket/data/sample_hudi_mor_table/.

Conclusion

This post demonstrated how AWS Glue crawlers work for Hudi tables. With the support for Hudi crawler, you can quickly move to using AWS Glue Data Catalog as your primary Hudi table catalog. You can start building your serverless transactional data lake using Hudi on AWS using AWS Glue, AWS Glue Data Catalog, and Lake Formation fine-grained access controls for tables and formats supported by AWS analytical engines.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Enhance monitoring and debugging for AWS Glue jobs using new job observability metrics

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/enhance-monitoring-and-debugging-for-aws-glue-jobs-using-new-job-observability-metrics/

For any modern data-driven company, having smooth data integration pipelines is crucial. These pipelines pull data from various sources, transform it, and load it into destination systems for analytics and reporting. When running properly, they provide timely and trustworthy information. However, without vigilance, varying data volumes, characteristics, and application behavior can cause data pipelines to become inefficient and problematic. Performance can slow down or pipelines can become unreliable. Undetected errors result in bad data and impact downstream analysis. That’s why robust monitoring and troubleshooting for data pipelines is essential across the following four areas:

  • Reliability
  • Performance
  • Throughput
  • Resource utilization

Together, these four aspects of monitoring provide end-to-end visibility and control over a data pipeline and its operations.

Today we are pleased to announce a new class of Amazon CloudWatch metrics reported with your pipelines built on top of AWS Glue for Apache Spark jobs. The new metrics provide aggregate and fine-grained insights into the health and operations of your job runs and the data being processed. In addition to providing insightful dashboards, the metrics provide classification of errors, which helps with root cause analysis of performance bottlenecks and error diagnosis. With this analysis, you can evaluate and apply the recommended fixes and best practices for architecting your jobs and pipelines. As a result, you gain the benefit of higher availability, better performance, and lower cost for your AWS Glue for Apache Spark workload.

This post demonstrates how the new enhanced metrics help you monitor and debug AWS Glue jobs.

Enable the new metrics

The new metrics can be configured through the job parameter --enable-observability-metrics.

The new metrics are enabled by default on the AWS Glue Studio console. To configure the metrics on the AWS Glue Studio console, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Your jobs, choose your job.
  3. On the Job details tab, expand Advanced properties.
  4. Under Job observability metrics, select Enable the creation of additional observability CloudWatch metrics when this job runs.

To enable the new metrics in the AWS Glue CreateJob and StartJobRun APIs, set the following parameters in the DefaultArguments property:

  • Key: --enable-observability-metrics
  • Value: true

To enable the new metrics in the AWS Command Line Interface (AWS CLI), set the same job parameters in the --default-arguments argument.
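As a reference, the following is a minimal boto3 sketch that enables the metrics for a single job run by passing the same parameter as a run argument; the job name is a placeholder.

import boto3

glue = boto3.client("glue")

# Enable job observability metrics for one run of an existing job.
# The job name is a placeholder.
response = glue.start_job_run(
    JobName="your-glue-job-name",
    Arguments={"--enable-observability-metrics": "true"},
)
print(response["JobRunId"])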

Use case

A typical workload for AWS Glue for Apache Spark jobs is to load data from a relational database to a data lake with SQL-based transformations. The following is a visual representation of an example job where the number of workers is 10.

When the example job ran, the workerUtilization metrics showed the following trend.

Note that workerUtilization showed values between 0.20 (20%) and 0.40 (40%) for the entire duration. This typically happens when the job capacity is over-provisioned and many Spark executors are idle, resulting in unnecessary cost. To improve resource utilization efficiency, it’s a good idea to enable AWS Glue Auto Scaling. The following screenshot shows the same workerUtilization metrics graph when AWS Glue Auto Scaling is enabled for the same job.

workerUtilization showed 1.0 in the beginning because of AWS Glue Auto Scaling and it trended between 0.75 (75%) and 1.0 (100%) based on the workload requirements.

Query and visualize metrics in CloudWatch

Complete the following steps to query and visualize metrics on the CloudWatch console:

  1. On the CloudWatch console, choose All metrics in the navigation pane.
  2. Under Custom namespaces, choose Glue.
  3. Choose Observability Metrics (or Observability Metrics Per Source, or Observability Metrics Per Sink).
  4. Search for and select the specific metric name, job name, job run ID, and observability group.
  5. On the Graphed metrics tab, configure your preferred statistic, period, and so on.

Query metrics using the AWS CLI

Complete the following steps for querying using the AWS CLI (for this example, we query the worker utilization metric):

  1. Create a metric definition JSON file (provide your AWS Glue job name and job run ID):
    $ cat multiplequeries.json
    [
      {
        "Id": "avgWorkerUtil_0",
        "MetricStat" : {
          "Metric" : {
            "Namespace": "Glue",
            "MetricName": "glue.driver.workerUtilization",
            "Dimensions": [
              {
                  "Name": "JobName",
                  "Value": "<your-Glue-job-name-A>"
              },
              {
                "Name": "JobRunId",
                "Value": "<your-Glue-job-run-id-A>"
              },
              {
                "Name": "Type",
                "Value": "gauge"
              },
              {
                "Name": "ObservabilityGroup",
                "Value": "resource_utilization"
              }
            ]
          },
          "Period": 1800,
          "Stat": "Minimum",
          "Unit": "None"
        }
      },
      {
          "Id": "avgWorkerUtil_1",
          "MetricStat" : {
          "Metric" : {
            "Namespace": "Glue",
            "MetricName": "glue.driver.workerUtilization",
            "Dimensions": [
               {
                 "Name": "JobName",
                 "Value": "<your-Glue-job-name-B>"
               },
               {
                 "Name": "JobRunId",
                 "Value": "<your-Glue-job-run-id-B>"
               },
               {
                 "Name": "Type",
                 "Value": "gauge"
               },
               {
                 "Name": "ObservabilityGroup",
                 "Value": "resource_utilization"
               }
            ]
          },
          "Period": 1800,
          "Stat": "Minimum",
          "Unit": "None"
        }
      }
    ]

  2. Run the get-metric-data command:
    $ aws cloudwatch get-metric-data --metric-data-queries file://multiplequeries.json \
         --start-time '2023-10-28T18:20' \
         --end-time '2023-10-28T19:10'  \
         --region us-east-1
    {
        "MetricDataResults": [
          {
             "Id": "avgWorkerUtil_0",
             "Label": "<your label A>",
             "Timestamps": [
                   "2023-10-28T18:20:00+00:00"
             ], 
             "Values": [
                   0.06718750000000001
             ],
             "StatusCode": "Complete"
          },
          {
             "Id": "avgWorkerUtil_1",
             "Label": "<your label B>",
             "Timestamps": [
                  "2023-10-28T18:20:00+00:00"
              ],
              "Values": [
                  0.5959183673469387
              ],
              "StatusCode": "Complete"
           }
        ],
        "Messages": []
    }

Create a CloudWatch alarm

You can create static threshold-based alarms for the different metrics. For instructions, refer to Create a CloudWatch alarm based on a static threshold.

For example, for skewness, you can set an alarm on skewness.stage with a threshold of 1.0, and on skewness.job with a threshold of 0.5. These thresholds are just recommendations; you can adjust them based on your specific use case (for example, some jobs are expected to be skewed, and that isn’t an issue worth alarming on). Our recommendation is to evaluate the metric values of your job runs for some time to understand which values are anomalous before configuring the alarm thresholds.
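As a minimal sketch, the following boto3 call creates a static threshold alarm on the worker utilization metric used earlier in this post; the job name, job run ID, and SNS topic ARN are placeholders, and you can swap in other observability metrics and thresholds in the same way.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Alarm when worker utilization stays below 30% for two consecutive 30-minute periods.
# Job name, job run ID, and the SNS topic ARN are placeholders.
cloudwatch.put_metric_alarm(
    AlarmName="glue-low-worker-utilization",
    Namespace="Glue",
    MetricName="glue.driver.workerUtilization",
    Dimensions=[
        {"Name": "JobName", "Value": "your-glue-job-name"},
        {"Name": "JobRunId", "Value": "your-glue-job-run-id"},
        {"Name": "Type", "Value": "gauge"},
        {"Name": "ObservabilityGroup", "Value": "resource_utilization"},
    ],
    Statistic="Average",
    Period=1800,
    EvaluationPeriods=2,
    Threshold=0.3,
    ComparisonOperator="LessThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:your-sns-topic"],
)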

Other enhanced metrics

For a full list of other enhanced metrics available with AWS Glue jobs, refer to Monitoring with AWS Glue Observability metrics. These metrics allow you to capture the operational insights of your jobs, such as resource utilization (memory and disk), normalized error classes such as compilation and syntax, user or service errors, and throughput for each source or sink (records, files, partitions, and bytes read or written).

Job observability dashboards

You can further simplify observability for your AWS Glue jobs with dashboards built on these metrics: use Amazon Managed Grafana for real-time monitoring, and Amazon QuickSight for visualization and analysis of trends.

Conclusion

This post demonstrated how the new enhanced CloudWatch metrics help you monitor and debug AWS Glue jobs. With these enhanced metrics, you can more easily identify and troubleshoot issues in real time. This results in AWS Glue jobs that experience higher uptime, faster processing, and reduced expenditures. The end benefit for you is more effective and optimized AWS Glue for Apache Spark workloads. The metrics are available in all AWS Glue supported Regions. Check it out!


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Shenoda Guirguis is a Senior Software Development Engineer on the AWS Glue team. His passion is in building scalable and distributed Data Infrastructure/Processing Systems. When he gets a chance, Shenoda enjoys reading and playing soccer.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple to use interfaces to efficiently manage and transform petabytes of data seamlessly across data lakes on Amazon S3, databases and data-warehouses on cloud.

Introducing AWS Glue serverless Spark UI for better monitoring and troubleshooting

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-studio-serverless-spark-ui-for-better-monitoring-and-troubleshooting/

In AWS, hundreds of thousands of customers use AWS Glue, a serverless data integration service, to discover, combine, and prepare data for analytics and machine learning. When you have complex datasets and demanding Apache Spark workloads, you may experience performance bottlenecks or errors during Spark job runs. Troubleshooting these issues can be difficult and delay getting jobs working in production. Customers often use the Apache Spark Web UI, a popular debugging tool that is part of open source Apache Spark, to help fix problems and optimize job performance. AWS Glue supports Spark UI in two different ways, but you need to set it up yourself, which requires time and effort to manage networking and Amazon EC2 instances, or trial and error with Docker containers.

Today, we are pleased to announce serverless Spark UI built into the AWS Glue console. You can now use Spark UI easily as it’s a built-in component of the AWS Glue console, enabling you to access it with a single click when examining the details of any given job run. There’s no infrastructure setup or teardown required. AWS Glue serverless Spark UI is a fully-managed serverless offering and generally starts up in a matter of seconds. Serverless Spark UI makes it significantly faster and easier to get jobs working in production because you have ready access to low level details for your job runs.

This post describes how the AWS Glue serverless Spark UI helps you to monitor and troubleshoot your AWS Glue job runs.

Getting started with serverless Spark UI

You can access the serverless Spark UI for a given AWS Glue job run by navigating from your job’s page on the AWS Glue console.

  1. On the AWS Glue console, choose ETL jobs.
  2. Choose your job.
  3. Choose the Runs tab.
  4. Select the job run you want to investigate, then choose Spark UI.

The Spark UI will display in the lower pane, as shown in the following screen capture:

Alternatively, you can get to the serverless Spark UI for a specific job run by navigating from Job run monitoring in AWS Glue.

  1. On the AWS Glue console, choose Job run monitoring under ETL jobs.
  2. Select your job run, and choose View run details.

Scroll down to the bottom to view the Spark UI for the job run.

Prerequisites

Complete the following prerequisite steps:

  1. Enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console. Once enabled, Spark event log files are created during the job run and stored in your S3 bucket. The serverless Spark UI parses a Spark event log file generated in your S3 bucket to visualize detailed information for both running and completed job runs. A progress bar shows the percentage to completion, with a typical parsing time of less than a minute.
  2. When the logs are parsed, you can use the built-in Spark UI to debug, troubleshoot, and optimize your jobs.

For more information about Apache Spark UI, refer to Web UI in Apache Spark.
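If you create or run jobs through the API instead of the console, you control the event logs with job parameters. The following boto3 sketch passes the documented parameters --enable-spark-ui and --spark-event-logs-path for a single run; the job name and S3 path are placeholders.

import boto3

glue = boto3.client("glue")

# Enable Spark UI event logs for one run and write them to the given S3 path.
# The job name and the S3 path are placeholders.
glue.start_job_run(
    JobName="your-glue-job-name",
    Arguments={
        "--enable-spark-ui": "true",
        "--spark-event-logs-path": "s3://your_s3_bucket/sparkHistoryLogs/",
    },
)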

Monitor and Troubleshoot with Serverless Spark UI

A typical workload for AWS Glue for Apache Spark jobs is loading data from relational databases to S3-based data lakes. This section demonstrates how to monitor and troubleshoot an example job run for this workload with the serverless Spark UI. The sample job reads data from a MySQL database and writes to Amazon S3 in Parquet format. The source table has approximately 70 million records.

The following screen capture shows a sample visual job authored in AWS Glue Studio visual editor. In this example, the source MySQL table has already been registered in the AWS Glue Data Catalog in advance. It can be registered through AWS Glue crawler or AWS Glue catalog API. For more information, refer to Data Catalog and crawlers in AWS Glue.

Now it’s time to run the job! The first job run finished in 30 minutes and 10 seconds as shown:

Let’s use the Spark UI to optimize the performance of this job run. Open the Spark UI tab on the Job runs page. When you drill down to Stages and view the Duration column, you will notice that Stage Id=0 took 27.41 minutes to run, and the stage had only one Spark task in the Tasks:Succeeded/Total column. That means there was no parallelism when loading data from the source MySQL database.

To optimize the data load, introduce parameters called hashfield and hashpartitions to the source table definition. For more information, refer to Reading from JDBC tables in parallel. On the AWS Glue Data Catalog table, add two properties under Table properties: hashfield=emp_no and hashpartitions=18.
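If you prefer to set these table properties programmatically instead of through the console, the following is a minimal boto3 sketch; the database and table names are placeholders, and the set of fields copied into TableInput is trimmed to the writable ones.

import boto3

glue = boto3.client("glue")

database = "your_database"    # placeholder
table = "your_mysql_table"    # placeholder

# Fetch the current table definition, keep only the fields that UpdateTable accepts,
# and add the JDBC parallel-read properties.
current = glue.get_table(DatabaseName=database, Name=table)["Table"]
writable = {
    "Name", "Description", "Owner", "Retention", "StorageDescriptor",
    "PartitionKeys", "TableType", "Parameters",
}
table_input = {k: v for k, v in current.items() if k in writable}
table_input.setdefault("Parameters", {})
table_input["Parameters"].update({"hashfield": "emp_no", "hashpartitions": "18"})

glue.update_table(DatabaseName=database, TableInput=table_input)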

With these properties, new job runs parallelize the data load from the source MySQL table.

Let’s try running the same job again! This time, the job run finished in 9 minutes and 9 seconds. It saved 21 minutes from the previous job run.

As a best practice, view the Spark UI and compare the job runs before and after the optimization. Drilling down to Completed stages, you will notice that there was one stage with 18 tasks instead of a single task.

In the first job run, AWS Glue automatically shuffled data across multiple executors before writing to the destination because there were too few tasks. On the other hand, in the second job run, there was only one stage because there was no need to do extra shuffling, and there were 18 tasks loading data in parallel from the source MySQL database.

Considerations

Keep in mind the following considerations:

  • Serverless Spark UI is supported in AWS Glue 3.0 and later
  • Serverless Spark UI will be available for jobs that ran after November 20, 2023, due to a change in how AWS Glue emits and stores Spark logs
  • Serverless Spark UI can visualize Spark event logs that are up to 1 GB in size
  • There is no retention limit because serverless Spark UI scans the Spark event log files in your S3 bucket
  • Serverless Spark UI is not available for Spark event logs stored in an S3 bucket that can only be accessed from your VPC

Conclusion

This post described how the AWS Glue serverless Spark UI helps you monitor and troubleshoot your AWS Glue jobs. By providing instant access to the Spark UI directly within the AWS Management Console, you can now inspect the low-level details of job runs to identify and resolve issues. With the serverless Spark UI, there is no infrastructure to manage—the UI spins up automatically for each job run and tears down when no longer needed. This streamlined experience saves you time and effort compared to manually launching Spark UIs yourself.

Give the serverless Spark UI a try today. We think you’ll find it invaluable for optimizing performance and quickly troubleshooting errors. We look forward to hearing your feedback as we continue improving the AWS Glue console experience.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Alexandra Tello is a Senior Front End Engineer with the AWS Glue team in New York City. She is a passionate advocate for usability and accessibility. In her free time, she’s an espresso enthusiast and enjoys building mechanical keyboards.

Matt Sampson is a Software Development Manager on the AWS Glue team. He loves working with his other Glue team members to make services that our customers benefit from. Outside of work, he can be found fishing and maybe singing karaoke.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytic services. In his spare time, he enjoys skiing and gardening.

Load data incrementally from transactional data lakes to data warehouses

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/load-data-incrementally-from-transactional-data-lakes-to-data-warehouses/

Data lakes and data warehouses are two of the most important data storage and management technologies in a modern data architecture. Data lakes store all of an organization’s data, regardless of its format or structure. An open table format such as Apache Hudi, Delta Lake, or Apache Iceberg is widely used to build data lakes on Amazon Simple Storage Service (Amazon S3) in a transactionally consistent manner for use cases including record-level upserts and deletes, change data capture (CDC), time travel queries, and more. Data warehouses, on the other hand, store data that has been cleaned, organized, and structured for analysis. Depending on your use case, it’s common to have a copy of the data between your data lake and data warehouse to support different access patterns.

When the data becomes very large and unwieldy, it can be difficult to keep the copy of the data between data lakes and data warehouses in sync and up to date in an efficient manner.

In this post, we discuss different architecture patterns to keep data in sync and up to date between data lakes built on open table formats and data warehouses such as Amazon Redshift. We also discuss the benefits of incremental loading and the techniques for implementing the architecture using AWS Glue, which is a serverless, scalable data integration service that helps you discover, prepare, move, and integrate data from multiple sources. Various data stores are supported in AWS Glue; for example, AWS Glue 4.0 supports an enhanced Amazon Redshift connector to read from and write to Amazon Redshift, and also supports a built-in Snowflake connector to read from and write to Snowflake. Moreover, Apache Hudi, Delta Lake, and Apache Iceberg are natively supported in AWS Glue.

Architecture patterns

Generally, there are three major architecture patterns to keep your copy of data between data lakes and data warehouses in sync and up to date:

  • Dual writes
  • Incremental queries
  • Change data capture

Let’s discuss each of the architecture patterns and the techniques to achieve them.

Dual writes

When initially ingesting data from its raw source into the data lake and data warehouse, a single batch process is configured to write to both. We call this pattern dual writes. Although this architecture pattern (see the following diagram) is straightforward and easy to implement, it can become error-prone because there are two separate write transactions, and each can fail independently, causing inconsistencies between the data lake and data warehouse when a write succeeds in one but not the other.

Incremental queries

An incremental query architectural pattern is designed to ingest data first into the data lake with an open table format, and then load the newly written data from the data lake into the data warehouse. Open table formats such as Apache Hudi and Apache Iceberg support incremental queries based on their respective transaction logs. You can capture records inserted or updated with the incremental queries, and then merge the captured records into the destination data warehouses.

Apache Hudi supports incremental queries, which allow you to retrieve all records written during a specific time range.
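As a reference, a minimal PySpark sketch of a Hudi incremental query looks like the following; the table path and begin commit time are placeholders, and it assumes Hudi support is enabled in the Spark session (for example, through the AWS Glue --datalake-formats job parameter).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read only the records committed after the given Hudi commit time.
# The S3 path and the begin instant time are placeholders.
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20231001000000")
    .load("s3://your_s3_bucket/prefix/hudi_incremental/ghcn/")
)
incremental_df.show()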

Delta Lake doesn’t have a separate concept of incremental queries; this capability is provided by its change data feed, which is explained in the next section.

Apache Iceberg supports incremental read, which allows you to read appended data incrementally. As of this writing, Iceberg gets incremental data only from the append operation; other operations such as replace, overwrite, and delete aren’t supported by incremental read.
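A minimal PySpark sketch of an Iceberg incremental read is shown below; the snapshot IDs and the catalog and table names are placeholders, and it assumes the Spark session is already configured with an Iceberg catalog.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read only the data appended between two snapshots of an Iceberg table.
# Snapshot IDs and the catalog/database/table names are placeholders.
incremental_df = (
    spark.read.format("iceberg")
    .option("start-snapshot-id", "1234567890123456789")
    .option("end-snapshot-id", "9876543210987654321")
    .load("glue_catalog.your_database.your_iceberg_table")
)
incremental_df.show()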

For merging the records into Amazon Redshift, you can use the MERGE SQL command, which was released in April 2023. AWS Glue supports the Redshift MERGE SQL command within its data integration jobs. To learn more, refer to Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor.

Incremental queries are useful to capture changed records; however, incremental queries can’t capture deletes and only return the latest version of each record. If you need to handle delete operations in the source data lake, you will need to use a CDC-based approach.

The following diagram illustrates an incremental query architectural pattern.

Change data capture

Change data capture (CDC) is a well-known technique to capture all mutating operations in a source database system and relay those operations to another system. CDC keeps all the intermediate changes, including the deletes. With this architecture pattern, you capture not only inserts and updates, but also deletes committed to the data lake, and then merge those captured changes into the data warehouses.

Apache Hudi 0.13.0 or later supports change data capture as an experimental feature, which is only available for Copy-on-Write (CoW) tables. Merge-on-Read tables (MoR) do not support CDC as of this writing.

Delta Lake 2.0.0 or later supports a change data feed, which allows Delta tables to track record-level changes between table versions.
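A minimal PySpark sketch of reading the change data feed looks like the following; the table path and starting version are placeholders, and it assumes the table was written with the change data feed property enabled, as shown later in this post.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read row-level changes (including the _change_type, _commit_version, and
# _commit_timestamp columns) starting from a given table version.
# The S3 path and the starting version are placeholders.
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load("s3://your_s3_bucket/prefix/delta_incremental/ghcn/")
)
changes_df.show()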

Apache Iceberg 1.2.1 or later supports change data capture through its create_changelog_view procedure. When you run this procedure, a new view that contains the changes from a given table is created.

The following diagram illustrates a CDC architecture.

Example scenario

To demonstrate the end-to-end experience, this post uses the Global Historical Climatology Network Daily (GHCN-D) dataset. The data is publicly accessible through an S3 bucket. For more information, see the Registry of Open Data on AWS. You can also learn more in Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight.

The Amazon S3 location s3://noaa-ghcn-pds/csv/by_year/ has all of the observations from 1763 to the present organized in CSV files, one file for each year. The following block shows an example of what the records look like:

ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AE000041196,20220102,TAVG,226,H,,S,
...
AE000041196,20221231,TMAX,243,,,S,
AE000041196,20221231,PRCP,0,D,,S,
AE000041196,20221231,TAVG,202,H,,S,

The records have fields including ID, DATE, ELEMENT, and more. Each combination of ID, DATE, and ELEMENT represents a unique record in this dataset. For example, the record with ID as AE000041196, ELEMENT as TAVG, and DATE as 20220101 is unique. We use this dataset in the following examples and simulate record-level updates and deletes as sample operations.

Prerequisites

To continue with the examples in this post, you need to create (or already have) the following AWS resources:

For the first tutorial (loading from Apache Hudi to Amazon Redshift), you also need the following:

For the second tutorial (loading from Delta Lake to Snowflake), you need the following:

  • A Snowflake account.
  • An AWS Glue connection named snowflake for Snowflake access. For more information, refer to Configuring Snowflake connections.
  • An AWS Secrets Manager secret named snowflake_credentials with the following key pairs:
    • Key sfUser with value <Your Snowflake username>
    • Key sfPassword with value <Your Snowflake password>

These tutorials are interchangeable, so you can easily apply the same pattern to any combination of source and destination, for example, Hudi to Snowflake, or Delta to Amazon Redshift.

Load data incrementally from Apache Hudi table to Amazon Redshift using a Hudi incremental query

This tutorial uses Hudi incremental queries to load data from a Hudi table and then merge the changes to Amazon Redshift.

Ingest initial data to a Hudi table

Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose ETL jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset.

  1. Name this new job hudi-data-ingestion.
  2. Under Visual, choose Data source – S3 bucket.
  3. Under Node properties, for S3 source type, select S3 location.
  4. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured. The next step is to configure the data target to ingest data in Apache Hudi on your S3 bucket.

  1. Choose Data target – S3 bucket.
  2. Under Data target properties – S3, for Format, choose Apache Hudi.
  3. For Hudi Table Name, enter ghcn_hudi.
  4. For Hudi Storage Type, choose Copy on write.
  5. For Hudi Write Operation, choose Upsert.
  6. For Hudi Record Key Fields, choose ID.
  7. For Hudi Precombine Key Field, choose DATE.
  8. For Compression Type, choose GZIP.
  9. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_incremental/ghcn/. (Provide your S3 bucket name and prefix.)
  10. For Data Catalog update options, select Do not update the Data Catalog.

Now your data integration job is authored in the visual editor completely. Let’s add one remaining setting about the IAM role, then run the job.

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Choose Save, then choose Run.

You can track the progress on the Runs tab. It finishes in several minutes.

Load data from the Hudi table to a Redshift table

In this step, we assume that the files are updated with new records every day, and want to store only the latest record per the primary key (ID and ELEMENT) to make the latest snapshot data queryable. One typical approach is to do an INSERT for all the historical data, and calculate the latest records in queries; however, this can introduce additional overhead in all the queries. When you want to analyze only the latest records, it’s better to do an UPSERT (update and insert) based on the primary key and DATE field rather than just an INSERT in order to avoid duplicates and maintain a single updated row of data.

Complete the following steps to load data from the Hudi table to a Redshift table:

  1. Download the file hudi2redshift-incremental-load.ipynb.
  2. In AWS Glue Studio, choose Jupyter Notebook, then choose Create.
  3. For Job name, enter hudi-ghcn-incremental-load-notebook.
  4. For IAM Role, choose your IAM role.
  5. Choose Start notebook.

Wait for the notebook to be ready.

  1. Run the first cell to set up an AWS Glue interactive session.
  2. Replace the parameters with yours and run the cell under Configure your resource.
  3. Run the cell under Initialize SparkSession and GlueContext.
  4. Run the cell under Determine target time range for incremental query.
  5. Run the cells under Run query to load data updated during a given timeframe.
  6. Run the cells under Merge changes into destination table.

You can see the exact query that runs right after the data is ingested into a temporary table and merged into the Redshift table.

  1. Run the cell under Update the last query end time.

Validate initial records in the Redshift table

Complete the following steps to validate the initial records in the Redshift table:

  1. On the Amazon Redshift console, open Query Editor v2.
  2. Run the following query:
    SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query returns the following result set.

The original source file 2022.csv has historical records for record ID='AE000041196' from 20220101 to 20221231; however, the query result shows only four records, one record per ELEMENT at the latest snapshot of the day 20221230 or 20221231. Because we used the UPSERT write option when writing data, we configured the ID field as a Hudi record key field, the DATE field as a Hudi precombine field, and the ELEMENT field as the partition key field. When two records have the same key value, Hudi picks the one with the largest value for the precombine field. When the job ingested data, it compared all the values in the DATE field for each pair of ID and ELEMENT, and then picked the record with the largest value in the DATE field. We use the current state of this table as the initial state.

Ingest updates to a Hudi table

Complete the following steps to simulate ingesting more records into the Hudi table:

  1. On AWS Glue Studio, choose the job hudi-data-ingestion.
  2. On the Data target – S3 bucket node, change the S3 location from s3://noaa-ghcn-pds/csv/by_year/2022.csv to s3://noaa-ghcn-pds/csv/by_year/2023.csv.
  3. Run the job.

Because this job uses the DATE field as a Hudi precombine field, the records included in the new source file have been upserted into the Hudi table.

Load data incrementally from the Hudi table to the Redshift table

Complete the following steps to load the ingested records incrementally to the Redshift table:

  1. On AWS Glue Studio, choose the job hudi-ghcn-incremental-load-notebook.
  2. Run all the cells again.

In the cells under Run query, you will notice that the records shown this time have DATE in 2023. Only newly ingested records are shown here.

In the cells under Merge changes into destination table, the newly ingested records are merged into the Redshift table. The generated MERGE query statement in the notebook is as follows:

MERGE INTO public.ghcn USING public.ghcn_tmp ON 
    public.ghcn.ID = public.ghcn_tmp.ID AND 
    public.ghcn.ELEMENT = public.ghcn_tmp.ELEMENT
WHEN MATCHED THEN UPDATE SET 
    _hoodie_commit_time = public.ghcn_tmp._hoodie_commit_time,
    _hoodie_commit_seqno = public.ghcn_tmp._hoodie_commit_seqno,
    _hoodie_record_key = public.ghcn_tmp._hoodie_record_key,
    _hoodie_partition_path = public.ghcn_tmp._hoodie_partition_path,
    _hoodie_file_name = public.ghcn_tmp._hoodie_file_name, 
    ID = public.ghcn_tmp.ID, 
    DATE = public.ghcn_tmp.DATE, 
    ELEMENT = public.ghcn_tmp.ELEMENT, 
    DATA_VALUE = public.ghcn_tmp.DATA_VALUE, 
    M_FLAG = public.ghcn_tmp.M_FLAG, 
    Q_FLAG = public.ghcn_tmp.Q_FLAG, 
    S_FLAG = public.ghcn_tmp.S_FLAG, 
    OBS_TIME = public.ghcn_tmp.OBS_TIME 
WHEN NOT MATCHED THEN INSERT VALUES (
    public.ghcn_tmp._hoodie_commit_time, 
    public.ghcn_tmp._hoodie_commit_seqno, 
    public.ghcn_tmp._hoodie_record_key, 
    public.ghcn_tmp._hoodie_partition_path, 
    public.ghcn_tmp._hoodie_file_name, 
    public.ghcn_tmp.ID, 
    public.ghcn_tmp.DATE, 
    public.ghcn_tmp.ELEMENT, 
    public.ghcn_tmp.DATA_VALUE, 
    public.ghcn_tmp.M_FLAG, 
    public.ghcn_tmp.Q_FLAG, 
    public.ghcn_tmp.S_FLAG, 
    public.ghcn_tmp.OBS_TIME
);

The next step is to verify the result on the Redshift side.

Validate updated records in the Redshift table

Complete the following steps to validate the updated records in the Redshift table:

  1. On the Amazon Redshift console, open Query Editor v2.
  2. Run the following query:
    SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query returns the following result set.

Now you can see that the four records have been updated with the new records from 2023. If more records arrive in the future, this approach works well to upsert new records based on the primary keys.

Load data incrementally from a Delta Lake table to Snowflake using a Delta change data feed

This tutorial uses a Delta change data feed to load data from a Delta table, and then merge the changes to Snowflake.

Ingest initial data to a Delta table

Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose ETL jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset.

  1. Name this new job delta-data-ingestion.
  2. Under Visual, choose Data source – S3 bucket.
  3. Under Node properties, for S3 source type, select S3 location.
  4. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured. The next step is to configure the data target to ingest data in Delta Lake format on your S3 bucket.

  1. Choose Data target – S3 bucket.
  2. Under Data target properties – S3, for Format, choose Delta Lake.
  3. For Compression Type, choose Snappy.
  4. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/delta_incremental/ghcn/. (Provide your S3 bucket name and prefix.)
  5. For Data Catalog update options, select Do not update the Data Catalog.

Now your data integration job is authored in the visual editor completely. Let’s add an additional detail about the IAM role and job parameters, and then run the job.

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Under Job parameters, for Key, enter --conf and for Value, enter spark.databricks.delta.properties.defaults.enableChangeDataFeed=true.
  3. Choose Save, then choose Run.

Load data from the Delta table to a Snowflake table

Complete the following steps to load data from the Delta table to a Snowflake table:

  1. Download the file delta2snowflake-incremental-load.ipynb.
  2. On AWS Glue Studio, choose Jupyter Notebook, then choose Create.
  3. For Job name, enter delta-ghcn-incremental-load-notebook.
  4. For IAM Role, choose your IAM role.
  5. Choose Start notebook.

Wait for the notebook to be ready.

  1. Run the first cell to start an AWS Glue interactive session.
  2. Replace the parameters with yours and run the cell under Configure your resource.
  3. Run the cell under Initialize SparkSession and GlueContext.
  4. Run the cell under Determine target time range for CDC.
  5. Run the cells under Run query to load data updated during a given timeframe.
  6. Run the cells under Merge changes into destination table.

You can see the exact query that runs right after the data is ingested into a temporary table and merged into the Snowflake table.

  1. Run the cell under Update the last query end time.

Validate initial records in the Snowflake warehouse

Run the following query in Snowflake:

SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query should return the following result set:

There are three records returned in this query.

Update and delete a record on the Delta table

Complete the following steps to update and delete a record on the Delta table as sample operations:

  1. Return to the AWS Glue notebook job.
  2. Run the cells under Update the record and Delete the record.

Load data incrementally from the Delta table to the Snowflake table

Complete the following steps to load the changes incrementally to the Snowflake table:

  1. On AWS Glue Studio, choose the job delta-ghcn-incremental-load-notebook.
  2. Run all the cells again.

When you run the cells under Run query, you will notice that there are only three records, which correspond to the update and delete operation performed in the previous step.

In the cells under Merge changes into destination table, the changes are merged into the Snowflake table. The generated MERGE query statement in the notebook is as follows:

MERGE INTO public.ghcn USING public.ghcn_tmp ON 
    public.ghcn.ID = public.ghcn_tmp.ID AND 
    public.ghcn.DATE = public.ghcn_tmp.DATE AND 
    public.ghcn.ELEMENT = public.ghcn_tmp.ELEMENT 
WHEN MATCHED AND public.ghcn_tmp._change_type = 'update_postimage' THEN UPDATE SET 
    ID = public.ghcn_tmp.ID, 
    DATE = public.ghcn_tmp.DATE, 
    ELEMENT = public.ghcn_tmp.ELEMENT, 
    DATA_VALUE = public.ghcn_tmp.DATA_VALUE, 
    M_FLAG = public.ghcn_tmp.M_FLAG, 
    Q_FLAG = public.ghcn_tmp.Q_FLAG, 
    S_FLAG = public.ghcn_tmp.S_FLAG, 
    OBS_TIME = public.ghcn_tmp.OBS_TIME, 
    _change_type = public.ghcn_tmp._change_type, 
    _commit_version = public.ghcn_tmp._commit_version, 
    _commit_timestamp = public.ghcn_tmp._commit_timestamp 
WHEN MATCHED AND public.ghcn_tmp._change_type = 'delete' THEN DELETE 
WHEN NOT MATCHED THEN INSERT VALUES (
    public.ghcn_tmp.ID, 
    public.ghcn_tmp.DATE, 
    public.ghcn_tmp.ELEMENT, 
    public.ghcn_tmp.DATA_VALUE, 
    public.ghcn_tmp.M_FLAG, 
    public.ghcn_tmp.Q_FLAG, 
    public.ghcn_tmp.S_FLAG, 
    public.ghcn_tmp.OBS_TIME, 
    public.ghcn_tmp._change_type, 
    public.ghcn_tmp._commit_version, 
    public.ghcn_tmp._commit_timestamp
);

The next step is to verify the result on the Snowflake side.

Validate updated records in the Snowflake table

Complete the following steps to validate the updated and deleted records in the Snowflake table:

  1. On Snowflake, run the following query:
    SELECT * FROM ghcn WHERE ID = 'AE000041196' AND DATE = '20221231'

The query returns the following result set:

You will notice that the query only returns two records. The value of DATA_VALUE of the record ELEMENT=PRCP has been updated from 0 to 12345. The record ELEMENT=TMAX has been deleted. This means that your update and delete operations on the source Delta table have been successfully replicated to the target Snowflake table.

Clean up

Complete the following steps to clean up your resources:

  1. Delete the following AWS Glue jobs:
    • hudi-data-ingestion
    • hudi-ghcn-incremental-load-notebook
    • delta-data-ingestion
    • delta-ghcn-incremental-load-notebook
  2. Clean up your S3 bucket.
  3. If needed, delete the Redshift cluster or the Redshift Serverless workgroup.

Conclusion

This post discussed architecture patterns to keep a copy of your data between data lakes using open table formats and data warehouses in sync and up to date. We also discussed the benefits of incremental loading and the techniques for achieving the use case using AWS Glue. We covered two use cases: incremental load from a Hudi table to Amazon Redshift, and from a Delta table to Snowflake.


About the author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

End-to-end development lifecycle for data engineers to build a data integration pipeline using AWS Glue

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/end-to-end-development-lifecycle-for-data-engineers-to-build-a-data-integration-pipeline-using-aws-glue/

Data is a key enabler for your business. Many AWS customers have integrated their data across multiple data sources using AWS Glue, a serverless data integration service, in order to make data-driven business decisions. To grow the power of data at scale for the long term, it’s highly recommended to design an end-to-end development lifecycle for your data integration pipelines. The following are common asks from our customers:

  • Is it possible to develop and test AWS Glue data integration jobs on my local laptop?
  • Are there recommended approaches to provisioning components for data integration?
  • How can we build a continuous integration and continuous delivery (CI/CD) pipeline for our data integration pipeline?
  • What is the best practice to move from a pre-production environment to production?

To tackle these asks, this post defines the development lifecycle for data integration and demonstrates how software engineers and data engineers can design an end-to-end development lifecycle using AWS Glue, including development, testing, and CI/CD, using a sample baseline template.

End-to-end development lifecycle for a data integration pipeline

Today, it’s common to define not only data integration jobs but also all the data components in code. This means that you can rely on standard software best practices to build your data integration pipeline. The software development lifecycle on AWS defines the following six phases: Plan, Design, Implement, Test, Deploy, and Maintain.

In this section, we discuss each phase in the context of data integration pipeline.

Plan

In the planning phase, developers collect requirements from stakeholders such as end-users to define a data requirement. This could be what the use cases are (for example, ad hoc queries, dashboard, or troubleshooting), how much data to process (for example, 1 TB per day), what kinds of data, how many different data sources to pull from, how much data latency to accept to make it queryable (for example, 15 minutes), and so on.

Design

In the design phase, you analyze requirements and identify the best solution to build the data integration pipeline. In AWS, you need to choose the right services to achieve the goal and come up with the architecture by integrating those services and defining dependencies between components. For example, you may choose AWS Glue jobs as a core component for loading data from different sources, including Amazon Simple Storage Service (Amazon S3), then integrating them and preprocessing and enriching data. Then you may want to chain multiple AWS Glue jobs and orchestrate them. Finally, you may want to use Amazon Athena and Amazon QuickSight to present the enriched data to end-users.

Implement

In the implementation phase, data engineers code the data integration pipeline. They analyze the requirements to identify coding tasks to achieve the final result. The code includes the following:

  • AWS resource definition
  • Data integration logic

When using AWS Glue, you can define the data integration logic in a job script, which can be written in Python or Scala. You can use your preferred IDE to implement AWS resource definition using the AWS Cloud Development Kit (AWS CDK) or AWS CloudFormation, and also the business logic of AWS Glue job scripts for data integration. To learn more about how to implement your AWS Glue job scripts locally, refer to Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container.

Test

In the testing phase, you check the implementation for bugs. Quality analysis includes testing the code for errors and checking if it meets the requirements. Because many teams immediately test the code they write, the testing phase often runs in parallel with the development phase. There are different types of testing:

  • Unit testing
  • Integration testing
  • Performance testing

For unit testing, even for data integration, you can rely on a standard testing framework such as pytest and ScalaTest. To learn more about how to achieve unit testing locally, refer to Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container.
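To illustrate the pattern, the following is a minimal pytest sketch for a hypothetical transformation function; filter_element is not part of the baseline template and is shown only to outline how Spark logic can be tested with a local SparkSession fixture.

import pytest
from pyspark.sql import SparkSession


def filter_element(df, element):
    # Hypothetical transform under test: keep only rows for the given ELEMENT.
    return df.filter(df["ELEMENT"] == element)


@pytest.fixture(scope="module")
def spark():
    # Local SparkSession shared across the tests in this module.
    return SparkSession.builder.master("local[1]").appName("unit-test").getOrCreate()


def test_filter_element(spark):
    df = spark.createDataFrame(
        [("AE000041196", "20220101", "TAVG", 204),
         ("AE000041196", "20220101", "PRCP", 0)],
        ["ID", "DATE", "ELEMENT", "DATA_VALUE"],
    )
    result = filter_element(df, "TAVG")
    assert result.count() == 1
    assert result.first()["DATA_VALUE"] == 204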

Deploy

When data engineers develop a data integration pipeline, you code and test on a different copy of the product than the one that the end-users have access to. The environment that end-users use is called production, whereas other copies are said to be in the development or the pre-production environment.

Having separate build and production environments ensures that you can continue to use the data integration pipeline even while it’s being changed or upgraded. The deployment phase includes several tasks to move the latest build copy to the production environment, such as packaging, environment configuration, and installation.

The following components are deployed through the AWS CDK or AWS CloudFormation:

  • AWS resources
  • Data integration job scripts for AWS Glue

AWS CodePipeline helps you to build a mechanism to automate deployments among different environments, including development, pre-production, and production. When you commit your code to AWS CodeCommit, CodePipeline automatically provisions AWS resources based on the CloudFormation templates included in the commit and uploads script files included in the commit to Amazon S3.

Maintain

Even after you deploy your solution to a production environment, it’s not the end of your project. You need to monitor the data integration pipeline continuously and keep maintaining and improving it. More specifically, you also need to fix bugs, resolve customer issues, and manage software changes. In addition, you need to monitor the overall system performance, security, and user experience to identify new ways to improve the existing data integration pipeline.

Solution overview

Typically, you have multiple accounts to manage and provision resources for your data pipeline. In this post, we assume the following three accounts:

  • Pipeline account – This hosts the end-to-end pipeline
  • Dev account – This hosts the integration pipeline in the development environment
  • Prod account – This hosts the data integration pipeline in the production environment

If you want, you can use the same account and the same Region for all three.

To start applying this end-to-end development lifecycle model to your data platform easily and quickly, we prepared the baseline template aws-glue-cdk-baseline using the AWS CDK. The template is built on top of AWS CDK v2 and CDK Pipelines. It provisions two kinds of stacks:

  • AWS Glue app stack – This provisions the data integration pipeline: one in the dev account and one in the prod account
  • Pipeline stack – This provisions the Git repository and CI/CD pipeline in the pipeline account

The AWS Glue app stack provisions the data integration pipeline, including the following resources:

  • AWS Glue jobs
  • AWS Glue job scripts

The following diagram illustrates this architecture.

At the time of publishing of this post, the AWS CDK has two versions of the AWS Glue module: @aws-cdk/aws-glue and @aws-cdk/aws-glue-alpha, containing L1 constructs and L2 constructs, respectively. The sample AWS Glue app stack is defined using aws-glue-alpha, the L2 construct for AWS Glue, because it’s straightforward to define and manage AWS Glue resources. If you want to use the L1 construct, refer to Build, Test and Deploy ETL solutions using AWS Glue and AWS CDK based CI/CD pipelines.

The pipeline stack provisions the entire CI/CD pipeline, including the following resources:

The following diagram illustrates the pipeline workflow.

Every time the business requirement changes (such as adding data sources or changing data transformation logic), you make changes on the AWS Glue app stack and re-provision the stack to reflect your changes. This is done by committing your changes in the AWS CDK template to the CodeCommit repository, then CodePipeline reflects the changes on AWS resources using CloudFormation change sets.

In the following sections, we present the steps to set up the required environment and demonstrate the end-to-end development lifecycle.

Prerequisites

You need the following resources:

Initialize the project

To initialize the project, complete the following steps:

  1. Clone the baseline template to your workplace:
    $ git clone git@github.com:aws-samples/aws-glue-cdk-baseline.git
    $ cd aws-glue-cdk-baseline

  2. Create a Python virtual environment specific to the project on the client machine:
    $ python3 -m venv .venv

We use a virtual environment in order to isolate the Python environment for this project and not install software globally.

  1. Activate the virtual environment according to your OS:
    • On MacOS and Linux, use the following command:
      $ source .venv/bin/activate

    • On a Windows platform, use the following command:
      % .venv\Scripts\activate.bat

After this step, the subsequent steps run within the bounds of the virtual environment on the client machine and interact with the AWS account as needed.

  1. Install the required dependencies described in requirements.txt to the virtual environment:
    $ pip install -r requirements.txt
    $ pip install -r requirements-dev.txt

  2. Edit the configuration file default-config.yaml based on your environments (replace each account ID with your own):
    pipelineAccount:
      awsAccountId: 123456789101
      awsRegion: us-east-1

    devAccount:
      awsAccountId: 123456789102
      awsRegion: us-east-1

    prodAccount:
      awsAccountId: 123456789103
      awsRegion: us-east-1

  3. Run pytest to initialize the snapshot test files by running the following command:
    $ python3 -m pytest --snapshot-update

Bootstrap your AWS environments

Run the following commands to bootstrap your AWS environments:

  1. In the pipeline account, replace PIPELINE-ACCOUNT-NUMBER, REGION, and PIPELINE-PROFILE with your own values:
    $ cdk bootstrap aws://<PIPELINE-ACCOUNT-NUMBER>/<REGION> --profile <PIPELINE-PROFILE> \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess

  2. In the dev account, replace PIPELINE-ACCOUNT-NUMBER, DEV-ACCOUNT-NUMBER, REGION, and DEV-PROFILE with your own values:
    $ cdk bootstrap aws://<DEV-ACCOUNT-NUMBER>/<REGION> --profile <DEV-PROFILE> \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
    --trust <PIPELINE-ACCOUNT-NUMBER>

  3. In the prod account, replace PIPELINE-ACCOUNT-NUMBER, PROD-ACCOUNT-NUMBER, REGION, and PROD-PROFILE with your own values:
    $ cdk bootstrap aws://<PROD-ACCOUNT-NUMBER>/<REGION> --profile <PROD-PROFILE> \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
    --trust <PIPELINE-ACCOUNT-NUMBER>

When you use only one account for all environments, you can just run the cdk bootstrap command one time.

Deploy your AWS resources

Run the command using the pipeline account to deploy the resources defined in the AWS CDK baseline template:

$ cdk deploy --profile <PIPELINE-PROFILE>

This creates the pipeline stack in the pipeline account and the AWS Glue app stack in the development account.

When the cdk deploy command is completed, let’s verify the pipeline using the pipeline account.

On the CodePipeline console, navigate to GluePipeline. Then verify that GluePipeline has the following stages: Source, Build, UpdatePipeline, Assets, DeployDev, and DeployProd. Also verify that the stages Source, Build, UpdatePipeline, Assets, DeployDev have succeeded, and DeployProd is pending. It can take about 15 minutes.

Now that the pipeline has been created successfully, you can also verify the AWS Glue app stack resource on the AWS CloudFormation console in the dev account.

At this step, the AWS Glue app stack is deployed only in the dev account. You can try to run the AWS Glue job ProcessLegislators to see how it works.

Configure your Git repository with CodeCommit

In an earlier step, you cloned the Git repository from GitHub. Although it’s possible to configure the AWS CDK template to work with GitHub, GitHub Enterprise, or Bitbucket, for this post, we use CodeCommit. If you prefer those third-party Git providers, configure the connections and edit pipeline_stack.py to define the variable source to use the target Git provider using CodePipelineSource.
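For example, a minimal sketch of pointing the pipeline source at a GitHub repository through an AWS CodeStar connection might look like the following in pipeline_stack.py; the repository, branch, and connection ARN are placeholders.

from aws_cdk import pipelines

# Use a GitHub repository through a CodeStar connection instead of CodeCommit.
# The repository name, branch, and connection ARN are placeholders.
source = pipelines.CodePipelineSource.connection(
    "your-org/aws-glue-cdk-baseline",
    "main",
    connection_arn="arn:aws:codestar-connections:us-east-1:123456789012:connection/your-connection-id",
)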

Because you already ran the cdk deploy command, the CodeCommit repository has already been created with all the required code and related files. The first step is to set up access to CodeCommit. The next step is to clone the repository from CodeCommit to your local machine. Run the following commands:

$ mkdir aws-glue-cdk-baseline-codecommit
$ cd aws-glue-cdk-baseline-codecommit
$ git clone ssh://git-codecommit.us-east-1.amazonaws.com/v1/repos/aws-glue-cdk-baseline

In the next step, we make changes in this local copy of the CodeCommit repository.

End-to-end development lifecycle

Now that the environment has been successfully created, you’re ready to start developing a data integration pipeline using this baseline template. Let’s walk through end-to-end development lifecycle.

When you want to define your own data integration pipeline, you need to add more AWS Glue jobs and implement job scripts. For this post, let’s assume the use case to add a new AWS Glue job with a new job script to read multiple S3 locations and join them.

Implement and test in your local environment

First, implement and test the AWS Glue job and its job script in your local environment using Visual Studio Code.

Set up your development environment by following the steps in Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container. The following steps are required in the context of this post:

  1. Start Docker.
  2. Pull the Docker image that has the local development environment using the AWS Glue ETL library:
    $ docker pull public.ecr.aws/glue/aws-glue-libs:glue_libs_4.0.0_image_01

  3. Run the following command to define the AWS named profile name:
    $ PROFILE_NAME="<DEV-PROFILE>"

  4. Run the following command to make it available with the baseline template:
    $ cd aws-glue-cdk-baseline/
    $ WORKSPACE_LOCATION=$(pwd)

  5. Run the Docker container:
    $ docker run -it -v ~/.aws:/home/glue_user/.aws -v $WORKSPACE_LOCATION:/home/glue_user/workspace/ -e AWS_PROFILE=$PROFILE_NAME -e DISABLE_SSL=true \
    --rm -p 4040:4040 -p 18080:18080 \
    --name glue_pyspark public.ecr.aws/glue/aws-glue-libs:glue_libs_4.0.0_image_01 pyspark

  6. Start Visual Studio Code.
  7. Choose Remote Explorer in the navigation pane, then choose the arrow icon of the workspace folder in the container public.ecr.aws/glue/aws-glue-libs:glue_libs_4.0.0_image_01.

If the workspace folder is not shown, choose Open folder and select /home/glue_user/workspace.

Then you will see a view similar to the following screenshot.

Optionally, you can install the AWS Toolkit for Visual Studio Code and start Amazon CodeWhisperer to enable code recommendations powered by a machine learning model. For example, in aws_glue_cdk_baseline/job_scripts/process_legislators.py, you can enter a comment like "# Write a DataFrame in Parquet format to S3", press Enter, and CodeWhisperer will recommend a code snippet similar to the following:

CodeWhisperer on Visual Studio Code

Now install the required dependencies described in requirements.txt in the container environment.

  1. Run the following commands in the terminal in Visual Studio Code:
    $ pip install -r requirements.txt
    $ pip install -r requirements-dev.txt

  2. Implement the code.

Now let’s make the required changes for a new AWS Glue job here.

  1. Edit the file aws_glue_cdk_baseline/glue_app_stack.py. Let’s add the following new code block after the existing job definition of ProcessLegislators in order to add the new AWS Glue job JoinLegislators:
            self.new_glue_job = glue.Job(self, "JoinLegislators",
                executable=glue.JobExecutable.python_etl(
                    glue_version=glue.GlueVersion.V4_0,
                    python_version=glue.PythonVersion.THREE,
                    script=glue.Code.from_asset(
                        path.join(path.dirname(__file__), "job_scripts/join_legislators.py")
                    )
                ),
                description="a new example PySpark job",
                default_arguments={
                    "--input_path_orgs": config[stage]['jobs']['JoinLegislators']['inputLocationOrgs'],
                    "--input_path_persons": config[stage]['jobs']['JoinLegislators']['inputLocationPersons'],
                    "--input_path_memberships": config[stage]['jobs']['JoinLegislators']['inputLocationMemberships']
                },
                tags={
                    "environment": self.environment,
                    "artifact_id": self.artifact_id,
                    "stack_id": self.stack_id,
                    "stack_name": self.stack_name
                }
            )

Here, you added three job parameters for different S3 locations using the variable config, which is the dictionary generated from default-config.yaml. In this baseline template, we use this central config file to manage parameters for all the AWS Glue jobs, in the structure <stage name>/jobs/<job name>/<parameter name>. In the following steps, you provide those locations through the AWS Glue job parameters.
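
As a minimal sketch (not the baseline template’s exact code), the stack can resolve these per-job values by loading default-config.yaml into a nested dictionary, for example:

# Sketch only: how per-job parameters could be resolved from default-config.yaml.
# Assumes PyYAML is available in the CDK app environment.
import yaml

with open("default-config.yaml") as f:
    config = yaml.safe_load(f)

stage = "dev"
job_params = config[stage]["jobs"]["JoinLegislators"]
print(job_params["inputLocationOrgs"])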

  2. Create a new job script called aws_glue_cdk_baseline/job_scripts/join_legislators.py:
    
    import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.transforms import Join
    from awsglue.utils import getResolvedOptions
    
    
    class JoinLegislators:
        def __init__(self):
            params = []
            if '--JOB_NAME' in sys.argv:
                params.append('JOB_NAME')
                params.append('input_path_orgs')
                params.append('input_path_persons')
                params.append('input_path_memberships')
            args = getResolvedOptions(sys.argv, params)
    
            self.context = GlueContext(SparkContext.getOrCreate())
            self.job = Job(self.context)
    
            if 'JOB_NAME' in args:
                jobname = args['JOB_NAME']
                self.input_path_orgs = args['input_path_orgs']
                self.input_path_persons = args['input_path_persons']
                self.input_path_memberships = args['input_path_memberships']
            else:
                jobname = "test"
                self.input_path_orgs = "s3://awsglue-datasets/examples/us-legislators/all/organizations.json"
                self.input_path_persons = "s3://awsglue-datasets/examples/us-legislators/all/persons.json"
                self.input_path_memberships = "s3://awsglue-datasets/examples/us-legislators/all/memberships.json"
            self.job.init(jobname, args)
        
        def run(self):
            dyf = join_legislators(self.context, self.input_path_orgs, self.input_path_persons, self.input_path_memberships)
            df = dyf.toDF()
            df.printSchema()
            df.show()
            print(df.count())
    
    def read_dynamic_frame_from_json(glue_context, path):
        return glue_context.create_dynamic_frame.from_options(
            connection_type='s3',
            connection_options={
                'paths': [path],
                'recurse': True
            },
            format='json'
        )
    
    def join_legislators(glue_context, path_orgs, path_persons, path_memberships):
        orgs = read_dynamic_frame_from_json(glue_context, path_orgs)
        persons = read_dynamic_frame_from_json(glue_context, path_persons)
        memberships = read_dynamic_frame_from_json(glue_context, path_memberships)
        orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')
        dynamicframe_joined = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
        return dynamicframe_joined
    
    if __name__ == '__main__':
        JoinLegislators().run()

  3. Create a new unit test script for the new AWS Glue job called aws_glue_cdk_baseline/job_scripts/tests/test_join_legislators.py:
    import pytest
    import sys
    import join_legislators
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    
    @pytest.fixture(scope="module", autouse=True)
    def glue_context():
        sys.argv.append('--JOB_NAME')
        sys.argv.append('test_count')
    
        args = getResolvedOptions(sys.argv, ['JOB_NAME'])
        context = GlueContext(SparkContext.getOrCreate())
        job = Job(context)
        job.init(args['JOB_NAME'], args)
    
        yield(context)
    
    def test_counts(glue_context):
        dyf = join_legislators.join_legislators(glue_context, 
            "s3://awsglue-datasets/examples/us-legislators/all/organizations.json",
            "s3://awsglue-datasets/examples/us-legislators/all/persons.json", 
            "s3://awsglue-datasets/examples/us-legislators/all/memberships.json")
        assert dyf.toDF().count() == 10439

  4. In default-config.yaml, add the following under prod and dev:
     JoinLegislators:
          inputLocationOrgs: "s3://awsglue-datasets/examples/us-legislators/all/organizations.json"
          inputLocationPersons: "s3://awsglue-datasets/examples/us-legislators/all/persons.json"
          inputLocationMemberships: "s3://awsglue-datasets/examples/us-legislators/all/memberships.json"

  5. Add the following under "jobs" in the variable config in tests/unit/test_glue_app_stack.py, tests/unit/test_pipeline_stack.py, and tests/snapshot/test_snapshot_glue_app_stack.py (no need to replace S3 locations):
    ,
                "JoinLegislators": {
                    "inputLocationOrgs": "s3://path_to_data_orgs",
                    "inputLocationPersons": "s3://path_to_data_persons",
                    "inputLocationMemberships": "s3://path_to_data_memberships"
                }

  6. Choose Run at the top right to run the individual job scripts.

If the Run button is not shown, install the Python extension into the container through Extensions in the navigation pane.

  7. For local unit testing, run the following command in the terminal in Visual Studio Code:
    $ cd aws_glue_cdk_baseline/job_scripts/
    $ python3 -m pytest

Then you can verify that the newly added unit test passed successfully.

  8. Initialize the snapshot test files by running the following command:
    $ cd ../../
    $ python3 -m pytest --snapshot-update

Deploy to the development environment

Complete the following steps to deploy the AWS Glue app stack to the development environment and run integration tests there:

  1. Set up access to CodeCommit.
  2. Commit and push your changes to the CodeCommit repo:
    $ git add .
    $ git commit -m "Add the second Glue job"
    $ git push

You can see that the pipeline is successfully triggered.

Integration test

There is nothing extra required to run the integration test for the newly added AWS Glue job. The integration test script integ_test_glue_app_stack.py runs all the jobs that include a specific tag, then verifies each job’s state and duration. If you want to change the condition or the threshold, you can edit the assertions at the end of the integ_test_glue_job method.
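
As a hedged illustration only (the template’s actual assertions and names may differ), the kind of check made at the end of such a method typically inspects the job run state and execution time returned by the AWS Glue API:

# Illustrative sketch: the function name and threshold are examples, not the template's exact code.
def assert_job_run_succeeded(job_run: dict, max_duration_seconds: int = 3600) -> None:
    # job_run is the "JobRun" element returned by the AWS Glue GetJobRun API (for example, via boto3)
    assert job_run["JobRunState"] == "SUCCEEDED"
    assert job_run["ExecutionTime"] <= max_duration_seconds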

Deploy to the production environment

Complete the following steps to deploy the AWS Glue app stack to the production environment:

  1. On the CodePipeline console, navigate to GluePipeline.
  2. Choose Review under the DeployProd stage.
  3. Choose Approve.

Wait for the DeployProd stage to complete, then you can verify the AWS Glue app stack resource in the prod account.

Clean up

To clean up your resources, complete the following steps:

  1. Run the following command using the pipeline account:
    $ cdk destroy --profile <PIPELINE-PROFILE>

  2. Delete the AWS Glue app stack in the dev account and prod account.

Conclusion

In this post, you learned how to define the development lifecycle for data integration and how software engineers and data engineers can design an end-to-end development lifecycle using AWS Glue, including development, testing, and CI/CD, through a sample AWS CDK template. You can get started building your own end-to-end development lifecycle for your workload using AWS Glue.


About the author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Build data integration jobs with AI companion on AWS Glue Studio notebook powered by Amazon CodeWhisperer

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/build-data-integration-jobs-with-ai-companion-on-aws-glue-studio-notebook-powered-by-amazon-codewhisperer/

Data is essential for businesses to make informed decisions, improve operations, and innovate. Integrating data from different sources can be a complex and time-consuming process. AWS offers AWS Glue to help you integrate your data from multiple sources on serverless infrastructure for analysis, machine learning (ML), and application development. AWS Glue provides different authoring experiences for you to build data integration jobs. One of the most common options is the notebook. Data scientists tend to run queries interactively and retrieve results immediately to author data integration jobs. This interactive experience can accelerate building data integration pipelines.

Recently, AWS announced general availability of Amazon CodeWhisperer. Amazon CodeWhisperer is an AI coding companion that uses foundational models under the hood to improve developer productivity. This works by generating code suggestions in real time based on developers’ comments in natural language and prior code in their integrated development environment (IDE). AWS also announced the Amazon CodeWhisperer Jupyter extension to help Jupyter users by generating real-time, single-line, or full-function code suggestions for Python notebooks on Jupyter Lab and Amazon SageMaker Studio.

Today, we are excited to announce that AWS Glue Studio notebooks now support Amazon CodeWhisperer for AWS Glue users to improve your experience and help boost development productivity. Now, in your AWS Glue Studio notebook, you can write a comment in natural language (in English) that outlines a specific task, such as “Create a Spark DataFrame from a json file.” Based on this information, CodeWhisperer recommends one or more code snippets directly in the notebook that can accomplish the task. You can quickly accept the top suggestion, view more suggestions, or continue writing your own code.

This post demonstrates how the user experience on AWS Glue Studio notebook has been changed with the Amazon CodeWhisperer integration.

Prerequisites

Before going forward with this tutorial, you need to complete the following prerequisites:

  1. Set up AWS Glue Studio.
  2. Configure an AWS Identity and Access Management (IAM) role to interact with Amazon CodeWhisperer. Attach the following policy to your IAM role for the AWS Glue Studio notebook:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CodeWhispererPermissions",
                "Effect": "Allow",
                "Action": [
                    "codewhisperer:GenerateRecommendations"
                ],
                "Resource": "*"
            }
        ]
    }

Getting Started

Let’s get started. Create a new AWS Glue Studio notebook job by completing the following steps:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
  2. Select Jupyter Notebook and choose Create.
  3. For Job name, enter codewhisperer-demo.
  4. For IAM Role, select your IAM role that you configured as a prerequisite.
  5. Choose Start notebook.

A new notebook is created with sample cells.

At the bottom, there is a menu named CodeWhisperer. By choosing this menu, you can see the shortcuts and several options, including disabling auto-suggestions.

Let’s try your first recommendation by Amazon CodeWhisperer. Note that this post contains examples of recommendations, but you may see different code snippets recommended by Amazon CodeWhisperer.

Add a new cell and enter your comment to describe what you want to achieve. After you press Enter, the recommended code is shown.

If you press Tab, the suggested code is accepted. If you press the arrow keys, you can browse other recommendations. You can learn more in User actions.

Now let’s read a JSON file from Amazon Simple Storage Service (Amazon S3). Enter the following code comment into a notebook cell and press Enter:

# Create a Spark DataFrame from a json file

CodeWhisperer will recommend a code snippet similar to the following:

def create_spark_df_from_json(spark, file_path):
    return spark.read.json(file_path)

Now use this method to utilize the suggested code snippet:

df = create_spark_df_from_json(spark, "s3://awsglue-datasets/examples/us-legislators/all/persons.json")
df.show()

The preceding code returns the following output:

+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+
|birth_date|     contact_details|death_date|family_name|gender|given_name|                  id|         identifiers|               image|              images|               links|              name|         other_names|       sort_name|
+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+
|1944-10-15|                null|      null|    Collins|  male|   Michael|0005af3a-9471-4d1...|[{C000640, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|       Mac Collins|[{bar, Mac Collin...|Collins, Michael|
|1969-01-31|[{fax, 202-226-07...|      null|   Huizenga|  male|      Bill|00aa2dc0-bfb6-441...|[{Bill Huizenga, ...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Bill Huizenga|[{da, Bill Huizen...|  Huizenga, Bill|
|1959-09-28|[{phone, 202-225-...|      null|    Clawson|  male|    Curtis|00aca284-9323-495...|[{C001102, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...|      Curt Clawson|[{bar, Curt Claws...| Clawson, Curtis|
|1930-08-14|                null|2001-10-26|    Solomon|  male|    Gerald|00b73df5-4180-441...|[{S000675, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|    Gerald Solomon|[{null, Gerald B....| Solomon, Gerald|
|1960-05-28|[{fax, 202-225-42...|      null|     Rigell|  male|    Edward|00bee44f-db04-4a7...|[{R000589, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|   E. Scott Rigell|[{null, Scott Rig...|  Rigell, Edward|
|1951-05-20|[{twitter, MikeCr...|      null|      Crapo|  male|   Michael|00f8f12d-6e27-4a2...|[{Mike Crapo, bal...|https://theunited...|[{https://theunit...|[{Wikipedia (da),...|        Mike Crapo|[{da, Mike Crapo,...|  Crapo, Michael|
|1926-05-12|                null|      null|      Hutto|  male|      Earl|015d77c8-6edb-4ed...|[{H001018, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|        Earl Hutto|[{null, Earl Dewi...|     Hutto, Earl|
|1937-11-07|                null|2015-11-19|      Ertel|  male|     Allen|01679bc3-da21-482...|[{E000208, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|       Allen Ertel|[{null, Allen E. ...|    Ertel, Allen|
|1916-09-01|                null|2007-11-24|     Minish|  male|    Joseph|018247d0-2961-423...|[{M000796, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Joseph Minish|[{bar, Joseph Min...|  Minish, Joseph|
|1957-08-04|[{phone, 202-225-...|      null|    Andrews|  male|    Robert|01b100ac-192e-4b5...|[{A000210, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...| Robert E. Andrews|[{null, Rob Andre...| Andrews, Robert|
|1957-01-10|[{fax, 202-225-57...|      null|     Walden|  male|      Greg|01bc21bf-8939-487...|[{Greg Walden, ba...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...|       Greg Walden|[{bar, Greg Walde...|    Walden, Greg|
|1919-01-17|                null|1987-11-29|      Kazen|  male|   Abraham|02059c1e-0bdf-481...|[{K000025, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|Abraham Kazen, Jr.|[{null, Abraham K...|  Kazen, Abraham|
|1960-01-11|[{fax, 202-225-67...|      null|     Turner|  male|   Michael|020aa7dd-54ef-435...|[{Michael R. Turn...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...| Michael R. Turner|[{null, Mike Turn...| Turner, Michael|
|1942-06-28|                null|      null|      Kolbe|  male|     James|02141651-eca2-4aa...|[{K000306, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|         Jim Kolbe|[{ca, Jim Kolbe, ...|    Kolbe, James|
|1941-03-08|[{fax, 202-225-79...|      null|  Lowenthal|  male|      Alan|0231c6ef-6e92-49b...|[{Alan Lowenthal,...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...| Alan S. Lowenthal|[{null, Alan Lowe...| Lowenthal, Alan|
|1952-01-09|[{fax, 202-225-93...|      null|    Capuano|  male|   Michael|0239032f-be5c-4af...|[{Michael Capuano...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|Michael E. Capuano|[{null, Mike Capu...|Capuano, Michael|
|1951-10-19|[{fax, 202-225-56...|      null|   Schrader|  male|      Kurt|0263f619-eff8-4e1...|[{Kurt Schrader, ...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Kurt Schrader|[{bar, Kurt Schra...|  Schrader, Kurt|
|1947-06-13|[{fax, 202-225-69...|      null|     Nadler|  male|   Jerrold|029e793d-ec40-4a1...|[{N000002, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|    Jerrold Nadler|[{ca, Jerrold Nad...| Nadler, Jerrold|
|1970-02-03|[{fax, 202-225-82...|      null|     Graves|  male|       Tom|02b621fc-0523-449...|[{Tom Graves, bal...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|        Tom Graves|[{bar, Tom Graves...|     Graves, Tom|
|1932-05-09|                null|      null|   McMillan|  male|      John|03018f7c-f866-419...|[{M000566, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     John McMillan|[{null, Alex McMi...|  McMillan, John|
+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+
only showing top 20 rows

As you can see from the result, you can quickly utilize the code snippet recommended by Amazon CodeWhisperer.

Examples of code recommendations

In this section, we provide additional examples of code recommendations. Note that these are just our examples, and different code snippets may be suggested by Amazon CodeWhisperer.

Add a column with a calculation

In extract, transform, and load (ETL) use cases, it’s common to derive new columns from existing columns. When we need to add columns to our Spark DataFrame, we can describe to Amazon CodeWhisperer, in detail, what type of column we need added and its respective attributes:

# Add age column to a given person DataFrame
# age is calculated from current date and birth_date. When death_date is not null, then use death_date to calculate age

Amazon CodeWhisperer will recommend a code snippet similar to the following:

def add_age_column(df):
    # Use current date to calculate age
    current_date = current_timestamp()
    # Use birth_date column to calculate age
    df = df.withColumn("age", datediff(current_date, df.birth_date) / 365)
    # Use death_date column to calculate age
    df = df.withColumn("age", when(df.death_date.isNull(), df.age).otherwise(datediff(df.death_date, df.birth_date) / 365))
    return df
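
Note that the recommended snippets in this section assume the relevant PySpark functions are already imported in the notebook; for example:

from pyspark.sql.functions import current_timestamp, datediff, desc, when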

It can be utilized as follows:

df = add_age_column(df)
df.show()

The preceding code returns the following output:

+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+------------------+--------------------+
|birth_date|     contact_details|death_date|family_name|gender|given_name|                  id|         identifiers|               image|              images|               links|              name|         other_names|       sort_name|               age|        current_date|
+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+------------------+--------------------+
|1944-10-15|                null|      null|    Collins|  male|   Michael|0005af3a-9471-4d1...|[{C000640, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|       Mac Collins|[{bar, Mac Collin...|Collins, Michael| 78.71506849315068|2023-06-14 06:12:...|
|1969-01-31|[{fax, 202-226-07...|      null|   Huizenga|  male|      Bill|00aa2dc0-bfb6-441...|[{Bill Huizenga, ...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Bill Huizenga|[{da, Bill Huizen...|  Huizenga, Bill|  54.4027397260274|2023-06-14 06:12:...|
|1959-09-28|[{phone, 202-225-...|      null|    Clawson|  male|    Curtis|00aca284-9323-495...|[{C001102, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...|      Curt Clawson|[{bar, Curt Claws...| Clawson, Curtis| 63.75342465753425|2023-06-14 06:12:...|
|1930-08-14|                null|2001-10-26|    Solomon|  male|    Gerald|00b73df5-4180-441...|[{S000675, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|    Gerald Solomon|[{null, Gerald B....| Solomon, Gerald| 71.24931506849315|2023-06-14 06:12:...|
|1960-05-28|[{fax, 202-225-42...|      null|     Rigell|  male|    Edward|00bee44f-db04-4a7...|[{R000589, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|   E. Scott Rigell|[{null, Scott Rig...|  Rigell, Edward|63.087671232876716|2023-06-14 06:12:...|
|1951-05-20|[{twitter, MikeCr...|      null|      Crapo|  male|   Michael|00f8f12d-6e27-4a2...|[{Mike Crapo, bal...|https://theunited...|[{https://theunit...|[{Wikipedia (da),...|        Mike Crapo|[{da, Mike Crapo,...|  Crapo, Michael| 72.11780821917809|2023-06-14 06:12:...|
|1926-05-12|                null|      null|      Hutto|  male|      Earl|015d77c8-6edb-4ed...|[{H001018, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|        Earl Hutto|[{null, Earl Dewi...|     Hutto, Earl| 97.15616438356165|2023-06-14 06:12:...|
|1937-11-07|                null|2015-11-19|      Ertel|  male|     Allen|01679bc3-da21-482...|[{E000208, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|       Allen Ertel|[{null, Allen E. ...|    Ertel, Allen| 78.08493150684932|2023-06-14 06:12:...|
|1916-09-01|                null|2007-11-24|     Minish|  male|    Joseph|018247d0-2961-423...|[{M000796, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Joseph Minish|[{bar, Joseph Min...|  Minish, Joseph|  91.2904109589041|2023-06-14 06:12:...|
|1957-08-04|[{phone, 202-225-...|      null|    Andrews|  male|    Robert|01b100ac-192e-4b5...|[{A000210, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...| Robert E. Andrews|[{null, Rob Andre...| Andrews, Robert|  65.9041095890411|2023-06-14 06:12:...|
|1957-01-10|[{fax, 202-225-57...|      null|     Walden|  male|      Greg|01bc21bf-8939-487...|[{Greg Walden, ba...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...|       Greg Walden|[{bar, Greg Walde...|    Walden, Greg| 66.46849315068494|2023-06-14 06:12:...|
|1919-01-17|                null|1987-11-29|      Kazen|  male|   Abraham|02059c1e-0bdf-481...|[{K000025, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|Abraham Kazen, Jr.|[{null, Abraham K...|  Kazen, Abraham| 68.91232876712328|2023-06-14 06:12:...|
|1960-01-11|[{fax, 202-225-67...|      null|     Turner|  male|   Michael|020aa7dd-54ef-435...|[{Michael R. Turn...|https://theunited...|[{https://theunit...|[{Wikipedia (comm...| Michael R. Turner|[{null, Mike Turn...| Turner, Michael|63.465753424657535|2023-06-14 06:12:...|
|1942-06-28|                null|      null|      Kolbe|  male|     James|02141651-eca2-4aa...|[{K000306, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|         Jim Kolbe|[{ca, Jim Kolbe, ...|    Kolbe, James| 81.01643835616439|2023-06-14 06:12:...|
|1941-03-08|[{fax, 202-225-79...|      null|  Lowenthal|  male|      Alan|0231c6ef-6e92-49b...|[{Alan Lowenthal,...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...| Alan S. Lowenthal|[{null, Alan Lowe...| Lowenthal, Alan| 82.32328767123288|2023-06-14 06:12:...|
|1952-01-09|[{fax, 202-225-93...|      null|    Capuano|  male|   Michael|0239032f-be5c-4af...|[{Michael Capuano...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|Michael E. Capuano|[{null, Mike Capu...|Capuano, Michael| 71.47671232876712|2023-06-14 06:12:...|
|1951-10-19|[{fax, 202-225-56...|      null|   Schrader|  male|      Kurt|0263f619-eff8-4e1...|[{Kurt Schrader, ...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Kurt Schrader|[{bar, Kurt Schra...|  Schrader, Kurt|  71.7013698630137|2023-06-14 06:12:...|
|1947-06-13|[{fax, 202-225-69...|      null|     Nadler|  male|   Jerrold|029e793d-ec40-4a1...|[{N000002, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|    Jerrold Nadler|[{ca, Jerrold Nad...| Nadler, Jerrold| 76.05479452054794|2023-06-14 06:12:...|
|1970-02-03|[{fax, 202-225-82...|      null|     Graves|  male|       Tom|02b621fc-0523-449...|[{Tom Graves, bal...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|        Tom Graves|[{bar, Tom Graves...|     Graves, Tom|53.394520547945206|2023-06-14 06:12:...|
|1932-05-09|                null|      null|   McMillan|  male|      John|03018f7c-f866-419...|[{M000566, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     John McMillan|[{null, Alex McMi...|  McMillan, John| 91.15890410958905|2023-06-14 06:12:...|
+----------+--------------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+------------------+--------------------+
only showing top 20 rows

Sort and extract records

You can use Amazon CodeWhisperer for sorting data and extracting records within a Spark DataFrame as well:

# Show top 5 oldest persons from DataFrame
# Use age column

Amazon CodeWhisperer will recommend a code snippet similar to the following:

def get_oldest_person(df):
    return df.orderBy(desc("age")).limit(5)

It can be utilized as follows:

get_oldest_person(df).show()

The preceding code returns the following output:

+----------+---------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+---------------+------------------+--------------------+
|birth_date|contact_details|death_date|family_name|gender|given_name|                  id|         identifiers|               image|              images|               links|           name|         other_names|      sort_name|               age|        current_date|
+----------+---------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+---------------+------------------+--------------------+
|1919-08-22|           null|      null|       Winn|  male|    Edward|942d20ed-d838-436...|[{W000636, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|Larry Winn, Jr.|[{null, Larry Win...|   Winn, Edward|103.88219178082191|2023-06-14 06:13:...|
|1920-03-23|           null|      null|      Smith|  male|      Neal|84a9cbe4-651b-46d...|[{S000596, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|     Neal Smith|[{null, Neal Edwa...|    Smith, Neal| 103.2958904109589|2023-06-14 06:13:...|
|1920-09-17|           null|      null|       Holt|female|  Marjorie|8bfb671a-3147-4bc...|[{H000747, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...|  Marjorie Holt|[{bar, Marjorie H...| Holt, Marjorie| 102.8082191780822|2023-06-14 06:13:...|
|1921-03-05|           null|      null|     Bedell|  male|   Berkley|896f0ce3-afe4-4ea...|[{B000298, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (de),...| Berkley Bedell|[{ca, Berkley Bed...|Bedell, Berkley|102.34520547945205|2023-06-14 06:13:...|
|1921-06-23|           null|      null|    Findley|  male|      Paul|2811f793-1108-4fb...|[{F000123, biogui...|https://theunited...|[{https://theunit...|[{Wikipedia (azb)...|   Paul Findley|[{azb, پاول فایند...|  Findley, Paul|102.04383561643836|2023-06-14 06:13:...|
+----------+---------------+----------+-----------+------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+---------------+------------------+--------------------+

Generate sample datasets in a Spark DataFrame

Amazon CodeWhisperer is powerful enough to generate sample Spark DataFrames as well, which can be done like so:

# Generate sample Spark DataFrame of country name and country code
# First column name is country_name, and second column name is country_code

Amazon CodeWhisperer will recommend a code snippet similar to the following:

def get_country_code_df(spark):
    return spark.createDataFrame(
        [("United States", "US"), ("United Kingdom", "UK"), ("Canada", "CA")],
        ["country_name", "country_code"]
    )

It can be utilized as follows:

df = get_country_code_df(spark)
df.show()

The preceding code returns the following output:

+--------------+------------+
|  country_name|country_code|
+--------------+------------+
| United States|          US|
|United Kingdom|          UK|
|        Canada|          CA|
+--------------+------------+

Generate transformations in SQL

We can also use Amazon CodeWhisperer to create a code snippet for transformation in SQL and create a new table from the SQL query results (CTAS) like so:

# Generate CTAS query by selecting all the records in a table with grouping by a given column

Amazon CodeWhisperer will recommend a code snippet similar to the following:

def generate_ctas_query_with_group_by(table_name, group_by_col):
    ctas_query = "CREATE TABLE " + table_name + " AS SELECT * FROM " + table_name + " GROUP BY " + group_by_col
    return ctas_query
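
As generated, this query reuses the source table name and selects every column alongside a GROUP BY, which most SQL engines reject. As a hedged example of how you might adapt the suggestion in practice (the table and column names are illustrative), you could target a new table and aggregate:

def generate_ctas_query_with_group_by(source_table, target_table, group_by_col):
    # Write aggregated results into a separate target table
    return (
        f"CREATE TABLE {target_table} AS "
        f"SELECT {group_by_col}, COUNT(*) AS record_count "
        f"FROM {source_table} "
        f"GROUP BY {group_by_col}"
    )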

Conclusion

In this post, we demonstrated how AWS Glue Studio notebook integration with Amazon CodeWhisperer helps you build data integration jobs faster. This integration is available today in US East (N. Virginia). You can start using the AWS Glue Studio notebook with Amazon CodeWhisperer to accelerate building your data integration jobs. To get started with AWS Glue, visit AWS Glue.

Learn more

To learn more about using AWS Glue notebooks and Amazon CodeWhisperer, check out the following video.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Gal blog picGal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI, and is based in California. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data products. In her spare time, she enjoys playing card games.

Scale your AWS Glue for Apache Spark jobs with new larger worker types G.4X and G.8X

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/scale-your-aws-glue-for-apache-spark-jobs-with-new-larger-worker-types-g-4x-and-g-8x/

Hundreds of thousands of customers use AWS Glue, a serverless data integration service, to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue for Apache Spark jobs work with your code and configuration of the number of data processing units (DPU). Each DPU provides 4 vCPU, 16 GB memory, and 64 GB disk. AWS Glue manages running Spark and adjusts workers to achieve the best price performance. For workloads such as data transforms, joins, and queries, you can use G.1X (1 DPU) and G.2X (2 DPU) workers, which offer a scalable and cost-effective way to run most jobs. With exponentially growing data sources and data lakes, customers want to run more data integration workloads, including their most demanding transforms, aggregations, joins, and queries. These workloads require higher compute, memory, and storage per worker.

Today we are pleased to announce the general availability of AWS Glue G.4X (4 DPU) and G.8X (8 DPU) workers, the next series of AWS Glue workers for the most demanding data integration workloads. G.4X and G.8X workers offer increased compute, memory, and storage, making it possible for you to vertically scale and run intensive data integration jobs, such as memory-intensive data transforms, skewed aggregations, and entity detection checks involving petabytes of data. Larger worker types not only benefit the Spark executors, but also help in cases where the Spark driver needs larger capacity, for instance, when the job’s query plan is quite large.

This post demonstrates how AWS Glue G.4X and G.8X workers help you scale your AWS Glue for Apache Spark jobs.

G.4X and G.8X workers

AWS Glue G.4X and G.8X workers give you more compute, memory, and storage to run your most demanding jobs. G.4X workers provide 4 DPU, with 16 vCPU, 64 GB memory, and 256 GB of disk per node. G.8X workers provide 8 DPU, with 32 vCPU, 128 GB memory, and 512 GB of disk per node. You can enable G.4X and G.8X workers with a single parameter change in the API, AWS Command Line Interface (AWS CLI), or visually in AWS Glue Studio. Regardless of the worker used, all AWS Glue jobs have the same capabilities, including auto scaling and interactive job authoring via notebooks. G.4X and G.8X workers are available with AWS Glue 3.0 and 4.0.

The following table shows compute, memory, disk, and Spark configurations per worker type in AWS Glue 3.0 or later.

AWS Glue Worker Type   DPU per Node   vCPU   Memory (GB)   Disk (GB)   Spark Executors per Node   Cores per Spark Executor
G.1X                   1              4      16            64          1                          4
G.2X                   2              8      32            128         1                          8
G.4X (new)             4              16     64            256         1                          16
G.8X (new)             8              32     128           512         1                          32

To use G.4X and G.8X workers on an AWS Glue job, change the setting of the worker type parameter to G.4X or G.8X. In AWS Glue Studio, you can choose G 4X or G 8X under Worker type.

In the AWS API or AWS SDK, you can specify G.4X or G.8X in the WorkerType parameter. In the AWS CLI, you can use the --worker-type parameter in a create-job command.
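
For example, a minimal AWS CLI sketch might look like the following (the job name, role ARN, and script location are placeholders):

$ aws glue create-job \
    --name my-g4x-job \
    --role <GlueJobRoleArn> \
    --glue-version 4.0 \
    --worker-type G.4X \
    --number-of-workers 10 \
    --command Name=glueetl,ScriptLocation=s3://<your-bucket>/scripts/my_script.py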

To use G.4X and G.8X on an AWS Glue Studio notebook or interactive sessions, set G.4X or G.8X in the %worker_type magic:
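
For example (the Glue version and number of workers shown here are illustrative):

%glue_version 4.0
%worker_type G.8X
%number_of_workers 10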

Performance characteristics using the TPC-DS benchmark

In this section, we use the TPC-DS benchmark to showcase performance characteristics of the new G.4X and G.8X worker types. We used AWS Glue version 4.0 jobs.

G.2X, G.4X, and G.8X results with the same number of workers

Compared to the G.2X worker type, the G.4X worker has 2 times the DPUs and the G.8X worker has 4 times the DPUs. We ran over 100 TPC-DS queries against the 3 TB TPC-DS dataset with the same number of workers but on different worker types. The following table shows the results of the benchmark.

Worker Type   Number of Workers   Number of DPUs   Duration (minutes)   Cost at $0.44/DPU-hour ($)
G.2X          30                  60               537.4                $236.46
G.4X          30                  120              264.6                $232.85
G.8X          30                  240              122.6                $215.78

When running jobs on the same number of workers, the new G.4X and G.8X workers achieved roughly linear vertical scalability.

G.2X, G.4X, and G.8X results with the same number of DPUs

We ran over 100 TPC-DS queries against the 10 TB TPC-DS dataset with the same number of DPUs but on different worker types. The following table shows the results of the experiments.

Worker Type   Number of Workers   Number of DPUs   Duration (minutes)   Cost at $0.44/DPU-hour ($)
G.2X          40                  80               1323                 $776.16
G.4X          20                  80               1191                 $698.72
G.8X          10                  80               1190                 $698.13

When running jobs on the same number of total DPUs, the job performance stayed mostly the same with new worker types.

Example: Memory-intensive transformations

Data transformations are an essential step to preprocess and structure your data into an optimal form. Some transformations, such as aggregations, joins, and custom logic in user-defined functions (UDFs), consume a large memory footprint. The new G.4X and G.8X workers enable you to run larger memory-intensive transformations at scale.

The following example reads large JSON files compressed in GZIP from an input Amazon Simple Storage Service (Amazon S3) location, performs groupBy, calculates groups based on K-means clustering using a Pandas UDF, then shows the results. Note that this UDF-based K-means is used just for illustration purposes; it’s recommended to use native K-means clustering for production purposes.
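
The following is a minimal sketch of that pattern rather than the exact job we ran: it substitutes a tiny inline DataFrame for the large JSON input, uses hypothetical column names, and assumes scikit-learn is available in the job environment.

import pandas as pd
from sklearn.cluster import KMeans
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tiny inline dataset standing in for the large GZIP-compressed JSON input
df = spark.createDataFrame(
    [("a", 1.0, 1.1), ("a", 1.2, 0.9), ("a", 8.0, 8.1),
     ("b", 3.0, 3.2), ("b", 9.0, 9.1), ("b", 9.2, 8.9)],
    ["group_id", "x", "y"],
)

def assign_clusters(pdf: pd.DataFrame) -> pd.DataFrame:
    # Cluster the rows of one group; this is where the memory-intensive work happens
    pdf["cluster"] = KMeans(n_clusters=2, n_init=10).fit_predict(pdf[["x", "y"]]).astype("int64")
    return pdf

# groupBy followed by a Pandas UDF (applyInPandas), then show the results
result = df.groupBy("group_id").applyInPandas(
    assign_clusters, schema="group_id string, x double, y double, cluster long"
)
result.show()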

With G.2X workers

When the AWS Glue job ran on 12 G.2X workers (24 DPU), it failed due to a No space left on device error. On the Spark UI, the Stages tab for the failed stage shows that there were multiple failed tasks in the AWS Glue job due to the error.

The Executors tab shows the failed tasks per executor.

Generally, G.2X workers can process memory-intensive workloads well. This time, we used a special Pandas UDF that consumes a significant amount of memory, and it caused a failure due to a large amount of shuffle writes.

With G.8X workers

When the same AWS Glue job ran on 3 G.8X workers (24 DPU), it succeeded without any failures, as shown on the Spark UI’s Jobs tab.

The Executors tab also shows that there were no failed tasks.

From this result, we observed that G.8X workers processed the same workload without failures.

Conclusion

In this post, we demonstrated how AWS Glue G.4X and G.8X workers can help you vertically scale your AWS Glue for Apache Spark jobs. G.4X and G.8X workers are available today in US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). You can start using the new G.4X and G.8X worker types to scale your workload from today. To get started with AWS Glue, visit AWS Glue.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Tomohiro Tanaka is a Senior Cloud Support Engineer on the AWS Support team. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he enjoys coffee breaks with his colleagues and making coffee at home.

Chuhan LiuChuhan Liu is a Software Development Engineer on the AWS Glue team. He is passionate about building scalable distributed systems for big data processing, analytics, and management. In his spare time, he enjoys playing tennis.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytic services. In his spare time, he enjoys skiing and gardening.

Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 2: AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-glue-studio-visual-editor-introducing-native-support-for-apache-hudi-delta-lake-and-apache-iceberg-on-aws-glue-for-apache-spark/

In the first post of this series, we described how AWS Glue for Apache Spark works with Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg tables using the native support for those data lake formats. This native support simplifies reading and writing your data for these data lake frameworks so you can more easily build and maintain your data lakes in a transactionally consistent manner. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs.

These data lake frameworks help you store data more efficiently and enable applications to access your data faster. Unlike simpler data file formats such as Apache Parquet, CSV, and JSON, which can store big data, data lake frameworks organize distributed big data files into tabular structures that enable basic constructs of databases on data lakes.

Expanding on the functionality we announced at AWS re:Invent 2022, AWS Glue now natively supports Hudi, Delta Lake and Iceberg through the AWS Glue Studio visual editor. If you prefer authoring AWS Glue for Apache Spark jobs using a visual tool, you can now choose any of these three data lake frameworks as a source or target through a graphical user interface (GUI) without any custom code.

Even without prior experience using Hudi, Delta Lake or Iceberg, you can easily achieve typical use cases. In this post, we demonstrate how to ingest data stored in Hudi using the AWS Glue Studio visual editor.

Example scenario

To demonstrate the visual editor experience, this post introduces the Global Historical Climatology Network Daily (GHCN-D) dataset. The data is publicly accessible through an Amazon Simple Storage Service (Amazon S3) bucket. For more information, see the Registry of Open Data on AWS. You can also learn more in Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight.

The Amazon S3 location s3://noaa-ghcn-pds/csv/by_year/ has all the observations from 1763 to the present organized in CSV files, one file for each year. The following block shows an example of what the records look like:

ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AE000041196,20220102,TAVG,226,H,,S,
...
AE000041196,20221231,TMAX,243,,,S,
AE000041196,20221231,PRCP,0,D,,S,
AE000041196,20221231,TAVG,202,H,,S,

The records have fields including ID, DATE, ELEMENT, and more. Each combination of ID, DATE, and ELEMENT represents a unique record in this dataset. For example, the record with ID as AE000041196, ELEMENT as TAVG, and DATE as 20220101 is unique.

In this tutorial, we assume that the files are updated with new records every day, and want to store only the latest record per the primary key (ID and ELEMENT) to make the latest snapshot data queryable. One typical approach is to do an INSERT for all the historical data, and calculate the latest records in queries; however, this can introduce additional overhead in all the queries. When you want to analyze only the latest records, it’s better to do an UPSERT (update and insert) based on the primary key and DATE field rather than just an INSERT in order to avoid duplicates and maintain a single updated row of data.
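
For comparison, computing the latest snapshot at query time from an INSERT-only history typically requires a window function on every query, along the lines of the following sketch (the table name ghcn_history is hypothetical):

SELECT ID, ELEMENT, "DATE", DATA_VALUE
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY ID, ELEMENT ORDER BY "DATE" DESC) AS rn
    FROM ghcn_history
) t
WHERE rn = 1;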

Prerequisites

To continue this tutorial, you need to create the following AWS resources in advance:

  • An S3 bucket (and optional prefix) for the output Hudi table
  • An AWS Identity and Access Management (IAM) role for your AWS Glue job
  • An AWS Glue database named hudi_native

Process a Hudi dataset on the AWS Glue Studio visual editor

Let’s author an AWS Glue job to read daily records in 2022, and write the latest snapshot into the Hudi table in your S3 bucket using UPSERT. Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset:

  1. Under Visual, choose Data source – S3 bucket.
  2. Under Node properties, for S3 source type, select S3 location.
  3. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured.


The next step is to configure the data target to ingest data in Apache Hudi on your S3 bucket:

  1. Choose Data target – S3 bucket.
  2. Under Data target properties- S3, for Format, choose Apache Hudi.
  3. For Hudi Table Name, enter ghcn.
  4. For Hudi Storage Type, choose Copy on write.
  5. For Hudi Write Operation, choose Upsert.
  6. For Hudi Record Key Fields, choose ID.
  7. For Hudi Precombine Key Field, choose DATE.
  8. For Compression Type, choose GZIP.
  9. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn/. (Provide your S3 bucket name and prefix.)

To make it easy to discover the sample data, and also make it queryable from Athena, configure the job to create a table definition on the AWS Glue Data Catalog:

  1. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  2. For Database, choose hudi_native.
  3. For Table name, enter ghcn.
  4. For Partition keys – optional, choose ELEMENT.

Now your data integration job is authored in the visual editor completely. Let’s add one remaining setting about the IAM role, then run the job:

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Choose Save, then choose Run.


  1. Navigate to the Runs tab to track the job progress and wait for it to complete.


Query the table with Athena

Now that the job has successfully created the Hudi table, you can query the table through different engines, including Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, in addition to AWS Glue for Apache Spark.

To query through Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. In the query editor, enter the following SQL and choose Run:
SELECT * FROM "hudi_native"."ghcn" limit 10;

The following screenshot shows the query result.

Let’s dive deep into the table to understand how the data is ingested and focus on the records with ID=’AE000041196′.

  1. Run the following query to focus on the very specific example records with ID='AE000041196':
SELECT * FROM "hudi_native"."ghcn" WHERE ID='AE000041196';

The following screenshot shows the query result.

The original source file 2022.csv has historical records for record ID='AE000041196' from 20220101 to 20221231; however, the query result shows only four records, one record per ELEMENT, at the latest snapshot of the day (20221230 or 20221231). Because we used the UPSERT write option when writing data, we configured the ID field as the Hudi record key field, the DATE field as the Hudi precombine field, and the ELEMENT field as the partition key field. When two records have the same key value, Hudi picks the one with the largest value for the precombine field. When the job ingested data, it compared all the values in the DATE field for each pair of ID and ELEMENT, and then picked the record with the largest value in the DATE field.

According to the preceding result, we were able to ingest the latest snapshot from all the 2022 data. Now let’s do an UPSERT of the new 2023 data to overwrite the records on the target Hudi table.

  1. Go back to AWS Glue Studio console, modify the source S3 location to s3://noaa-ghcn-pds/csv/by_year/2023.csv, then save and run the job.


  1. Run the same Athena query from the Athena console.

Now you see that the four records have been updated with the new records in 2023.

If you receive more records in the future, this approach works well to upsert the new records based on the Hudi record key and Hudi precombine key.

Clean up

Now to the final step, cleaning up the resources:

  1. Delete the AWS Glue database hudi_native.
  2. Delete the AWS Glue table ghcn.
  3. Delete the S3 objects under s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn/.

Conclusion

This post demonstrated how to process Hudi datasets using the AWS Glue Studio visual editor. The AWS Glue Studio visual editor enables you to author jobs while taking advantage of data lake formats and without needing expertise in them. If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Scott Long is a Front End Engineer on the AWS Glue team. He is responsible for implementing new features in AWS Glue Studio. In his spare time, he enjoys socializing with friends and participating in various outdoor activities.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Introducing native Delta Lake table support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-native-delta-lake-table-support-with-aws-glue-crawlers/

Delta Lake is an open-source project that helps implement modern data lake architectures commonly built on Amazon S3 or other cloud storages. With Delta Lake, you can achieve ACID transactions, time travel queries, CDC, and other common use cases on the cloud. Delta Lake is available with multiple AWS services, such as AWS Glue Spark jobs, Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum.

AWS Glue includes the Delta crawler, a capability that makes discovering datasets simpler by scanning Delta Lake transaction logs in Amazon Simple Storage Service (Amazon S3), extracting their schema, creating manifest files in Amazon S3, and automatically populating the AWS Glue Data Catalog, which keeps the metadata current. The newly created AWS Glue Data Catalog table uses the SymlinkTextInputFormat format. The Delta crawler creates a manifest file, which is a text file containing the list of data files that query engines such as Presto, Trino, or Athena can use to query the table rather than finding the files with a directory listing. A previous blog post demonstrated how it works. Manifest files needed to be regenerated on a periodic basis to include newer transactions in the original Delta Lake tables, which resulted in expensive I/O operations, longer processing times, and an increased storage footprint.

With today’s launch, the AWS Glue crawler adds support for creating AWS Glue Data Catalog tables for native Delta Lake tables and does not require generating manifest files. This improves the customer experience because you no longer have to regenerate manifest files whenever a new partition becomes available or a table’s metadata changes. With native Delta Lake tables and automatic schema evolution requiring no additional manual intervention, this reduces the time to insight by making newly ingested data quickly available for analysis with your preferred analytics and machine learning (ML) tools.

Amazon Athena SQL engine version 3 started supporting the Delta Lake native connector. AWS Glue for Apache Spark also supports the Delta Lake native connector in Glue version 3.0 and later, and Amazon EMR supports Delta Lake in release version 6.9.0 and later. This means that you can query the Delta transaction log directly in Amazon Athena, AWS Glue for Apache Spark, and Amazon EMR, which makes the experience of working with native Delta Lake tables seamless across these platforms.
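
As a brief, hedged sketch of what this looks like from an AWS Glue for Apache Spark job (assuming Delta Lake support is enabled for the job, for example via the --datalake-formats job parameter and any required Spark configuration, and that the S3 path points at a Delta Lake table):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Read the native Delta Lake table directly from its transaction log (no manifest files needed)
spark = GlueContext(SparkContext.getOrCreate()).spark_session
df = spark.read.format("delta").load("s3://your_s3_bucket/data/sample_delta_table/")
df.show()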

This post demonstrates how AWS Glue crawlers work with native Delta Lake tables and describes typical use cases to query native Delta Lake tables.

How AWS Glue crawler works with native Delta Lake tables

Now AWS Glue crawler has two different options:

  • Native table: Create a native Delta Lake table definition on AWS Glue Data Catalog.
  • Symlink table: Create a symlink-based manifest table definition on AWS Glue Data Catalog from a Delta Lake table, and generate its symlink files on Amazon S3.

Native table

Native Delta Lake tables are accessible from Amazon Athena (engine version 3), AWS Glue for Apache Spark (Glue version 3.0 and later), Amazon EMR (release version 6.9.0 and later), and other platforms that support Delta Lake tables. With native Delta Lake tables, you get capabilities such as ACID transactions while maintaining just a single source of truth.

Symlink table

Symlink tables are a consistent snapshot of a native Delta Lake table, represented using SymlinkTextInputFormat over Parquet files. The symlink tables are accessible from Amazon Athena and Amazon Redshift Spectrum.

Because the symlink tables are a snapshot of the original native Delta Lake tables, you need to maintain both the original native Delta Lake tables and the symlink tables. When the data or schema in an original Delta Lake table is updated, the symlink tables in the AWS Glue Data Catalog may become out of sync. This means that you can still query the symlink table and get a consistent result, but the result reflects the previous point in time.

Crawl native Delta Lake tables using AWS Glue crawler

In this section, let’s go through how to crawl native Delta Lake tables using AWS Glue crawler.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue crawler if you do not have it.
  4. Run the following command to copy the sample Delta Lake table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/delta-lake-crawler/sample_delta_table/ s3://your_s3_bucket/data/sample_delta_table

Create a Delta Lake crawler

A Delta Lake crawler can be created through the AWS Glue console, AWS Glue SDK, or AWS CLI (see the CLI sketch after the following list). Specify a DeltaTarget with the following configurations:

  • DeltaTables – A list of S3 DeltaPaths where the Delta Lake tables are located. (Note that each path must be the parent of a _delta_log folder. If the Delta transaction log is located at s3://bucket/sample_delta_table/_delta_log, then the path s3://bucket/sample_delta_table/ should be provided.)
  • WriteManifest – A Boolean value indicating whether or not the crawler should write the manifest files for each DeltaPath. This parameter is only applicable for Delta Lake tables created via manifest files.
  • CreateNativeDeltaTable – A Boolean value indicating whether the crawler should create a native Delta Lake table. If set to False, the crawler would create a symlink table instead. Note that both WriteManifest and CreateNativeDeltaTable options can’t be set to True.
  • ConnectionName – An optional connection name stored in the Data Catalog that the crawler should use to access Delta Lake tables backed by a VPC.
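
As a hedged AWS CLI sketch of the same configuration (the role ARN and bucket name are placeholders; the console steps below achieve the same result):

$ aws glue create-crawler \
    --name delta-lake-native-crawler \
    --role <GlueCrawlerRoleArn> \
    --database-name delta_lake_native \
    --targets '{"DeltaTargets": [{"DeltaTables": ["s3://your_s3_bucket/data/sample_delta_table/"], "WriteManifest": false, "CreateNativeDeltaTable": true}]}'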

In this instruction, create the crawler through the console. Complete the following steps to create a Delta Lake crawler:

  1. Open the AWS Glue console.
  2. Choose Crawlers.
  3. Choose Create crawler.
  4. For Name, enter delta-lake-native-crawler, and choose Next.
  5. Under Data sources, choose Add a data source.
  6. For Data source, select Delta Lake.
  7. For Include delta lake table path(s), enter s3://your_s3_bucket/data/sample_delta_table/.
  8. For Create tables for querying, choose Create Native tables.
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. For Existing IAM role, choose your IAM role, then choose Next.
  12. For Target database, choose Add database; the Add database dialog appears. For Database name, enter delta_lake_native, then choose Create. Choose Next.
  13. Choose Create crawler.
  14. The Delta Lake crawler can be triggered to run through the console, or through the SDK or AWS CLI using the StartCrawler API (a boto3 sketch follows these steps). It can also be scheduled through the console to run at specific times. In this tutorial, run the crawler through the console.
  15. Select delta-lake-native-crawler, and choose Run.
  16. Wait for the crawler to complete.
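If you prefer to trigger the crawler programmatically, a minimal boto3 sketch of the StartCrawler call looks like the following; the 30-second polling interval is an arbitrary choice.

import time
import boto3

glue = boto3.client("glue")

# Minimal sketch: start the crawler created earlier and wait until it returns
# to the READY state.
glue.start_crawler(Name="delta-lake-native-crawler")
while glue.get_crawler(Name="delta-lake-native-crawler")["Crawler"]["State"] != "READY":
    time.sleep(30)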

After the crawler has run, you can see the Delta Lake table definition in the AWS Glue console:

You can also verify an AWS Glue table definition through the following AWS CLI command:

$ aws glue get-table --database delta_lake_native --name sample_delta_table
{
    "Table": {
        "Name": "sample_delta_table",
        "DatabaseName": "delta_lake_native",
        "Owner": "owner",
        "CreateTime": "2022-11-08T12:11:20+09:00",
        "UpdateTime": "2022-11-08T13:19:06+09:00",
        "LastAccessTime": "2022-11-08T13:19:06+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "product_id",
                    "Type": "string"
                },
                {
                    "Name": "product_name",
                    "Type": "string"
                },
                {
                    "Name": "price",
                    "Type": "bigint"
                },
                {
                    "Name": "currency",
                    "Type": "string"
                },
                {
                    "Name": "category",
                    "Type": "string"
                },
                {
                    "Name": "updated_at",
                    "Type": "double"
                }
            ],
            "Location": "s3://your_s3_bucket/data/sample_delta_table/",
            "AdditionalLocations": [],
            "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
            "Compressed": false,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "serialization.format": "1",
                    "path": "s3://your_s3_bucket/data/sample_delta_table/"
                }
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "EXTERNAL": "true",
                "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
                "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
                "CrawlerSchemaSerializerVersion": "1.0",
                "CrawlerSchemaDeserializerVersion": "1.0",
                "spark.sql.partitionProvider": "catalog",
                "classification": "delta",
                "spark.sql.sources.schema.numParts": "1",
                "spark.sql.sources.provider": "delta",
                "delta.lastCommitTimestamp": "1653462383292",
                "delta.lastUpdateVersion": "6",
                "table_type": "delta"
            },
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "EXTERNAL": "true",
            "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
            "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
            "CrawlerSchemaSerializerVersion": "1.0",
            "CrawlerSchemaDeserializerVersion": "1.0",
            "spark.sql.partitionProvider": "catalog",
            "classification": "delta",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "delta",
            "delta.lastCommitTimestamp": "1653462383292",
            "delta.lastUpdateVersion": "6",
            "table_type": "delta"
        },
        "CreatedBy": "arn:aws:sts::012345678901:assumed-role/AWSGlueServiceRole/AWS-Crawler",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "012345678901",
        "IsRowFilteringEnabled": false,
        "VersionId": "1",
        "DatabaseId": "0bd458e335a2402c828108f267bc770c"
    }
}

After you create the table definition on AWS Glue Data Catalog, AWS analytics services such as Athena and AWS Glue Spark jobs are able to query the Delta Lake table.

Query Delta Lake tables using Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale. You can use Athena to query your S3 data lake for use cases such as data exploration for machine learning (ML) and AI, business intelligence (BI) reporting, and ad hoc querying.

There are now two ways to use Delta Lake tables in Athena:

  • For native table: Use Athena’s newly launched native support for Delta Lake tables. You can learn more in Querying Delta Lake tables. This method no longer requires regenerating manifest files after every transaction. Data updates are available for queries in Athena as soon as they are performed in the original Delta Lake tables, and you get up to 40 percent improvement in query performance over querying manifest files. Since Athena optimizes data scans in native Delta Lake queries using statistics in Delta Lake files, you get the advantage of reduced cost for Athena queries. This post focuses on this approach.
  • For symlink table: Use SymlinkTextInputFormat to query symlink tables through manifest files generated from Delta Lake tables. This was previously the only way to query Delta Lake tables from Athena, and it is no longer recommended when you use only Athena to query your Delta Lake tables.

To use the native Delta Lake connector in Athena, you need to use Athena engine version 3. If you are using an older engine version, change the engine version.

Complete the following steps to run queries in Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "delta_lake_native"."sample_delta_table" limit 10;

The following screenshot shows our output:

Query Delta Lake tables using AWS Glue for Apache Spark

AWS Glue for Apache Spark natively supports Delta Lake. AWS Glue version 3.0 (Apache Spark 3.1.1) supports Delta Lake 1.0.0, and AWS Glue version 4.0 (Apache Spark 3.3.0) supports Delta Lake 2.1.0. With this native support, all you need to do to configure Delta Lake is provide a single job parameter, --datalake-formats delta. There is no need to configure a separate connector for Delta Lake from AWS Marketplace, which reduces the configuration steps required to use the framework in AWS Glue for Apache Spark.
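For reference, with the --datalake-formats delta job parameter set, you can also read the table directly from its S3 path using the Spark DataFrame API. The following is a minimal sketch; the S3 path is the sample table location used earlier in this post.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Minimal sketch: read a Delta Lake table from Amazon S3 in an AWS Glue job
# or notebook where the --datalake-formats delta job parameter is set.
spark = GlueContext(SparkContext.getOrCreate()).spark_session
df = spark.read.format("delta").load("s3://your_s3_bucket/data/sample_delta_table/")
df.show(10)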

AWS Glue also provides a serverless notebook interface called AWS Glue Studio notebook to query and process data interactively. Complete the following steps to launch AWS Glue Studio notebook and query a Delta Lake table:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Under Create job, select Jupyter Notebook.
  3. Choose Create a new notebook from scratch, and choose Create.
  4. For Job name, enter delta-sql.
  5. For IAM role, choose your IAM role. If you don’t have your own role for the AWS Glue job, create it by following the steps documented in the AWS Glue Developer Guide.
  6. Choose Start notebook job.
  7. Copy and paste the following code to the first cell and run the cell.
    %glue_version 3.0
    %%configure
    {
      "--datalake-formats": "delta"
    }

  8. Run the existing cell containing the following code.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

  9. Copy and paste the following code to the third cell and run the cell.
    %%sql
    SELECT * FROM `delta_lake_native`.`sample_delta_table` limit 10

The following screenshot shows our output:

Clean up

The final step is to clean up the resources:

  • Delete your data under your S3 path: s3://your_s3_bucket/data/sample_delta_table/.
  • Delete the AWS Glue crawler delta-lake-native-crawler.
  • Delete the AWS Glue database delta_lake_native.
  • Delete the AWS Glue notebook job delta-sql.

Conclusion

This post demonstrated how to crawl native Delta Lake tables using an AWS Glue crawler and how to query the crawled tables from Athena and AWS Glue Spark jobs. Start using AWS Glue crawlers for your own native Delta Lake tables.

If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems. In his free time, he enjoys cycling or playing basketball.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Introducing the Cloud Shuffle Storage Plugin for Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, an open-source, distributed processing system for your data integration tasks and big data workloads.

Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity or by data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there isn’t enough disk space left on the executor and there is no way to recover. Such Spark jobs typically can’t succeed without adding additional compute and attached storage, wherein the compute is often idle, which results in additional cost.

In 2021, we launched Amazon S3 shuffle for AWS Glue 2.0 with Spark 2.4. This feature disaggregated Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle files. Using Amazon S3 for Spark shuffle storage enabled you to run data-intensive workloads more reliably. After the launch, we continued investing in this area, and collected customer feedback.

Today, we’re pleased to release Cloud Shuffle Storage Plugin for Apache Spark. It supports the latest Apache Spark 3.x distribution so you can take advantage of the plugin in AWS Glue or any other Spark environments. It’s now also natively available to use in AWS Glue Spark jobs on AWS Glue 3.0 and the latest AWS Glue version 4.0 without requiring any extra setup or bringing external libraries. Like the Amazon S3 shuffle for AWS Glue 2.0, the Cloud Shuffle Storage Plugin helps you solve constrained disk space errors during shuffle in serverless Spark environments.

We’re also excited to announce the release of software binaries for the Cloud Shuffle Storage Plugin for Apache Spark under the Apache 2.0 license. You can download the binaries and run them on any Spark environment. The new plugin is open-cloud, comes with out-of-the box support for Amazon S3, and can be easily configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage.

Understanding a shuffle operation in Apache Spark

In Apache Spark, there are two types of transformations:

  • Narrow transformation – This includes map, filter, union, and mapPartition, where each input partition contributes to only one output partition.
  • Wide transformation – This includes join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions. Spark SQL queries that include JOIN, ORDER BY, or GROUP BY require wide transformations.

A wide transformation triggers a shuffle, which occurs whenever data is reorganized into new partitions with each key assigned to one of them. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. The volume of data shuffled is visible in the Spark UI. When shuffle writes take up more space than the local available disk capacity, it causes a No space left on device error.
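As a simple illustration, the following PySpark sketch runs a wide transformation (a groupBy aggregation) that triggers a shuffle; the resulting shuffle write volume appears in the Spark UI. The output path is a placeholder.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Minimal sketch: a groupBy aggregation forces a shuffle because rows with the
# same key must be brought together on the same partition.
spark = SparkSession.builder.appName("shuffle-example").getOrCreate()

df = spark.range(0, 100_000_000)
result = df.groupBy((F.col("id") % 1000).alias("key")).count()
result.write.mode("overwrite").parquet("s3://your_s3_bucket/tmp/shuffle-demo/")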

To illustrate one of the typical scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join and group by.

Let’s run the following query on AWS Glue 3.0 job with 10 G1.X workers where a total of 640GB of local disk space is available:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the Executor tab in the Spark UI.
Spark UI Executor Tab

The following screenshot shows the status of Spark jobs included in the AWS Glue job run.
Spark UI Jobs
In the failed Spark job (job ID=7), we can see the failed Spark stage in the Spark UI.
Spark UI Failed stage
There was 167.8 GiB of shuffle write during the stage, and 14 tasks failed due to the error java.io.IOException: No space left on device because the host 172.34.97.212 ran out of local disk.
Spark UI Tasks

Cloud Shuffle Storage for Apache Spark

Cloud Shuffle Storage for Apache Spark allows you to store Spark shuffle files on Amazon S3 or other cloud storage services. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data-intensive workloads reliably. The following figure illustrates how Spark map tasks write the shuffle files to the Cloud Shuffle Storage. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle storage.

This architecture enables your serverless Spark jobs to use Amazon S3 without the overhead of running, operating, and maintaining additional storage or compute nodes.
Chopper diagram
The following Glue job parameters enable and tune Spark to use S3 buckets for storing shuffle data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

  • Key: --write-shuffle-files-to-s3, Value: TRUE – This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
  • Key: --conf, Value: spark.shuffle.storage.path=s3://<shuffle-bucket> – This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, the plugin uses --TempDir/shuffle-data.
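If you create the job programmatically, the same parameters go into the job’s default arguments. The following boto3 sketch is for illustration only; the job name, role, script location, and shuffle bucket are placeholders.

import boto3

glue = boto3.client("glue")

# Minimal sketch: create an AWS Glue 3.0 Spark job with Cloud Shuffle Storage
# enabled. All names, ARNs, and paths below are placeholders.
glue.create_job(
    Name="tpcds-q80-s3-shuffle",
    Role="arn:aws:iam::123456789012:role/AWSGlueServiceRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://your_s3_bucket/scripts/q80.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
    DefaultArguments={
        "--write-shuffle-files-to-s3": "true",
        "--conf": "spark.shuffle.storage.path=s3://your_s3_bucket/shuffle-data/",
    },
)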

The shuffle files are written to that location with names such as the following:

s3://<shuffle-storage-path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data

With the Cloud Shuffle Storage plugin enabled and using the same AWS Glue job setup, the TPC-DS query now succeeded without any job or stage failures.
Spark UI Jobs with Chopper plugin

Software binaries for the Cloud Shuffle Storage Plugin

You can now also download and use the plugin in your own Spark environments and with other cloud storage services. The plugin binaries are available for use under the Apache 2.0 license.

Bundle the plugin with your Spark applications

You can bundle the plugin with your Spark applications by adding it as a dependency in your Maven pom.xml as you develop your Spark applications, as shown in the following code. For more details on the plugin and Spark versions, refer to Plugin versions.

<repositories>
   ...
    <repository>
        <id>aws-glue-etl-artifacts</id>
        <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
    </repository>
</repositories>
...
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version>3.1-amzn-LATEST</version>
</dependency>

You can alternatively download the binaries from AWS Glue Maven artifacts directly and include them in your Spark application as follows:

#!/bin/bash
sudo wget -v https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/3.1-amzn-LATEST/chopper-plugin-3.1-amzn-LATEST.jar -P /usr/lib/spark/jars

Submit the Spark application by including the JAR files on the classpath and specifying the two Spark configs for the plugin:

spark-submit --deploy-mode cluster \
--conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
--conf spark.shuffle.storage.path=s3://<s3 bucket>/<shuffle-dir> \
 --class <your class> <your application jar> 

The following Spark parameters enable and configure Spark to use an external storage URI such as Amazon S3 for storing shuffle files; the URI protocol determines which storage system to use.

  • Key: spark.shuffle.storage.path, Value: s3://<shuffle-storage-path> – Specifies a URI where the shuffle files are stored, which must be a valid Hadoop FileSystem and configured as needed.
  • Key: spark.shuffle.sort.io.plugin.class, Value: com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin – The entry class of the plugin.
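If you build the SparkSession yourself instead of passing spark-submit flags, you can set the same two configs in code. A minimal PySpark sketch, assuming the plugin JAR is already on the Spark classpath:

from pyspark.sql import SparkSession

# Minimal sketch: configure the Cloud Shuffle Storage Plugin on a SparkSession.
# Assumes the plugin JAR has already been added to the classpath.
spark = (
    SparkSession.builder
    .appName("cloud-shuffle-storage-example")
    .config("spark.shuffle.sort.io.plugin.class",
            "com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin")
    .config("spark.shuffle.storage.path", "s3://your_s3_bucket/shuffle-dir/")
    .getOrCreate()
)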

Other cloud storage integration

This plugin comes with out-of-the-box support for Amazon S3 and can also be configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage. To enable other Hadoop FileSystem-compatible cloud storage services, add a storage URI with the corresponding service scheme (such as gs:// for Google Cloud Storage instead of s3:// for Amazon S3), add the FileSystem JAR files for the service, and set the appropriate authentication configurations.

For more information about how to integrate the plugin with Google Cloud Storage and Microsoft Azure Blob Storage, refer to Using AWS Glue Cloud Shuffle Plugin for Apache Spark with Other Cloud Storage Services.

Best practices and considerations

Note the following considerations:

  • This feature replaces local shuffle storage with Amazon S3. You can use it to address common failures with price/performance benefits for your serverless analytics jobs and pipelines. We recommend enabling this feature when you want to ensure reliable runs of your data-intensive workloads that create a large amount of shuffle data, or when you’re getting a No space left on device error. You can also use this plugin if your job encounters fetch failures (org.apache.spark.shuffle.MetadataFetchFailedException) or if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.storage.path) in order to clean up old shuffle data automatically (see the boto3 sketch after this list).
  • The shuffle data on Amazon S3 is encrypted by default. You can also encrypt the data with your own AWS Key Management Service (AWS KMS) keys.
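The following boto3 sketch shows one way to set such a lifecycle rule; the bucket name, prefix, and seven-day retention are assumptions you should align with your own spark.shuffle.storage.path setting.

import boto3

s3 = boto3.client("s3")

# Minimal sketch: automatically expire shuffle objects after 7 days.
# Bucket, prefix, and retention period are placeholders.
s3.put_bucket_lifecycle_configuration(
    Bucket="your_s3_bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-spark-shuffle-data",
                "Filter": {"Prefix": "shuffle-data/"},
                "Status": "Enabled",
                "Expiration": {"Days": 7},
            }
        ]
    },
)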

Conclusion

This post introduced the new Cloud Shuffle Storage Plugin for Apache Spark and described its benefits to independently scale storage in your Spark jobs without adding additional workers. With this plugin, you can expect jobs processing terabytes of data to run much more reliably.

The plugin is available in AWS Glue 3.0 and 4.0 Spark jobs in all AWS Glue supported Regions. We’re also releasing the plugin’s software binaries under the Apache 2.0 license. You can use the plugin in AWS Glue or other Spark environments. We look forward to hearing your feedback.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Chuhan Liu is a Software Development Engineer on the AWS Glue team.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with data integration and connectivity to a variety of sources, efficiently manage data lakes on Amazon S3, and optimize Apache Spark for fault tolerance with ETL workloads.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale, part 2: Using AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-integrate-apache-hudi-delta-lake-apache-iceberg-dataset-at-scale-using-aws-glue-studio-visual-editor/

Transactional data lake technologies such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables are evolving rapidly and gaining great popularity. These technologies simplify the data processing pipeline significantly, and they provide further useful capabilities like upserts, rollbacks, and time travel queries.

In the first post of this series, we went through how to process Apache Hudi, Delta Lake, and Apache Iceberg datasets using AWS Glue connectors. AWS Glue simplifies reading and writing your data in those data lake formats and building data lakes on top of those technologies. By running the sample notebooks in an AWS Glue Studio notebook, you can interactively develop and run your code and immediately see the results. The notebooks let you explore how those technologies work when you have coding experience.

This second post focuses on other use cases for customers who prefer visual job authoring without writing custom code. Even without coding experience, you can easily build your transactional data lakes on the AWS Glue Studio visual editor and take advantage of those transactional data lake technologies. In addition, you can use Amazon Athena to query the data stored using Hudi and Iceberg. This tutorial demonstrates how to read and write each format on the AWS Glue Studio visual editor, and then how to query it from Athena.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale

Prerequisites

The following are the instructions to read and write tables using each data lake format on the AWS Glue Studio visual editor. You can use any of the marketplace connectors or custom connectors based on your requirements.

To continue this tutorial, you must create the following AWS resources in advance:

Reads/writes using the connector on AWS Glue Studio Visual Editor

In this tutorial, you read and write data in each of the transactional data lake formats on the AWS Glue Studio visual editor. There are three main configurations that you must set per data lake format: connection, connection options, and job parameters. Note that no code is included in this tutorial. Let’s see how it works.

Apache Hudi writes

Complete the following steps to write into an Apache Hudi table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose hudi-0101-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  12. For Connection options, enter the following pairs of Key and Value (choose Add new option to enter a new pair).
    1. Key: path. Value: <Your S3 path for Hudi table location>
    2. Key: hoodie.table.name, Value: test
    3. Key: hoodie.datasource.write.storage.type, Value: COPY_ON_WRITE
    4. Key: hoodie.datasource.write.operation, Value: upsert
    5. Key: hoodie.datasource.write.recordkey.field, Value: location
    6. Key: hoodie.datasource.write.precombine.field, Value: date
    7. Key: hoodie.datasource.write.partitionpath.field, Value: iso_code
    8. Key: hoodie.datasource.write.hive_style_partitioning, Value: true
    9. Key: hoodie.datasource.hive_sync.enable, Value: true
    10. Key: hoodie.datasource.hive_sync.database, Value: hudi
    11. Key: hoodie.datasource.hive_sync.table, Value: test
    12. Key: hoodie.datasource.hive_sync.partition_fields, Value: iso_code
    13. Key: hoodie.datasource.hive_sync.partition_extractor_class, Value: org.apache.hudi.hive.MultiPartKeysValueExtractor
    14. Key: hoodie.datasource.hive_sync.use_jdbc, Value: false
    15. Key: hoodie.datasource.hive_sync.mode, Value: hms
  13. Under Job details, for IAM Role, choose your IAM role.
  14. Under Advanced properties, for Job parameters, choose Add new parameter.
  15. For Key, enter --conf.
  16. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer.
  17. Choose Save.
  18. Choose Run.

Apache Hudi reads

Complete the following steps to read from the Apache Hudi table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose hudi-0101-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for your Hudi table that you created in the previous section.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type, choose S3 location.
  15. For S3 Target Location, enter your S3 path for the output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Choose Save.
  18. Choose Run.

Delta Lake writes

Complete the following steps to write into the Delta Lake table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose delta-100-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose your delta-100-byoc-connection.
  12. For Connection options, choose Add new option.
  13. For Key, enter path. For Value, enter your S3 path for Delta table location. Choose Add new option.
  14. For Key, enter partitionKeys. For Value, enter iso_code.
  15. Under Job details, for IAM Role, choose your IAM role.
  16. Under Advanced properties, for Job parameters, choose Add new parameter.
  17. For Key, enter --conf.
  18. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  19. Choose Save.
  20. Choose Run.

Delta Lake reads

Complete the following steps to read from the Delta Lake table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose delta-100-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose delta-100-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for Delta table that you created in the previous section. Choose Add new option.
  11. For Key, enter partitionKeys. For Value, enter iso_code.
  12. Choose Transform – ApplyMapping, and choose Remove.
  13. Choose Data target – S3 bucket.
  14. Under Data target properties, for Format, choose JSON.
  15. For S3 Target type, choose S3 location.
  16. For S3 Target Location, enter your S3 path for the output location.
  17. Under Job details, for IAM Role, choose your IAM role.
  18. Under Advanced properties, for Job parameters, choose Add new parameter.
  19. For Key, enter --conf.
  20. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  21. Choose Save.
  22. Choose Run.

Apache Iceberg writes

Complete the following steps to write into an Apache Iceberg table using the connector:

  1. Open AWS Glue console.
  2. Choose Databases.
  3. Choose Add database.
  4. For database name, enter iceberg, and choose Create.
  5. Open AWS Glue Studio.
  6. Choose Jobs.
  7. Choose Visual with a source and target.
  8. For Source, choose Amazon S3.
  9. For Target, choose iceberg-0131-byoc-connector.
  10. Choose Create.
  11. Under Visual, choose Data source – S3 bucket.
  12. Under Node properties, for S3 source type, choose S3 location.
  13. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  14. Choose Data target – Connector.
  15. Under Node properties, for Connection, choose iceberg-0131-byoc-connection.
  16. For Connection options, choose Add new option.
  17. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  18. Choose SQL under Transform to create a new AWS Glue Studio node.
  19. Under Node properties, for Node parents, choose ApplyMapping.
  20. Under Transform, for SQL alias, verify that myDataSource is entered.
  21. For SQL query, enter CREATE TABLE glue_catalog.iceberg.test AS SELECT * FROM myDataSource WHERE 1=2. This is to create a table definition with no records because the Iceberg target requires table definition before data ingestion.
  22. Under Job details, for IAM Role, choose your IAM role.
  23. Under Advanced properties, for Job parameters, choose Add new parameter.
  24. For Key, enter --conf.
  25. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  26. Choose Save.
  27. Choose Run.

Apache Iceberg reads

Complete the following steps to read from the Apache Iceberg table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Apache Iceberg Connector for AWS Glue 3.0.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose your Iceberg connection name.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type, choose S3 location.
  15. For S3 Target Location, enter your S3 path for the output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Under Advanced properties, for Job parameters, choose Add new parameter.
  18. For Key, enter --conf.
  19. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  20. Choose Save.
  21. Choose Run.

Query from Athena

The Hudi and Iceberg tables created with the preceding instructions are also queryable from Athena.

  1. Open the Athena console.
  2. Run the following SQL to query the Hudi table:
    SELECT * FROM "hudi"."test" LIMIT 10

  3. Run the following SQL to query the Iceberg table:
    SELECT * FROM "iceberg"."test" LIMIT 10

If you want to query the Delta table from Athena, follow Presto, Trino, and Athena to Delta Lake integration using manifests.

Conclusion

This post summarized how to utilize Apache Hudi, Delta Lake, and Apache Iceberg on the AWS Glue platform, and demonstrated how each format works with the AWS Glue Studio visual editor. You can start using those data lake formats easily with AWS Glue DynamicFrames, Spark DataFrames, and Spark SQL in AWS Glue jobs, AWS Glue Studio notebooks, and the AWS Glue Studio visual editor.


About the Author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-integrate-apache-hudi-delta-lake-apache-iceberg-datasets-at-scale-aws-glue-studio-notebook/

Cloud data lakes provide a scalable and low-cost data repository that enables customers to easily store data from a variety of data sources. Data scientists, business analysts, and line-of-business users leverage data lakes to explore, refine, and analyze petabytes of data. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. Customers use AWS Glue to discover and extract data from a variety of data sources, and to enrich and cleanse the data before storing it in data lakes and data warehouses.

Over the years, many table formats have emerged to support ACID transactions, governance, and catalog use cases. For example, formats such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables enable customers to run ACID transactions on Amazon Simple Storage Service (Amazon S3). AWS Glue supports these table formats for batch and streaming workloads. This post focuses on Apache Hudi, Delta Lake, and Apache Iceberg, and summarizes how to use them in AWS Glue 3.0 jobs. If you’re interested in AWS Lake Formation governed tables, visit the Effective data lakes using AWS Lake Formation series.

Bring libraries for the data lake formats

Today, there are three available options for bringing libraries for the data lake formats onto the AWS Glue job platform: marketplace connectors, custom connectors (BYOC), and extra library dependencies.

Marketplace connectors

AWS Glue Connector Marketplace is the centralized repository for cataloging the available Glue connectors provided by multiple vendors. As of today, you can subscribe to more than 60 connectors offered in AWS Glue Connector Marketplace. The marketplace connectors are hosted in an Amazon Elastic Container Registry (Amazon ECR) repository and downloaded to the Glue job system at runtime. If you prefer a simple user experience, subscribing to the connectors and using them in your Glue ETL jobs, the marketplace connector is a good option.

Custom connectors as bring-your-own-connector (BYOC)

AWS Glue custom connectors enable you to upload and register your own libraries located in Amazon S3 as Glue connectors. You have more control over the library versions, patches, and dependencies. Because it uses your S3 bucket, you can configure the S3 bucket policy to share the libraries only with specific users, and you can configure private network access to download the libraries using VPC endpoints. When you prefer having more control over those configurations, the custom connector (BYOC) is a good option.

Extra library dependencies

There is another option: download the data lake format libraries, upload them to your S3 bucket, and add them as extra library dependencies. With this option, you can add libraries directly to the job without a connector and use them. In a Glue job, you can configure this in Dependent JARs path. In the API, it’s the --extra-jars parameter. In a Glue Studio notebook, you can use the %extra_jars magic. To download the relevant JAR files, see the library locations in the section Create a Custom connection (BYOC).
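For example, in an AWS Glue Studio notebook you could run %extra_jars s3://your_s3_bucket/jars/delta-core_2.12-1.0.0.jar in a cell before initializing the session. When creating a job through the API, the equivalent is the --extra-jars default argument, as in the following boto3 sketch; the job name, role, script location, and JAR path are placeholders.

import boto3

glue = boto3.client("glue")

# Minimal sketch: attach a data lake format JAR stored in Amazon S3 as an
# extra library dependency of a Glue job. All names and paths are placeholders.
glue.create_job(
    Name="delta-extra-jars-example",
    Role="arn:aws:iam::123456789012:role/AWSGlueServiceRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://your_s3_bucket/scripts/delta_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    DefaultArguments={
        "--extra-jars": "s3://your_s3_bucket/jars/delta-core_2.12-1.0.0.jar",
    },
)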

Create a Marketplace connection

To create a new marketplace connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.10.1

Complete the following steps to create a marketplace connection for Apache Hudi 0.10.1:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Hudi Connector for AWS Glue, and choose Apache Hudi Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.10.1.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Delta Lake 1.0.0

Complete the following steps to create a marketplace connection for Delta Lake 1.0.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Delta Lake Connector for AWS Glue, and choose Delta Lake Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 1.0.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a marketplace connection for Apache Iceberg 0.12.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Iceberg Connector for AWS Glue, and choose Apache Iceberg Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.12.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter iceberg-0120-mp-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Create a Custom connection (BYOC)

You can create your own custom connectors from JAR files. In this section, you can see the exact JAR files that are used in the marketplace connectors. You can just use the files for your custom connectors for Apache Hudi, Delta Lake, and Apache Iceberg.

To create a new custom connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.9.0

Complete the following steps to create a custom connection for Apache Hudi 0.9.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3-bundle_2.12/0.9.0/hudi-spark3-bundle_2.12-0.9.0.jar
    2. https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.9.0/hudi-utilities-bundle_2.12-0.9.0.jar
    3. https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.10.1/parquet-avro-1.10.1.jar
    4. https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar
    5. https://repo1.maven.org/maven2/org/apache/calcite/calcite-core/1.10.0/calcite-core-1.10.0.jar
    6. https://repo1.maven.org/maven2/org/datanucleus/datanucleus-core/4.1.17/datanucleus-core-4.1.17.jar
    7. https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-090-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-090-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-090-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Hudi 0.10.1

Complete the following steps to create a custom connection for Apache Hudi 0.10.1:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. hudi-utilities-bundle_2.12-0.10.1.jar
    2. hudi-spark3.1.1-bundle_2.12-0.10.1.jar
    3. spark-avro_2.12-3.1.1.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-0101-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-0101-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-0101-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Note that the above Hudi 0.10.1 installation on Glue 3.0 does not fully support Merge On Read (MoR) tables.

Delta Lake 1.0.0

Complete the following steps to create a custom connector for Delta Lake 1.0.0:

  1. Download the following JAR file, and upload it to your S3 bucket.
    1. https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter a comma separated Amazon S3 path for the above JAR file.
  6. For Name, enter delta-100-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.spark.sql.delta.sources.DeltaDataSource.
  9. Choose Create connector.
  10. Choose delta-100-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter delta-100-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a custom connection for Apache Iceberg 0.12.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark3-runtime/0.12.0/iceberg-spark3-runtime-0.12.0.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.15.40/bundle-2.15.40.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.15.40/url-connection-client-2.15.40.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0120-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0120-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0120-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.13.1

Complete the following steps to create a custom connection for Apache Iceberg 0.13.1:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. iceberg-spark-runtime-3.1_2.12-0.13.1.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.161/bundle-2.17.161.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.161/url-connection-client-2.17.161.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0131-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0131-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0131-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Prerequisites

To continue this tutorial, you must create the following AWS resources in advance:

  • AWS Identity and Access Management (IAM) role for your ETL job or notebook as instructed in Set up IAM permissions for AWS Glue Studio. Note that AmazonEC2ContainerRegistryReadOnly or equivalent permissions are needed when you use the marketplace connectors.
  • Amazon S3 bucket for storing data.
  • Glue connection (one of the marketplace connector or the custom connector corresponding to the data lake format).

Reads/writes using the connector on AWS Glue Studio Notebook

The following are the instructions to read and write tables using each data lake format in an AWS Glue Studio notebook. As a prerequisite, make sure that you have created a connector and a connection for it using the preceding information.
The example notebooks are hosted in the AWS Glue Samples GitHub repository. Seven notebooks are available. In the following instructions, we use one notebook per data lake format.

Apache Hudi

To read/write Apache Hudi tables in the AWS Glue Studio notebook, complete the following:

  1. Download hudi_dataframe.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Hudi connection name, and run the cell:
    %connections hudi-0101-byoc-connection (Alternatively you can use your connection name created from the marketplace connector).
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Hudi table with sample data using catalog sync to create a new Hudi table with sample data.
  12. Run the cells in the section Read from Hudi table to verify the new Hudi table. There are five records in this table.
  13. Run the cells in the section Upsert records into Hudi table to see how upsert works on Hudi. This code inserts one new record, and updates the one existing record. You can verify that there is a new record product_id=00006, and the existing record product_id=00001’s price has been updated from 250 to 400.
  14. Run the cells in the section Delete a Record. You can verify that the existing record product_id=00001 has been deleted.
  15. Run the cells in the section Point in time query. You can verify that you’re seeing the previous version of the table where the upsert and delete operations haven’t been applied yet.
  16. Run the cells in the section Incremental Query. You can verify that you’re seeing only the recent commit about product_id=00006.

With this notebook, you can complete basic Spark DataFrame operations on Hudi tables.
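For reference, the upsert in step 13 is roughly equivalent to the following Spark DataFrame write. This is a minimal sketch, not the notebook’s exact code: the table location, column names, and sample values are assumptions, and it assumes a SparkSession created from GlueContext as in the notebook.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

spark = GlueContext(SparkContext.getOrCreate()).spark_session

# Minimal sketch of a Hudi upsert: rows whose record key already exists are
# updated, and new keys are inserted.
hudi_options = {
    "hoodie.table.name": "test",
    "hoodie.datasource.write.recordkey.field": "product_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.operation": "upsert",
}

updates = spark.createDataFrame(
    [("00001", "Heater", 400, "JPY", "E_APPLIANCES", 1679470000.0),
     ("00006", "Kettle", 300, "JPY", "E_APPLIANCES", 1679470000.0)],
    ["product_id", "product_name", "price", "currency", "category", "updated_at"],
)

updates.write.format("hudi").options(**hudi_options).mode("append") \
    .save("s3://your_s3_bucket/data/hudi/test/")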

Delta Lake

To read/write Delta Lake tables in the AWS Glue Studio notebook, complete the following:

  1. Download delta_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook, and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Delta connection name, and run the cell:
    %connections delta-100-byoc-connection
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Delta table with sample data to create a new Delta table with sample data.
  12. Run the cells in the section Create a Delta Lake table.
  13. Run the cells in the section Read from Delta Lake table to verify the new Delta table. There are five records in this table.
  14. Run the cells in the section Insert records. The query inserts two new records: record_id=00006, and record_id=00007.
  15. Run the cells in the section Update records. The query updates the price of the existing record record_id=00007 from 500 to 300.
  16. Run the cells in the section Upsert records to see how upsert works on Delta. This code inserts one new record and updates one existing record. You can verify that there is a new record product_id=00008, and that the existing record product_id=00001’s price has been updated from 250 to 400.
  17. Run the cells in the section Alter DeltaLake table. The queries add one new column, and update the values in the column.
  18. Run the cells in the section Delete records. You can verify that the record product_id=00006 has been deleted because its product_name is Pen.
  19. Run the cells in the section View History to describe the history of operations that was triggered against the target Delta table.

With this notebook, you can complete basic Spark SQL operations on Delta tables.

Apache Iceberg

To read/write Apache Iceberg tables in the AWS Glue Studio notebook, complete the following:

  1. Download iceberg_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Iceberg connection name, and run the cell:
    %connections iceberg-0131-byoc-connection (Alternatively, you can use the connection name that you created from the marketplace connector.)
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Iceberg table with sample data to create a new Iceberg table with sample data.
  12. Run the cells in the section Read from Iceberg table.
  13. Run the cells in the section Upsert records into Iceberg table.
  14. Run the cells in the section Delete records.
  15. Run the cells in the section View History and Snapshots.

In this notebook, you completed the basic Spark SQL operations on Iceberg tables.
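If you want to explore snapshots outside the notebook, the following is a minimal sketch of inspecting Iceberg snapshot metadata and reading the table as of an earlier snapshot. It assumes a SparkSession named spark configured with an Iceberg catalog named glue_catalog; the database, table, and snapshot ID are illustrative placeholders.

# A minimal Iceberg snapshot inspection and time travel sketch; names and IDs are placeholders
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM glue_catalog.db.product.snapshots
""").show()

# Read the table as of an earlier snapshot using the snapshot-id read option
df_at_snapshot = (spark.read.format("iceberg")
    .option("snapshot-id", 1234567890123456789)   # replace with a snapshot_id from the output above
    .load("glue_catalog.db.product"))
df_at_snapshot.show()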

Conclusion

This post summarized how to use Apache Hudi, Delta Lake, and Apache Iceberg on the AWS Glue platform, and demonstrated how each format works with an AWS Glue Studio notebook. You can start using these data lake formats easily in Spark DataFrames and Spark SQL in AWS Glue jobs or AWS Glue Studio notebooks.

This post focused on interactive coding and querying on notebooks. The upcoming part 2 will focus on the experience using AWS Glue Studio Visual Editor and Glue DynamicFrames for customers who prefer visual authoring without the need to write code.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with AWS. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Monjumi Sarma is a Data Lab Solutions Architect at AWS. She helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives.

Accelerate Amazon DynamoDB data access in AWS Glue jobs using the new AWS Glue DynamoDB Export connector

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/accelerate-amazon-dynamodb-data-access-in-aws-glue-jobs-using-the-new-aws-glue-dynamodb-elt-connector/

Modern data architectures encourage the integration of data lakes, data warehouses, and purpose-built data stores, enabling unified governance and easy data movement. With a modern data architecture on AWS, you can store data in a data lake and use a ring of purpose-built data services around the lake, allowing you to make decisions with speed and agility.

To achieve a modern data architecture, AWS Glue is the key service that integrates data over a data lake, data warehouse, and purpose-built data stores. AWS Glue simplifies data movement like inside-out, outside-in, or around the perimeter. A powerful purpose-built data store is Amazon DynamoDB, which is widely used by hundreds of thousands of companies, including Amazon.com. It’s common to move data from DynamoDB to a data lake built on top of Amazon Simple Storage Service (Amazon S3). Many customers move data from DynamoDB to Amazon S3 using AWS Glue extract, transform, and load (ETL) jobs.

Today, we’re pleased to announce the general availability of a new AWS Glue DynamoDB export connector. It’s built on top of the DynamoDB table export feature. It’s a scalable and cost-efficient way to read large DynamoDB table data in AWS Glue ETL jobs. This post describes the benefit of this new export connector and its use cases.

The following are typical use cases to read from DynamoDB tables using AWS Glue ETL jobs:

  • Move the data from DynamoDB tables to different data stores
  • Integrate the data with other services and applications
  • Retain historical snapshots for auditing
  • Build an S3 data lake from the DynamoDB data and analyze the data from various services, such as Amazon Athena, Amazon Redshift, and Amazon SageMaker

The new AWS Glue DynamoDB export connector

The old version of the AWS Glue DynamoDB connector reads DynamoDB tables through the DynamoDB Scan API. Instead, the new AWS Glue DynamoDB export connector reads DynamoDB data from the snapshot, which is exported from DynamoDB tables. This approach has the following benefits:

  • It doesn’t consume read capacity units of the source DynamoDB tables
  • The read performance is consistent for large DynamoDB tables

For DynamoDB tables larger than 100 GB in particular, the new connector is significantly faster than the traditional connector.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance.

How to use the new connector on AWS Glue Studio Visual Editor

AWS Glue Studio Visual Editor is a graphical interface that makes it easy to create, run, and monitor AWS Glue ETL jobs. The new DynamoDB export connector is available on the AWS Glue Studio Visual Editor. You can choose Amazon DynamoDB as the source.

After you choose Create, you see the visual Directed Acyclic Graph (DAG). Here, you can choose your DynamoDB table that exists in this account or Region. This allows you to select DynamoDB tables (with PITR enabled) directly as a source in AWS Glue Studio. This provides a one-click export from any of your DynamoDB tables to Amazon S3. You can also easily add any data sources and targets or transformations to the DAG. For example, it allows you to join two different DynamoDB tables and export the result to Amazon S3, as shown in the following screenshot.

The following two connection options are automatically added. They specify the location used to store temporary data during the DynamoDB export phase. You can set S3 bucket lifecycle policies to expire the temporary data.

  • dynamodb.s3.bucket – The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – The S3 prefix to store temporary data during DynamoDB export

How to use the new connector on the job script code

You can use the new export connector when you create an AWS Glue DynamicFrame in the job script code by configuring the following connection options:

  • dynamodb.export – (Required) You need to set this to ddb or s3
  • dynamodb.tableArn – (Required) Your source DynamoDB table ARN
  • dynamodb.unnestDDBJson – (Optional) If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is false.
  • dynamodb.s3.bucket – (Optional) The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – (Optional) The S3 prefix to store temporary data during DynamoDB export

The following is the sample Python code to create a DynamicFrame using the new export connector:

# Read the DynamoDB table through the new export connector into a DynamicFrame
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",             # trigger a new export from the live table
        "dynamodb.tableArn": "test_source",   # replace with your source DynamoDB table ARN
        "dynamodb.unnestDDBJson": True,       # unnest the DynamoDB JSON structure
        "dynamodb.s3.bucket": "bucket name",  # S3 bucket for temporary export data
        "dynamodb.s3.prefix": "bucket prefix" # S3 prefix for temporary export data
    }
)

The new export connector doesn’t require configurations related to AWS Glue job parallelism, unlike the old connector. Now you no longer need to change the configuration when you scale out the AWS Glue job. It also doesn’t require any configuration regarding DynamoDB table read/write capacity and its capacity mode (on demand or provisioned).

DynamoDB table schema handling

By default, the new export connector reads data in DynamoDB JSON structure that is present in exports. The following is an example schema of the frame using the Amazon Customer Review Dataset:

root
|-- Item: struct (nullable = true)
| |-- product_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- total_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_title: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- star_rating: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- customer_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- marketplace: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- helpful_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- review_headline: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- review_date: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- vine: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_body: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- verified_purchase: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- product_category: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- year: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_parent: struct (nullable = true)
| | |-- S: string (nullable = true)

To read DynamoDB item columns without handling nested data, you can set dynamodb.unnestDDBJson to True. The following is an example of the schema of the same data where dynamodb.unnestDDBJson is set to True:

root
|-- product_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- review_headline: string (nullable = true)
|-- review_date: string (nullable = true)
|-- vine: string (nullable = true)
|-- review_body: string (nullable = true)
|-- verified_purchase: string (nullable = true)
|-- product_category: string (nullable = true)
|-- year: string (nullable = true)
|-- product_parent: string (nullable = true)

Data freshness

Data freshness is the measure of staleness of the data from the live tables in the original source. In the new export connector, the dynamodb.export option impacts data freshness.

When dynamodb.export is set to ddb, the AWS Glue job invokes a new export and then reads the export placed in an S3 bucket into DynamicFrame. It reads exports of the live table, so data can be fresh. On the other hand, when dynamodb.export is set to s3, the AWS Glue job skips invoking a new export and directly reads an export already placed in an S3 bucket. It reads exports of the past table, so data can be stale, but you can reduce overhead to trigger the exports.

The following comparison explains the data freshness and the pros and cons of each option.

  • New export connector with dynamodb.export set to s3
    • Data freshness: Stale (reads an export of the past table)
    • Pros: RCU is not consumed; you can skip triggering exports
    • Cons: Data can be stale
  • New export connector with dynamodb.export set to ddb
    • Data freshness: Fresh (reads an export of the live table)
    • Pros: Data can be fresh; RCU is not consumed
    • Cons: Overhead to trigger exports and wait for completion
  • Old connector (no dynamodb.export config)
    • Data freshness: Most fresh (scans the live tables)
    • Pros: Data can be fresh
    • Cons: Read capacity units (RCUs) are consumed

Performance

The following benchmark shows the performance improvements between the old version of the AWS Glue DynamoDB connector and the new export connector. The comparison uses the DynamoDB tables storing the TPC-DS benchmark dataset with different scales from 10 MB to 2 TB. The sample Spark job reads from the DynamoDB table and calculates the count of the items. All the Spark jobs are run on AWS Glue 3.0, G.2X, 60 workers.

The following chart compares AWS Glue job duration between the old connector and the new export connector. For small DynamoDB tables, the old connector is faster. For tables larger than 80 GB, the new export connector is faster. In other words, the DynamoDB export connector is recommended for jobs that take more than 5–10 minutes to run with the old connector. Also, the chart shows that the duration of the new export connector increases slowly as data size increases, whereas the duration of the old connector increases rapidly. This means that the new export connector is especially suitable for larger tables.

The following chart compares dollar cost between the old connector and the new export connector. It contains the AWS Glue DPU hour cost summed with the cost for reading data from DynamoDB. For the old connector, we include the read request cost. For the new export connector, we include the cost in the DynamoDB data export to Amazon S3. Both are calculated in DynamoDB on-demand capacity mode.

With AWS Glue Auto Scaling

AWS Glue Auto Scaling is a new feature to automatically resize computing resources for better performance at lower cost. You can take advantage of AWS Glue Auto Scaling with the new DynamoDB export connector.

As the following chart shows, with AWS Glue Auto Scaling, the duration of the new export connector is shorter than the old connector when the size of the source DynamoDB table is 100 GB or more. It shows a similar trend without AWS Glue Auto Scaling.

You also get cost benefits because only the Spark driver is active for most of the duration of the DynamoDB export (which is nearly 30% of the total job duration with the old scan-based connector).

Conclusion

AWS Glue is a key service to integrate with multiple data stores. At AWS, we keep improving the performance and cost-efficiency of our services. In this post, we announced the availability of the new AWS Glue DynamoDB export connector. With this new connector, you can easily integrate your large data on DynamoDB tables with different data stores. It helps you read the large tables faster from AWS Glue jobs at lower cost.

The new AWS Glue DynamoDB export connector is now generally available in all supported AWS Glue Regions. Start using the new AWS Glue DynamoDB export connector today! We look forward to your feedback and stories on how you use the connector for your needs.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Neil Gupta is a Software Development Engineer on the AWS Glue team. He enjoys tackling big data problems and learning more about distributed systems.

Andrew Kim is a Software Development Engineer on the AWS Glue team. His passion is to build scalable and effective solutions to challenging problems and working with distributed systems.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizing Apache Spark for performance and reliability.

Synchronize your AWS Glue Studio Visual Jobs to different environments 

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/synchronize-your-aws-glue-studio-visual-jobs-to-different-environments/

AWS Glue has become a popular option for integrating data from disparate data sources due to its ability to integrate large volumes of data using distributed data processing frameworks. Many customers use AWS Glue to build data lakes and data warehouses. Data engineers who prefer to develop data processing pipelines visually can use AWS Glue Studio to create data integration jobs. This post introduces the Glue Visual Job API for authoring Glue Studio Visual Jobs programmatically, and the Glue Job Sync utility, which uses the API to easily synchronize Glue jobs to different environments without losing the visual representation.

Glue Job Visual API

AWS Glue Studio has a graphical interface called Visual Editor that makes it easy to author extract, transform, and load (ETL) jobs in AWS Glue. The Glue jobs created in the Visual Editor contain their visual representation, which describes the data transformation. In this post, we call these jobs Glue Studio Visual Jobs.

For example, it’s common to develop and test AWS Glue jobs in a dev account, and then promote the jobs to a prod account. Previously, when you copied the AWS Glue Studio Visual jobs to a different environment, there was no mechanism to copy the visual representation together. This means that the visual representation of the job was lost and you could only copy the code produced with Glue Studio. It can be time consuming and tedious to either copy the code or recreate the job.

AWS Glue Job Visual API lets you programmatically create and update Glue Studio Visual Jobs by providing a JSON object that indicates visual representation, and also retrieve the visual representation from existing Glue Studio Visual Jobs. A Glue Studio Visual Job consists of data source nodes for reading the data, transform nodes for modifying the data, and data target nodes for writing the data.

There are some typical use cases for Glue Visual Job API:

  • Automate creation of Glue Visual Jobs.
  • Migrate your ETL jobs from third-party or on-premises ETL tools to AWS Glue. Many AWS partners, such as Bitwise, Bladebridge, and others, have built converters from third-party ETL tools to AWS Glue.
  • Synchronize AWS Glue Studio Visual jobs from one environment to another without losing visual representation.

In this post, we focus on a utility that uses Glue Job Visual APIs to achieve the mass synchronization of your Glue Studio Visual Jobs without losing the visual representation.

Glue Job Sync Utility

There are common requirements to synchronize Glue Visual Jobs between different environments:

  • Promote Glue Visual Jobs from a dev account to a prod account.
  • Transfer ownership of Glue Visual Jobs between different AWS accounts.
  • Replicate Glue Visual Job configurations from one Region to another for disaster recovery purposes.

The Glue Job Sync Utility is built on top of the Glue Visual Job API. It is a Python application that enables you to synchronize your AWS Glue Studio Visual jobs to different environments without losing the visual representation. This utility requires that you provide source and target AWS environment profiles. Optionally, you can provide a list of jobs that you want to synchronize, and specify how the utility should replace your environment-specific objects using a mapping file. For example, Amazon Simple Storage Service (Amazon S3) locations and IAM roles in your development environment can be different from those in your production environment. The mapping config file is used to replace these environment-specific objects.

How to use Glue Job Sync Utility

In this example, we’re synchronizing two AWS Glue Studio Visual jobs, test1 and test2, from the development environment to the production environment in a different account.

  • Source environment (dev environment)
    • AWS Account ID: 123456789012
    • AWS Region: eu-west-3 (Paris)
    • AWS Glue Studio Visual jobs: test1, test2
    • AWS Identity and Access Management (IAM) Role ARN for Glue job execution role: arn:aws:iam::123456789012:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-123456789012-eu-west-3/
    • Amazon S3 bucket for data location: s3://dev-environment/
  • Destination environment (prod environment)
    • AWS Account ID: 234567890123
    • AWS Region: eu-west-3 (Paris)
    • IAM Role ARN for Glue job execution role: arn:aws:iam::234567890123:role/GlueServiceRole
    • Amazon S3 bucket for Glue job script and other asset location: s3://aws-glue-assets-234567890123-eu-west-3/
    • Amazon S3 bucket for data location: s3://prod-environment/

Set up the utility in your local environment

You will need the following prerequisites for this utility:

  • Python 3.6 or later.
  • Latest version of boto3.
  • Create two AWS named profiles, dev and prod, with the corresponding credentials in your environment. Follow this instruction.

Download the Glue Job Sync Utility

Download the sync utility from the GitHub repository to your local machine.

Create AWS Glue Studio Visual Jobs

  1. Create two AWS Glue Studio Visual jobs, test1, and test2, in the source account.
    • If you don’t have any AWS Glue Studio Visual jobs, then follow this instruction to create the Glue Studio Visual jobs.

  2. Open AWS Glue Studio in the destination account and verify that the test1 and test2 jobs aren’t present.

Run the Job Sync Utility

  1. Create a new file named mapping.json, and enter the following JSON code. With the configuration in line 1, the sync utility will replace all of the Amazon S3 references within the job (in this case s3://aws-glue-assets-123456789012-eu-west-3) with the mapped location (in this case s3://aws-glue-assets-234567890123-eu-west-3). Then, the utility will create the job in the destination environment. Similarly, line 2 and line 3 will trigger the appropriate substitutions in the job. Note that these are example values and you’ll need to substitute the right values that match your environment.

    {
        "s3://aws-glue-assets-123456789012-eu-west-3": "s3://aws-glue-assets-234567890123-eu-west-3",
        "arn:aws:iam::123456789012:role/GlueServiceRole": "arn:aws:iam::234567890123:role/GlueServiceRole",
        "s3://dev-environment": "s3://prod-environment"
    }

  2. Execute the utility by running the following command:
    $ python3 sync.py --src-profile dev --src-region eu-west-3 --dst-profile prod --dst-region eu-west-3 --src-job-names test1,test2 --config-path mapping.json

  3. Verify successful synchronization by opening AWS Glue Studio in the destination account:
  4. Open the Glue Studio Visual jobs, test1, and test2, and verify the visual representation of the DAG.

The preceding screenshot shows that you were able to copy the jobs test1 and test2 to the destination account while keeping their visual DAGs.

Conclusion

AWS Glue Job Visual API and the AWS Glue Sync Utility simplify how you synchronize your jobs to different environments. These are designed to easily integrate into your Continuous Integration pipelines while retaining the visual representation that improves the readability of the ETL pipeline.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for designing AWS features, implementing software artifacts, and helping customer architectures. In his spare time, he enjoys watching anime in Prime Video.

Aaron Meltzer is a Software Engineer on the AWS Glue Studio team. He leads the design and implementation of features to simplify the management of AWS Glue jobs. Outside of work, Aaron likes to read and learn new recipes.

Mohamed Kiswani is the Software Development Manager on the AWS Glue team.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team.

Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-auto-scaling-automatically-resize-serverless-computing-resources-for-lower-cost-with-optimized-apache-spark/

Data created in the cloud is growing rapidly, so scalability is a key factor in distributed data processing. Many customers benefit from the scalability of the AWS Glue serverless Spark runtime. Today, we’re pleased to announce the release of AWS Glue Auto Scaling, which helps you scale your AWS Glue Spark jobs automatically based on the requirements calculated dynamically during the job run, and accelerate job runs at lower cost without detailed capacity planning.

Before AWS Glue Auto Scaling, you had to predict workload patterns in advance. For example, in cases when you don’t have expertise in Apache Spark, when it’s the first time you’re processing the target data, or when the volume or variety of the data is significantly changing, it’s not so easy to predict the workload and plan the capacity for your AWS Glue jobs. Under-provisioning is error-prone and can lead to either missed SLA or unpredictable performance. On the other hand, over-provisioning can cause underutilization of resources and cost overruns. Therefore, it was a common best practice to experiment with your data, monitor the metrics, and adjust the number of AWS Glue workers before you deployed your Spark applications to production.

With AWS Glue Auto Scaling, you no longer need to plan AWS Glue Spark cluster capacity in advance. You can just set the maximum number of workers and run your jobs. AWS Glue monitors the Spark application execution, and allocates more worker nodes to the cluster in near-real time after Spark requests more executors based on your workload requirements. When there are idle executors that don’t have intermediate shuffle data, AWS Glue Auto Scaling removes the executors to save the cost.

AWS Glue Auto Scaling is available with the optimized Spark runtime on AWS Glue version 3.0, and you can start using it today. This post describes possible use cases and how it works.

Use cases and benefits for AWS Glue Auto Scaling

Traditionally, AWS Glue launches a serverless Spark cluster of a fixed size. The computing resources are held for the whole job run until it is completed. With the new AWS Glue Auto Scaling feature, after you enable it for your AWS Glue Spark jobs, AWS Glue dynamically allocates compute resources up to the given maximum number of workers. It also supports dynamic scale-out and scale-in of the AWS Glue Spark cluster size over the course of the job. As more executors are requested by Spark, more AWS Glue workers are added to the cluster. When an executor has been idle for a period of time, with no active computation tasks and no associated shuffle dependencies, the executor and the corresponding worker are removed.

AWS Glue Auto Scaling makes it easy to run your data processing in the following typical use cases:

  • Batch jobs to process unpredictable amounts of data
  • Jobs containing driver-heavy workloads (for example, processing many small files)
  • Jobs containing multiple stages with uneven compute demands or due to data skews (for example, reading from a data store, repartitioning it to have more parallelism, and then processing further analytic workloads)
  • Jobs that write large amounts of data into data warehouses such as Amazon Redshift, or that read from and write to databases

Configure AWS Glue Auto Scaling

AWS Glue Auto Scaling is available with the optimized Spark runtime on Glue version 3.0. To enable Auto Scaling on the AWS Glue Studio console, complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose your job.
  4. Choose the Job details tab.
  5. For Glue version, choose Glue 3.0 – Supports spark 3.1, Scala 2, Python.
  6. Select Automatically scale the number of workers.
  7. For Maximum number of workers, enter the maximum workers that can be vended to the job run.
  8. Choose Save.

To enable Auto Scaling in the AWS Glue API or AWS Command Line Interface (AWS CLI), set the following job parameters (see the sketch after this list):

  • Key: --enable-auto-scaling
  • Value: true
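The following is a minimal sketch of setting this parameter with the AWS SDK for Python (Boto3). The job name is a hypothetical placeholder, and because UpdateJob replaces the job definition, a real script would typically carry over more of the existing fields than shown here.

import boto3

glue = boto3.client("glue")

# Fetch the current job definition (the job name is a hypothetical placeholder)
job = glue.get_job(JobName="my-glue3-job")["Job"]

# Merge the Auto Scaling flag into the existing default arguments
args = dict(job.get("DefaultArguments", {}))
args["--enable-auto-scaling"] = "true"

# update_job replaces the job definition, so pass back the fields you want to keep
glue.update_job(
    JobName="my-glue3-job",
    JobUpdate={
        "Role": job["Role"],
        "Command": job["Command"],
        "GlueVersion": job.get("GlueVersion", "3.0"),
        "WorkerType": job.get("WorkerType", "G.1X"),
        "NumberOfWorkers": job.get("NumberOfWorkers", 10),  # acts as the maximum number of workers
        "DefaultArguments": args,
    },
)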

Monitor AWS Glue Auto Scaling

In this section, we discuss three ways to monitor AWS Glue Auto Scaling: Amazon CloudWatch metrics, the AWS Glue Studio Monitoring page, and the Spark UI.

CloudWatch metrics

After you enable AWS Glue Auto Scaling, Spark dynamic allocation is enabled and the executor metrics are visible in CloudWatch. You can review the following metrics to understand the demand and optimized usage of executors in your Spark applications enabled with Auto Scaling (see the sketch after this list):

  • glue.driver.ExecutorAllocationManager.executors.numberAllExecutors
  • glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors
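The following is a minimal sketch of reading one of these metrics with the AWS SDK for Python (Boto3). The job name, job run ID, time window, and dimension values are hypothetical placeholders; adjust them to match how your job emits metrics.

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

# Job name and run ID are hypothetical placeholders; JobRunId can also be "ALL"
response = cloudwatch.get_metric_statistics(
    Namespace="Glue",
    MetricName="glue.driver.ExecutorAllocationManager.executors.numberAllExecutors",
    Dimensions=[
        {"Name": "JobName", "Value": "my-glue3-job"},
        {"Name": "JobRunId", "Value": "ALL"},
        {"Name": "Type", "Value": "gauge"},
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=60,
    Statistics=["Maximum"],
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])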

AWS Glue Studio Monitoring page

In the Monitoring page in AWS Glue Studio, you can monitor the DPU hours you spent for a specific job run. The following screenshot shows two job runs that processed the same dataset: one without Auto Scaling, which spent 8.71 DPU hours, and another with Auto Scaling enabled, which spent only 1.48 DPU hours. The DPU hour values per job run are also available with GetJobRun API responses.
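The following is a minimal sketch of retrieving the consumed capacity for a single job run through the GetJobRun API with the AWS SDK for Python (Boto3). The job name and run ID are hypothetical placeholders, and the DPUSeconds field is assumed to be populated for Auto Scaling job runs.

import boto3

glue = boto3.client("glue")

# Job name and run ID are hypothetical placeholders
job_run = glue.get_job_run(JobName="my-glue3-job", RunId="jr_0123456789abcdef")["JobRun"]

# DPUSeconds reports the consumed capacity; convert it to DPU hours
dpu_hours = job_run.get("DPUSeconds", 0.0) / 3600.0
print(f"DPU hours: {dpu_hours:.2f}")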

Spark UI

With the Spark UI, you can monitor that the AWS Glue Spark cluster dynamically scales out and scales in with AWS Glue Auto Scaling. The event timeline shows when each executor is added and removed gradually over the Spark application run.

In the following sections, we demonstrate AWS Glue Auto Scaling with two use cases: jobs with driver-heavy workloads, and jobs with multiple stages.

Example 1: Jobs containing driver-heavy workloads

A typical workload for AWS Glue Spark jobs is to process many small files to prepare the data for further analysis. For such workloads, AWS Glue has built-in optimizations, including file grouping, a Glue S3 Lister, partition pushdown predicates, partition indexes, and more. For more information, see Optimize memory management in AWS Glue. All those optimizations run on the Spark driver and speed up the planning phase, in which the driver computes and distributes the work for parallel processing with Spark executors. However, without AWS Glue Auto Scaling, Spark executors are idle during the planning phase. With Auto Scaling, AWS Glue jobs only allocate executors when the driver work is complete, thereby saving executor cost.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from an Amazon Simple Storage Service (Amazon S3) bucket, performs the ApplyMapping transformation, runs a simple SELECT query repartitioning data to have 800 partitions, and writes back to another location in Amazon S3.

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

In contrast, the following screenshot shows the executor timeline of the same job with Auto Scaling enabled and the maximum workers set to 20. The driver and one executor started at the beginning, and other executors started only after the driver finished its computation for listing 367,920 partitions on the S3 bucket. These 19 workers were not charged during the long-running driver task.

Both jobs completed in 44 minutes. With AWS Glue Auto Scaling, the job completed in the same amount of time with lower cost.

Example 2: Jobs containing multiple stages

Another typical workload in AWS Glue is to read from the data store or large compressed files, repartition it to have more parallelism for downstream processing, and process further analytic queries. For example, when you want to read from a JDBC data store, you may not want to have many concurrent connections, so you can avoid impacting source database performance. For such workloads, you can have a small number of connections to read data from the JDBC data store, then repartition the data with higher parallelism for further analysis.

Here’s the example DAG shown in AWS Glue Studio. This AWS Glue job reads from the JDBC data source, runs a simple SELECT query adding one more column (mod_id) calculated from the column ID, performs the ApplyMapping node, then writes to an S3 bucket with partitioning by this new column mod_id. Note that the JDBC data source was already registered in the AWS Glue Data Catalog, and the table has two parameters, hashfield=id and hashpartitions=5, to read from JDBC through five concurrent connections.

Without AWS Glue Auto Scaling

The following screenshot shows the executor timeline in the Spark UI when the AWS Glue job ran with 20 workers without Auto Scaling. You can confirm that all 20 workers started at the beginning of the job run.

With AWS Glue Auto Scaling

The following screenshot shows the same executor timeline in the Spark UI with Auto Scaling enabled with 20 maximum workers. The driver and two executors started at the beginning, and other executors started later. The first two executors read data from the JDBC source with a small number of concurrent connections. Later, the job increased parallelism and more executors were started. You can also observe that there were 16 executors, not 20, which further reduced cost.

Conclusion

This post discussed AWS Glue Auto Scaling, which automatically resizes the computing resources of your AWS Glue Spark jobs and reduces cost. You can start using AWS Glue Auto Scaling for both your existing workloads and future new workloads, and take advantage of it today! For more information about AWS Glue Auto Scaling, see Using Auto Scaling for AWS Glue. Migrate your jobs to Glue version 3.0 and get the benefits of Auto Scaling.

Special thanks to everyone who contributed to the launch: Raghavendhar Thiruvoipadi Vidyasagar, Ping-Yao Chang, Shashank Bhardwaj, Sampath Shreekantha, Vaibhav Porwal, and Akash Gupta.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data platforms, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing the knowledge in AWS Big Data blog posts. In his spare time, he enjoys taking care of killifish, hermit crabs, and grubs with his children.

Bo Li is a Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about data.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.

Best practices to optimize data access performance from Amazon EMR and AWS Glue to Amazon S3

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-data-access-performance-from-amazon-emr-and-aws-glue-to-amazon-s3/

Customers are increasingly building data lakes to store data at massive scale in the cloud. It’s common to use distributed computing engines, cloud-native databases, and data warehouses when you want to process and analyze your data in data lakes. Amazon EMR and AWS Glue are two key services you can use for such use cases. Amazon EMR is a managed big data framework that supports several different applications, including Apache Spark, Apache Hive, Presto, Trino, and Apache HBase. AWS Glue Spark jobs run on top of Apache Spark, and distribute data processing workloads in parallel to perform extract, transform, and load (ETL) jobs to enrich, denormalize, mask, and tokenize data on a massive scale.

For data lake storage, customers typically use Amazon Simple Storage Service (Amazon S3) because it’s secure, scalable, durable, and highly available. Amazon S3 is designed for 11 9’s of durability and stores over 200 trillion objects for millions of applications around the world, making it the ideal storage destination for your data lake. Amazon S3 averages over 100 million operations per second, so your applications can easily achieve high request rates when using Amazon S3 as your data lake.

This post describes best practices to achieve the performance scaling you need when analyzing data in Amazon S3 using Amazon EMR and AWS Glue. We specifically focus on optimizing for Apache Spark on Amazon EMR and AWS Glue Spark jobs.

Optimizing Amazon S3 performance for large Amazon EMR and AWS Glue jobs

Amazon S3 is a very large distributed system, and you can scale to thousands of transactions per second in request performance when your applications read and write data to Amazon S3. Amazon S3 performance isn’t defined per bucket, but per prefix in a bucket. Your applications can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. Additionally, there are no limits to the number of prefixes in a bucket, so you can horizontally scale your read or write performance using parallelization. For example, if you create 10 prefixes in an S3 bucket to parallelize reads, you could scale your read performance to 55,000 read requests per second. You can similarly scale writes by writing data across multiple prefixes.

You can scale performance by utilizing automatic scaling in Amazon S3 and scan millions of objects for queries run over petabytes of data. Amazon S3 automatically scales in response to sustained new request rates, dynamically optimizing performance. While Amazon S3 is internally optimizing for a new request rate, you receive HTTP 503 request responses temporarily until the optimization completes:

AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown)

Such situations require the application to retry momentarily, but after Amazon S3 internally optimizes performance for the new request rate, all requests are generally served without retries. One such situation is when multiple workers in distributed compute engines such as Amazon EMR and AWS Glue momentarily generate a high number of requests to access data under the same prefix.

When using Amazon EMR and AWS Glue to process data in Amazon S3, you can employ certain best practices to manage request traffic and avoid HTTP Slow Down errors. Let’s look at some of these strategies.

Best practices to manage HTTP Slow Down responses

You can use the following approaches to take advantage of the horizontal scaling capability in Amazon S3 and improve the success rate of your requests when accessing Amazon S3 data using Amazon EMR and AWS Glue:

  • Modify the retry strategy for Amazon S3 requests
  • Adjust the number of Amazon S3 objects processed
  • Adjust the number of concurrent Amazon S3 requests

We recommend choosing and applying the options that fit best for your use case to optimize data processing on Amazon S3. In the following sections, we describe best practices of each approach.

Modify the retry strategy for Amazon S3 requests

This is the easiest way to avoid HTTP 503 Slow Down responses and improve the success rate of your requests. To access Amazon S3 data, both Amazon EMR and AWS Glue use the EMR File System (EMRFS), which retries Amazon S3 requests with jitter when it receives 503 Slow Down responses. To improve the success rate of your Amazon S3 requests, you can adjust your retry strategy by configuring certain properties. In Amazon EMR, you can configure parameters in your emrfs-site configuration. In AWS Glue, you can configure the parameters in job parameters. You can adjust your retry strategy in the following ways:

  • Increase the EMRFS default retry limit – By default, EMRFS uses an exponential backoff strategy to retry requests to Amazon S3. The default EMRFS retry limit is 15. However, you can increase this limit when you create a new cluster, on a running cluster, or at application runtime. To increase the retry limit, change the value of the fs.s3.maxRetries parameter. Note that you may experience longer job duration if you set a higher value for this parameter. We recommend experimenting with different values, such as 20 as a starting point, confirming the duration overhead of the jobs for each value, and then adjusting this parameter based on your requirements.
  • For Amazon EMR, use the AIMD retry strategy – With Amazon EMR versions 6.4.0 and later, EMRFS supports an alternative retry strategy based on an additive-increase/multiplicative-decrease (AIMD) model. This strategy can be useful in shaping the request rate from large clusters. Instead of treating each request in isolation, this mode keeps track of the rate of recent successful and throttled requests. Requests are limited to a rate determined from the rate of recent successful requests. This decreases the number of throttled requests, and therefore the number of attempts needed per request. To enable the AIMD retry strategy, set the fs.s3.aimd.enabled property to true. You can further refine the AIMD retry strategy using the advanced AIMD retry settings. A configuration sketch follows this list.
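The following is a minimal configuration sketch for these settings, expressed as the Configurations list you could pass when creating an EMR cluster with the AWS SDK for Python (Boto3). The property values are illustrative starting points, not recommendations for every workload.

import boto3

emr = boto3.client("emr")

# Illustrative emrfs-site settings; adjust the values to your workload
emrfs_configuration = [
    {
        "Classification": "emrfs-site",
        "Properties": {
            "fs.s3.maxRetries": "20",      # raise the default EMRFS retry limit of 15
            "fs.s3.aimd.enabled": "true",  # EMR 6.4.0+ only: AIMD-based retry strategy
        },
    }
]

# Pass the configuration when creating a cluster (other run_job_flow arguments omitted)
# emr.run_job_flow(Name="my-cluster", Configurations=emrfs_configuration, ...)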

Adjust the number of Amazon S3 objects processed

Another approach is to adjust the number of Amazon S3 objects processed so you have fewer requests made concurrently. When you lower the number of objects to be processed in a job, you use fewer Amazon S3 requests, thereby lowering the request rate or transactions per second (TPS) required for each job. Note the following considerations:

  • Preprocess the data by aggregating multiple smaller files into fewer, larger chunks – For example, use s3-dist-cp or an AWS Glue compaction blueprint to merge a large number of small files (generally less than 64 MB) into a smaller number of optimally sized files (such as 128–512 MB). This approach reduces the number of requests required, while simultaneously improving the aggregate throughput to read and process data in Amazon S3. You may need to experiment to arrive at the optimal size for your workload, because creating extremely large files can reduce the parallelism of the job.
  • Use partition pruning to scan data under specific partitions – In Apache Hive and Hive Metastore-compatible applications such as Apache Spark or Presto, one table can have multiple partition folders. Partition pruning is a technique to scan only the required data in a specific partition folder of a table. It’s useful when you want to read a specific portion from the entire table. To take advantage of predicate pushdown, you can use partition columns in the WHERE clause in Spark SQL or the filter expression in a DataFrame. In AWS Glue, you can also use a partition pushdown predicate when creating DynamicFrames.
  • For AWS Glue, enable job bookmarks – You can use AWS Glue job bookmarks to process continuously ingested data repeatedly. It only picks unprocessed data from the previous job run, thereby reducing the number of objects read or retrieved from Amazon S3.
  • For AWS Glue, enable bounded executions – AWS Glue bounded execution is a technique to only pick unprocessed data, with an upper bound on the dataset size or the number of files to be processed. This is another way to reduce the number of requests made to Amazon S3.

Adjust the number of concurrent Amazon S3 requests

To adjust the number of Amazon S3 requests to have fewer concurrent reads per prefix, you can configure Spark parameters. By default, Spark populates 10,000 tasks to list prefixes when creating Spark DataFrames. You may experience Slow Down responses, especially when you read from a table with highly nested prefix structures. In this case, it’s a good idea to limit the maximum listing parallelism by decreasing the Spark parameter spark.sql.sources.parallelPartitionDiscovery.parallelism (the default is 10,000).
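The following is a minimal sketch of lowering the listing parallelism in a PySpark session; the value and the S3 path are illustrative placeholders that you should tune for your own prefix layout.

from pyspark.sql import SparkSession

# Lower the maximum parallelism Spark uses to list partitions under S3 prefixes
spark = (SparkSession.builder
    .appName("s3-read-tuning")
    .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "1000")
    .getOrCreate())

# The bucket and prefix are placeholders for your own highly partitioned table
df = spark.read.parquet("s3://<your-bucket>/<your-table-prefix>/")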

To have fewer concurrent write requests per prefix, you can use the following techniques:

  • Reduce the number of Spark RDD partitions before writes – You can do this by using df.repartition(n) or df.coalesce(n) in DataFrames. For Spark SQL, you can also use query hints like REPARTITION or COALESCE. You can see the number of tasks (=RDD partitions) on the Spark UI.
  • For AWS Glue, group the input data – If the datasets are made up of small files, we recommend grouping the input data because it reduces the number of RDD partitions, and reduces the number of Amazon S3 requests to write the files.
  • Use the EMRFS S3-optimized committer – The EMRFS S3-optimized committer is used by default in Amazon EMR 5.19.0 and later, and AWS Glue 3.0. In AWS Glue 2.0, you can configure it in the job parameter --enable-s3-parquet-optimized-committer. The committer uses Amazon S3 multipart uploads instead of renaming files, and it usually reduces the number of HEAD/LIST requests significantly.

The following are other techniques to adjust the Amazon S3 request rate in Amazon EMR and AWS Glue. These options have the net effect of reducing parallelism of the Spark job, thereby reducing the probability of Amazon S3 Slow Down responses, although it can lead to longer job duration. We recommend testing and adjusting these values for your use case.

  • Reduce the number of concurrent jobs – Start with the most read/write heavy jobs. If you configured cross-account access for Amazon S3, keep in mind that other accounts might also be submitting jobs to the prefix.
  • Reduce the number of concurrent Spark tasks – You have several options:
    • For Amazon EMR, set the number of Spark executors (for example, the spark-submit option --num-executors and the Spark parameter spark.executor.instances).
    • For AWS Glue, set the number of workers in the NumberOfWorkers parameter.
    • For AWS Glue, change the WorkerType parameter to a smaller one (for example, G.2X to G.1X).
    • Configure Spark parameters:
      • Decrease the value of spark.default.parallelism.
      • Decrease the value of spark.sql.shuffle.partitions.
      • Increase the value of spark.task.cpus (the default is 1) to allocate more CPU cores per Spark task.

Conclusion

In this post, we described the best practices to optimize data access from Amazon EMR and AWS Glue to Amazon S3. With these best practices, you can easily run Amazon EMR and AWS Glue jobs by taking advantage of Amazon S3 horizontal scaling, and process data in a highly distributed way at a massive scale.

For further guidance, please reach out to AWS Premium Support.

Appendix A: Configure CloudWatch request metrics

To monitor Amazon S3 requests, you can enable request metrics in Amazon CloudWatch for the bucket. Then, define a filter for the prefix. For a list of useful metrics to monitor, see Monitoring metrics with Amazon CloudWatch. After you enable metrics, use the data in the metrics to determine which of the aforementioned options is best for your use case.
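The following is a minimal sketch of enabling a prefix-filtered request metrics configuration with the AWS SDK for Python (Boto3); the bucket name, configuration ID, and prefix are hypothetical placeholders.

import boto3

s3 = boto3.client("s3")

# Enable S3 request metrics scoped to a specific prefix (names are placeholders)
s3.put_bucket_metrics_configuration(
    Bucket="my-data-lake-bucket",
    Id="prefix-request-metrics",
    MetricsConfiguration={
        "Id": "prefix-request-metrics",
        "Filter": {"Prefix": "warehouse/sales_table/"},
    },
)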

Appendix B: Configure Spark parameters

To configure Spark parameters in Amazon EMR, there are several options:

  • spark-submit command – You can pass Spark parameters via the --conf option.
  • Job script – You can set Spark parameters in the SparkConf object in the job script codes.
  • Amazon EMR configurations – You can configure Spark parameters via API using Amazon EMR configurations. For more information, see Configure Spark.

To configure Spark parameters in AWS Glue, set AWS Glue job parameters using the key --conf with a value like spark.hadoop.fs.s3.maxRetries=50.

To set multiple configurations, use the key --conf with a value like spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2.
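The following is a minimal sketch of the job script approach for AWS Glue, setting the same parameters on the SparkConf object before the GlueContext is created; the values are illustrative and you should tune them for your workload.

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Illustrative values; tune them for your workload
conf = (SparkConf()
    .set("spark.hadoop.fs.s3.maxRetries", "50")
    .set("spark.task.cpus", "2"))

sc = SparkContext(conf=conf)
glue_context = GlueContext(sc)
spark = glue_context.spark_session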


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about releasing AWS Glue connector custom blueprints and other software artifacts to help customers build their data lakes. In his spare time, he enjoys watching hermit crabs with his children.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/improve-amazon-athena-query-performance-using-aws-glue-data-catalog-partition-indexes/

The AWS Glue Data Catalog provides partition indexes to accelerate queries on highly partitioned tables. In the post Improve query performance using AWS Glue partition indexes, we demonstrated how partition indexes reduce the time it takes to fetch partition information during the planning phase of queries run on Amazon EMR, Amazon Redshift Spectrum, and AWS Glue extract, transform, and load (ETL) jobs.

We’re pleased to announce Amazon Athena support for AWS Glue Data Catalog partition indexes. You can use the same indexes configured for Amazon EMR, Redshift Spectrum, and AWS Glue ETL jobs with Athena to reduce query planning times for highly partitioned tables, which is common in most data lakes on Amazon Simple Storage Service (Amazon S3).

In this post, we describe how to set up partition indexes and perform a few sample queries to demonstrate the performance improvement on Athena queries.

Set up resources with AWS CloudFormation

To help you get started quickly, we provide an AWS CloudFormation template, the same template we used in a previous post. You can review and customize it to suit your needs. Some of the resources this stack deploys incur costs when in use.

The CloudFormation template generates the resources used in this post, including an AWS Glue database and two Data Catalog tables (table_with_index and table_without_index).

If you’re using AWS Lake Formation permissions, you need to make sure that the IAM user or role running AWS CloudFormation has the required permissions to create a database on the AWS Glue Data Catalog.

The tables created by the CloudFormation template use sample data located in an S3 public bucket. The data is partitioned by the columns year, month, day, and hour. There are 367,920 partition folders in total, and each folder has a single file in JSON format that contains an event similar to the following:

{
  "id": "95c4c9a7-4718-4031-9e79-b56b72220fbc",
  "value": 464.22130592811703
}

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseName, leave as the default.
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

Stack creation can take up to 5 minutes. When the stack is complete, you have two Data Catalog tables: table_with_index and table_without_index. Both tables point to the same S3 bucket, as mentioned previously, which holds data for 42 years (1980–2021) in 367,920 partitions. Each partition folder includes a data.json file containing the event data. In the following sections, we demonstrate how the partition indexes improve query performance with these tables using an example that represents large datasets in a data lake.

Set up partition indexes

You can create up to three partition indexes per table for new and existing tables. If you want to create a new table with partition indexes, you can include a list of PartitionIndex objects with the CreateTable API call. To add a partition index to an existing table, use the CreatePartitionIndex API call. You can also perform these actions from the AWS Glue console.
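The following is a minimal sketch of the API approach with the AWS SDK for Python (Boto3), adding the same index that we create from the console in the next steps; the database, table, and index names mirror the example used in this post.

import boto3

glue = boto3.client("glue")

# Add a partition index on the year/month/day/hour partition keys
glue.create_partition_index(
    DatabaseName="partition_index",
    TableName="table_with_index",
    PartitionIndex={
        "Keys": ["year", "month", "day", "hour"],
        "IndexName": "year-month-day-hour",
    },
)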

Let’s configure a new partition index for the table table_with_index we created with the CloudFormation template.

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Partitions and indices.
  4. Choose Add new index.
  5. For Index name, enter year-month-day-hour.
  6. For Selected keys from schema, select year, month, day, and hour. Make sure that you choose each column in this order, and confirm that Partition key for each column is correctly configured as follows:
    1. year: Partition (0)
    2. month: Partition (1)
    3. day: Partition (2)
    4. hour: Partition (3)
  7. Choose Add index.

The Status column of the newly created partition index shows as Creating. We need to wait for the partition index to be Active before it can be used by query engines. It should take about 1 hour to process and build the index for 367,920 partitions.

When the partition index is ready for table_with_index, you can use it when querying with Athena. For table_without_index, you should expect to see no change in query latency because no partition indexes were configured.

Enable partition filtering

To enable partition filtering in Athena, you need to update the table properties as follows:

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Edit table.
  4. Under Table properties, add the following:
    1. Key: partition_filtering.enabled
    2. Value: true
  5. Choose Apply.

Alternatively, you can set this parameter by running an ALTER TABLE SET PROPERTIES query in Athena:

ALTER TABLE partition_index.table_with_index
SET TBLPROPERTIES ('partition_filtering.enabled' = 'true')

Query tables using Athena

Now that your table has filtering enabled for Athena, let’s query both tables to see the performance differences.

First, query the table without using the partition index. In the Athena query editor, enter the following query:

SELECT count(*), sum(value) 
FROM partition_index.table_without_index 
WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows the query took 44.9 seconds.

Next, query the table using the partition index. You need to use the columns that are configured for the indexes in the WHERE clause to gain these performance benefits. Run the following query:

SELECT count(*), sum(value) 
FROM partition_index.table_with_index 
WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows the query took just 1.3 seconds to complete, which is significantly faster than the table without indexes.

Query planning is the phase where the table and partition metadata are fetched from the AWS Glue Data Catalog. With partition indexes enabled, retrieving only the partitions required by the query can be done more efficiently and therefore quicker. Let’s retrieve the execution details of each query by using the AWS Command Line Interface (AWS CLI) to compare planning statistics.

The following is the query execution details for the query that ran against a table without partition indexes:

$ aws athena get-query-execution --query-execution-id 5e972df6-11f8-467a-9eea-77f509a23573 --query QueryExecution.Statistics --output table
--------------------------------------------
|             GetQueryExecution            |
+---------------------------------+--------+
|  DataScannedInBytes             |  1782  |
|  EngineExecutionTimeInMillis    |  44914 |
|  QueryPlanningTimeInMillis      |  44451 |
|  QueryQueueTimeInMillis         |  278   |
|  ServiceProcessingTimeInMillis  |  47    |
|  TotalExecutionTimeInMillis     |  45239 |
+---------------------------------+--------+

The following is the query execution details for a query that ran against a table with partition indexes:

% aws athena get-query-execution --query-execution-id 31d0b4ae-ae8d-4836-b20b-317fa9d9b79a --query QueryExecution.Statistics --output table
-------------------------------------------
|            GetQueryExecution            |
+---------------------------------+-------+
|  DataScannedInBytes             |  1782 |
|  EngineExecutionTimeInMillis    |  1361 |
|  QueryPlanningTimeInMillis      |  384  |
|  QueryQueueTimeInMillis         |  190  |
|  ServiceProcessingTimeInMillis  |  58   |
|  TotalExecutionTimeInMillis     |  1609 |
+---------------------------------+-------+

QueryPlanningTimeInMillis represents the number of milliseconds that Athena took to plan the query processing flow. This includes the time spent retrieving table partitions from the data source. Because the query engine performs the query planning, the query planning time is a subset of engine processing time.

Comparing the stats for both queries, we can see that QueryPlanningTimeInMillis is significantly lower in the query using partition indexes. It went from 44 seconds to 0.3 seconds when using partition indexes. The improvement in query planning resulted in a faster overall query runtime, going from 45 seconds to 1.3 seconds—a 35 times greater performance improvement.

Clean up

Now to the final step, cleaning up the resources:

  1. Delete the CloudFormation stack.
  2. Confirm both tables have been deleted from the AWS Glue Data Catalog.

Conclusion

At AWS, we strive to improve the performance of our services and our customers’ experience. The AWS Glue Data Catalog is a fully managed, Apache Hive compatible metastore that enables a wide range of big data, analytics, and machine learning services, like Athena, Amazon EMR, Redshift Spectrum, and AWS Glue ETL, to access data in the data lake. Athena customers can now further reduce query latency by enabling partition indexes for their tables in Amazon S3. Using partition indexes can improve the efficiency of retrieving metadata for highly partitioned tables with tens of thousands to millions of partitions.

You can learn more about AWS Glue Data Catalog partition indexes in Working with Partition Indexes, and more about Athena best practices in Best Practices When Using Athena with AWS Glue.


About the Author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data platforms, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing the knowledge in AWS Big Data blog posts. In his spare time, he enjoys having and watching killifish, hermit crabs, and grubs with his children.