Tag Archives: AWS Glue

Build and manage your modern data stack using dbt and AWS Glue through dbt-glue, the new “trusted” dbt adapter

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/build-and-manage-your-modern-data-stack-using-dbt-and-aws-glue-through-dbt-glue-the-new-trusted-dbt-adapter/

dbt is an open source, SQL-first templating engine that allows you to write repeatable and extensible data transforms in Python and SQL. dbt focuses on the transform layer of extract, load, transform (ELT) or extract, transform, load (ETL) processes across data warehouses and databases, relying on specific engine adapters to connect to them. It enables data engineers, data scientists, and analytics engineers to define the business logic with SQL select statements and eliminates the need to write boilerplate data manipulation language (DML) and data definition language (DDL) expressions. dbt lets data engineers quickly and collaboratively deploy analytics code following software engineering best practices like modularity, portability, continuous integration and continuous delivery (CI/CD), and documentation.

dbt is predominantly used by data warehouse customers (such as Amazon Redshift users) who are looking to keep their data transformation logic separate from storage and engine. We have seen strong customer demand to expand its scope to cloud-based data lakes, because data lakes are increasingly the enterprise solution for large-scale data initiatives due to their power and capabilities.

In 2022, AWS published a dbt adapter called dbt-glue—the open source, battle-tested dbt AWS Glue adapter that allows data engineers to use dbt for cloud-based data lakes along with data warehouses and databases, paying for just the compute they need. The dbt-glue adapter democratized access for dbt users to data lakes, and enabled many users to effortlessly run their transformation workloads on the cloud with the serverless data integration capability of AWS Glue. From the launch of the adapter, AWS has continued investing into dbt-glue to cover more requirements.

Today, we are pleased to announce that the dbt-glue adapter is now a trusted adapter based on our strategic collaboration with dbt Labs. Trusted adapters are adapters not maintained by dbt Labs, but ones that dbt Labs is comfortable recommending to users for use in production.

The key capabilities of the dbt-glue adapter are as follows:

  • Runs SQL as Spark SQL on AWS Glue interactive sessions
  • Manages table definitions on the AWS Glue Data Catalog
  • Supports open table formats such as Apache Hudi, Delta Lake, and Apache Iceberg
  • Supports AWS Lake Formation permissions for fine-grained access control

In addition to those capabilities, the dbt-glue adapter is designed to optimize resource utilization with several techniques on top of AWS Glue interactive sessions.

This post demonstrates how the dbt-glue adapter helps your workload, and how you can build a modern data stack using dbt and AWS Glue through the dbt-glue adapter.

Common use cases

One common use case for dbt-glue is a central analytics team at a large corporation that is responsible for monitoring operational efficiency. The team ingests application logs into raw Parquet tables in an Amazon Simple Storage Service (Amazon S3) data lake. Additionally, it extracts organized data from operational systems capturing the company’s organizational structure and the costs of diverse operational components, which it stores in the raw zone using Iceberg tables to maintain the original schema and facilitate easy access to the data. The team uses dbt-glue to build a transformed gold model optimized for business intelligence (BI). The gold model joins the technical logs with billing data and organizes the metrics per business unit. The gold model uses Iceberg’s ability to support the data warehouse-style modeling needed for performant BI analytics in a data lake. The combination of Iceberg and dbt-glue allows the team to efficiently build a data model that’s ready to be consumed.

Another common use case is an analytics team at a company with an S3 data lake that creates a new data product to enrich the existing lake data with medical data. Let’s say that this company is located in Europe and the data product must comply with the GDPR. For this, the company uses Iceberg to meet needs such as the right to be forgotten and the deletion of data. The company uses dbt to model its data product on its existing data lake due to dbt’s compatibility with AWS Glue and Iceberg and the simplicity that the dbt-glue adapter brings to the use of this storage format.

How dbt and dbt-glue work

The following are key dbt features:

  • Project – A dbt project enforces a top-level structure on the staging, models, permissions, and adapters. A project can be checked into a GitHub repo for version control.
  • SQL – dbt relies on SQL select statements for defining data transformation logic. Instead of raw SQL, dbt offers templatized SQL (using Jinja) that allows code modularity. Instead of having to copy/paste SQL in multiple places, data engineers can define modular transforms and call those from other places within the project. Having a modular pipeline helps data engineers collaborate on the same project.
  • Models – dbt models are primarily written as a SELECT statement and saved as a .sql file. Data engineers define dbt models for their data representations. To learn more, refer to About dbt models.
  • Materializations – Materializations are strategies for persisting dbt models in a warehouse. There are five types of materializations built into dbt: table, view, incremental, ephemeral, and materialized view. To learn more, refer to Materializations and Incremental models.
  • Data lineage – dbt tracks data lineage, allowing you to understand the origin of data and how it flows through different transformations. dbt also supports impact analysis, which helps identify the downstream effects of changes.

The high-level data flow is as follows:

  1. Data engineers ingest data from data sources to raw tables and define table definitions for the raw tables.
  2. Data engineers write dbt models with templatized SQL.
  3. The dbt adapter converts dbt models to SQL statements compatible with the target data warehouse.
  4. The data warehouse runs the SQL statements to create intermediate tables or final tables, views, or materialized views.

The following diagram illustrates the architecture.

dbt-glue works with the following steps:

  1. The dbt-glue adapter converts dbt models to SQL statements compatible with Spark SQL.
  2. AWS Glue interactive sessions run the SQL statements to create intermediate tables or final tables, views, or materialized views.
  3. dbt-glue supports csv, parquet, hudi, delta, and iceberg as the file format.
  4. On the dbt-glue adapter, table or incremental are commonly used for materializations at the destination. There are three strategies for incremental materialization. The merge strategy requires hudi, delta, or iceberg. With the other two strategies, append and insert_overwrite, you can use csv, parquet, hudi, delta, or iceberg (see the sketch after this list).
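For example, an incremental model that appends new rows to a Parquet table on each run could be configured like the following sketch (the model and column names are illustrative and not part of the walkthrough later in this post):

{{ config(
    materialized='incremental',
    incremental_strategy='append',
    file_format='parquet'
) }}
SELECT *
FROM {{ ref('my_upstream_model') }}
{% if is_incremental() %}
WHERE event_date > (SELECT max(event_date) FROM {{ this }})
{% endif %}

The is_incremental() block limits each run to new rows, and {{ this }} refers to the table that has already been materialized.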

The following diagram illustrates this architecture.

Example use case

In this post, we use the data from the New York City Taxi Records dataset. This dataset is available in the Registry of Open Data on AWS (RODA), which is a repository containing public datasets from AWS resources. In this dataset, the raw Parquet table records stores the trip records.

The objective is to create the following three tables, which contain metrics based on the raw table:

  • silver_avg_metrics – Basic metrics based on NYC Taxi Open Data for the year 2016
  • gold_passengers_metrics – Metrics per passenger based on the silver metrics table
  • gold_cost_metrics – Metrics per cost based on the silver metrics table

The final goal is to create two well-designed gold tables that store already aggregated results in Iceberg format for ad hoc queries through Amazon Athena.

Prerequisites

This walkthrough requires the following prerequisites:

  • An AWS Identity and Access Management (IAM) role with all the mandatory permissions to run an AWS Glue interactive session and the dbt-glue adapter
  • An AWS Glue database and table to store the metadata related to the NYC taxi records dataset
  • An S3 bucket to use as output and store the processed data
  • An Athena configuration (a workgroup and S3 bucket to store the output) to explore the dataset
  • An AWS Lambda function (created as an AWS CloudFormation custom resource) that updates all the partitions in the AWS Glue table

With these prerequisites, we simulate the situation that data engineers have already ingested data from data sources to raw tables, and defined table definitions for the raw tables.

For ease of use, we prepared a CloudFormation template. This template deploys all the required infrastructure. To create these resources, choose Launch Stack in the us-east-1 Region, and follow the instructions:

Install dbt, the dbt CLI, and the dbt adapter

The dbt CLI is a command line interface for running dbt projects. It’s free to use and available as an open source project. Install dbt and the dbt CLI with the following code:

$ pip3 install --no-cache-dir dbt-core

For more information, refer to How to install dbt, What is dbt?, and Viewpoint.

Install the dbt adapter with the following code:

$ pip3 install --no-cache-dir dbt-glue
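You can confirm that both packages are installed; dbt --version lists the registered plugins, including glue (the exact output varies by version):

$ dbt --version
$ pip3 show dbt-glue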

Create a dbt project

Complete the following steps to create a dbt project:

  1. Run the dbt init command to create and initialize a new empty dbt project:
    $ dbt init

  2. For the project name, enter dbt_glue_demo.
  3. For the database, choose glue.

Now the empty project has been created. The directory structure is shown as follows:

$ cd dbt_glue_demo 
$ tree .
.
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
│   └── example
│       ├── my_first_dbt_model.sql
│       ├── my_second_dbt_model.sql
│       └── schema.yml
├── seeds
├── snapshots
└── tests

Create a source

The next step is to create a source table definition. We add models/source_tables.yml with the following contents:

version: 2

sources:
  - name: data_source
    schema: nyctaxi

    tables:
      - name: records

This source definition corresponds to the AWS Glue table nyctaxi.records, which we created in the CloudFormation stack.

Create models

In this step, we create a dbt model that represents the average values for trip duration, passenger count, trip distance, and total amount of charges. Complete the following steps:

  1. Create the models/silver/ directory.
  2. Create the file models/silver/silver_avg_metrics.sql with the following contents:
    WITH source_avg as ( 
        SELECT avg((CAST(dropoff_datetime as LONG) - CAST(pickup_datetime as LONG))/60) as avg_duration 
        , avg(passenger_count) as avg_passenger_count 
        , avg(trip_distance) as avg_trip_distance 
        , avg(total_amount) as avg_total_amount
        , year
        , month 
        , type
        FROM {{ source('data_source', 'records') }} 
        WHERE year = "2016"
        AND dropoff_datetime is not null 
        GROUP BY year, month, type
    ) 
    SELECT *
    FROM source_avg

  3. Create the file models/silver/schema.yml with the following contents:
    version: 2
    
    models:
      - name: silver_avg_metrics
        description: This table has basic metrics based on NYC Taxi Open Data for the year 2016
    
        columns:
          - name: avg_duration
            description: The average duration of a NYC Taxi trip
    
          - name: avg_passenger_count
            description: The average number of passengers per NYC Taxi trip
    
          - name: avg_trip_distance
            description: The average NYC Taxi trip distance
    
          - name: avg_total_amount
            description: The average amount of a NYC Taxi trip
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip 
    
          - name: type
            description: The type of the NYC Taxi 

  4. Create the models/gold/ directory.
  5. Create the file models/gold/gold_cost_metrics.sql with the following contents:
    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=["year", "month", "type"],
        file_format='iceberg',
        iceberg_expire_snapshots='False',
        table_properties={'format-version': '2'}
    ) }}
    SELECT (avg_total_amount/avg_trip_distance) as avg_cost_per_distance
    , (avg_total_amount/avg_duration) as avg_cost_per_minute
    , year
    , month 
    , type 
    FROM {{ ref('silver_avg_metrics') }}

  6. Create the file models/gold/gold_passengers_metrics.sql with the following contents:
    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=["year", "month", "type"],
        file_format='iceberg',
        iceberg_expire_snapshots='False',
        table_properties={'format-version': '2'}
    ) }}
    SELECT (avg_total_amount/avg_passenger_count) as avg_cost_per_passenger
    , (avg_duration/avg_passenger_count) as avg_duration_per_passenger
    , (avg_trip_distance/avg_passenger_count) as avg_trip_distance_per_passenger
    , year
    , month 
    , type 
    FROM {{ ref('silver_avg_metrics') }}

  7. Create the file models/gold/schema.yml with the following contents:
    version: 2
    
    models:
      - name: gold_cost_metrics
        description: This table has metrics per cost based on NYC Taxi Open Data
    
        columns:
          - name: avg_cost_per_distance
            description: The average cost per distance of a NYC Taxi trip
    
          - name: avg_cost_per_minute
            description: The average cost per minute of a NYC Taxi trip
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip
    
          - name: type
            description: The type of the NYC Taxi
    
      - name: gold_passengers_metrics
        description: This table has metrics per passenger based on NYC Taxi Open Data
    
        columns:
          - name: avg_cost_per_passenger
            description: The average cost per passenger for a NYC Taxi trip
    
          - name: avg_duration_per_passenger
            description: The average trip duration per passenger for a NYC Taxi trip

          - name: avg_trip_distance_per_passenger
            description: The average NYC Taxi trip distance per passenger
    
          - name: year
            description: The year of the NYC Taxi trip
    
          - name: month
            description: The month of the NYC Taxi trip 
    
          - name: type
            description: The type of the NYC Taxi

  8. Remove the models/example/ folder, because it contains only the example models created by the dbt init command.

Configure the dbt project

dbt_project.yml is a key configuration file for dbt projects. It contains the following code:

models:
  dbt_glue_demo:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

We configure dbt_project.yml to replace the preceding code with the following:

models:
  dbt_glue_demo:
    silver:
      +materialized: table

This is because we want to materialize the models under silver as Parquet tables.

Configure a dbt profile

A dbt profile is a configuration that specifies how to connect to a particular database. The profiles are defined in the profiles.yml file within a dbt project.

Complete the following steps to configure a dbt profile:

  1. Create the profiles directory.
  2. Create the file profiles/profiles.yml with the following contents:
    dbt_glue_demo:
      target: dev
      outputs:
        dev:
          type: glue
          query-comment: demo-nyctaxi
          role_arn: "{{ env_var('DBT_ROLE_ARN') }}"
          region: us-east-1
          workers: 5
          worker_type: G.1X
          schema: "dbt_glue_demo_nyc_metrics"
          database: "dbt_glue_demo_nyc_metrics"
          session_provisioning_timeout_in_seconds: 120
          location: "{{ env_var('DBT_S3_LOCATION') }}"

  3. Create the profiles/iceberg/ directory.
  4. Create the file profiles/iceberg/profiles.yml with the following contents:
    dbt_glue_demo:
      target: dev
      outputs:
        dev:
          type: glue
          query-comment: demo-nyctaxi
          role_arn: "{{ env_var('DBT_ROLE_ARN') }}"
          region: us-east-1
          workers: 5
          worker_type: G.1X
          schema: "dbt_glue_demo_nyc_metrics"
          database: "dbt_glue_demo_nyc_metrics"
          session_provisioning_timeout_in_seconds: 120
          location: "{{ env_var('DBT_S3_LOCATION') }}"
          datalake_formats: "iceberg"
          conf: --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse="{{ env_var('DBT_S3_LOCATION') }}"warehouse/ --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

The last two lines are added for setting Iceberg configurations on AWS Glue interactive sessions.

Run the dbt project

Now it’s time to run the dbt project. Complete the following steps:

  1. To run the dbt project, you need to be in the project folder:
    $ cd dbt_glue_demo

  2. The project requires you to set the following environment variables in order to run on your AWS account:
    $ export DBT_ROLE_ARN="arn:aws:iam::$(aws sts get-caller-identity --query "Account" --output text):role/GlueInteractiveSessionRole"
    $ export DBT_S3_LOCATION="s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1"

  3. Make sure the profile is set up correctly from the command line:
    $ dbt debug --profiles-dir profiles
    ...
    05:34:22 Connection test: [OK connection ok]
    05:34:22 All checks passed!

If you see any failures, check if you provided the correct IAM role ARN and S3 location in Step 2.

  4. Run the models with the following code:
    $ dbt run -m silver --profiles-dir profiles
    $ dbt run -m gold --profiles-dir profiles/iceberg/

Now the tables are successfully created in the AWS Glue Data Catalog, and the data is materialized in the Amazon S3 location.

You can verify those tables by opening the AWS Glue console, choosing Databases in the navigation pane, and opening dbt_glue_demo_nyc_metrics.
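Alternatively, you can list the created tables from the command line (the output format depends on your AWS CLI configuration):

$ aws glue get-tables --database-name dbt_glue_demo_nyc_metrics --query "TableList[].Name"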

Query materialized tables through Athena

Let’s query the target table using Athena to verify the materialized tables. Complete the following steps:

  1. On the Athena console, switch the workgroup to athena-dbt-glue-aws-blog.
  2. If the workgroup athena-dbt-glue-aws-blog settings dialog box appears, choose Acknowledge.
  3. Use the following query to explore the metrics created by the dbt project:
    SELECT cm.avg_cost_per_minute
        , cm.avg_cost_per_distance
        , pm.avg_cost_per_passenger
        , cm.year
        , cm.month
        , cm.type
    FROM "dbt_glue_demo_nyc_metrics"."gold_passengers_metrics" pm
    LEFT JOIN "dbt_glue_demo_nyc_metrics"."gold_cost_metrics" cm
        ON cm.type = pm.type
        AND cm.year = pm.year
        AND cm.month = pm.month
    WHERE cm.type = 'yellow'
        AND cm.year = '2016'
        AND cm.month = '6'

The following screenshot shows the results of this query.

Review dbt documentation

Complete the following steps to review your documentation:

  1. Generate the following documentation for the project:
    $ dbt docs generate --profiles-dir profiles/iceberg
    11:41:51  Running with dbt=1.7.1
    11:41:51  Registered adapter: glue=1.7.1
    11:41:51  Unable to do partial parsing because profile has changed
    11:41:52  Found 3 models, 1 source, 0 exposures, 0 metrics, 478 macros, 0 groups, 0 semantic models
    11:41:52  
    11:41:53  Concurrency: 1 threads (target='dev')
    11:41:53  
    11:41:53  Building catalog
    11:43:32  Catalog written to /Users/username/Documents/workspace/dbt_glue_demo/target/catalog.json

  2. Run the following command to open the documentation on your browser:
    $ dbt docs serve --profiles-dir profiles/iceberg

  3. In the navigation pane, choose gold_cost_metrics under dbt_glue_demo/models/gold.

You can see the detailed view of the model gold_cost_metrics, as shown in the following screenshot.

  4. To see the lineage graph, choose the circle icon at the bottom right.

Clean up

To clean up your environment, complete the following steps:

  1. Delete the database created by dbt:
    $ aws glue delete-database --name dbt_glue_demo_nyc_metrics

  2. Delete all generated data:
    $ aws s3 rm s3://aws-dbt-glue-datalake-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive
    $ aws s3 rm s3://aws-athena-dbt-glue-query-results-$(aws sts get-caller-identity --query "Account" --output text)-us-east-1/ --recursive

  3. Delete the CloudFormation stack:
    $ aws cloudformation delete-stack --stack-name dbt-demo

Conclusion

This post demonstrated how the dbt-glue adapter helps your workload, and how you can build a modern data stack using dbt and AWS Glue through the dbt-glue adapter. You learned the end-to-end operations and data flow for data engineers to build and manage a data stack using dbt and the dbt-glue adapter. To report issues or request a feature enhancement, feel free to open an issue on GitHub.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team at Amazon Web Services. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Benjamin Menuet is a Senior Data Architect on the AWS Professional Services team at Amazon Web Services. He helps customers develop data and analytics solutions to accelerate their business outcomes. Outside of work, Benjamin is a trail runner and has finished some iconic races like the UTMB.

Akira Ajisaka is a Senior Software Development Engineer on the AWS Glue team. He likes open source software and distributed systems. In his spare time, he enjoys playing arcade games.

Kinshuk Pahare is a Principal Product Manager on the AWS Glue team at Amazon Web Services.

Jason Ganz is the manager of the Developer Experience (DX) team at dbt Labs.

Use anomaly detection with AWS Glue to improve data quality (preview)

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/use-anomaly-detection-with-aws-glue-to-improve-data-quality-preview/

We are launching a preview of a new AWS Glue Data Quality feature that will help to improve your data quality by using machine learning to detect statistical anomalies and unusual patterns. You get deep insights into data quality issues, data quality scores, and recommendations for rules that you can use to continuously monitor for anomalies, all without having to write any code.

Data quality counts
AWS customers already build data integration pipelines to extract and transform data. They set up data quality rules to ensure that the resulting data is of high quality and can be used to make accurate business decisions. In many cases, these rules assess the data based on criteria that were chosen and locked in at a specific point in time, reflecting the current state of the business. However, as the business environment changes and the properties of the data shift, the rules are not always reviewed and updated.

For example, a rule could be set to verify that daily sales are at least ten thousand dollars for an early-stage business. As the business succeeds and grows, the rule should be checked and updated from time to time, but in practice this rarely happens. As a result, if there’s an unexpected drop in sales, the outdated rule does not activate, and no one is happy.

Anomaly detection in action
To detect unusual patterns and to gain deeper insights into data, organizations try to create their own adaptive systems or turn to costly commercial solutions that require specific technical skills and specialized business knowledge.

To address this widespread challenge, Glue Data Quality now makes use of machine learning (ML).

Once activated, this cool new addition to Glue Data Quality gathers statistics as fresh data arrives, using ML and dynamic thresholds to learn from past patterns while looking for outliers and unusual data patterns. This process produces observations and also visualizes trends so that you can quickly gain a better understanding of the anomaly.

You will also get rule recommendations as part of the Observations, and you can easily and progressively add them to your data pipelines. Rules can enforce an action such as stopping your data pipelines. In the past, you could only write static rules. Now, you can write Dynamic rules that have auto-adjusting thresholds and AnomalyDetection rules that grasp recurring patterns and spot deviations. When you use rules as part of data pipelines, they can stop the data flow so that a data engineer can review, fix, and resume.

To use anomaly detection, I add an Evaluate Data Quality node to my job:

I select the node and click Add analyzer to choose a statistic and the columns:

Glue Data Quality learns from the data to recognize patterns and then generates observations that will be shown in the Data quality tab:

And a visualization:

After I review the observations I add new rules. The first one sets adaptive thresholds that check that the row count is between the smallest of the last 10 runs and the largest of the last 20 runs. The second one looks for unusual patterns, for example RowCount being abnormally high on weekends:
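As a rough sketch, the first rule and an accompanying analyzer could be expressed in DQDL along the following lines (the exact ruleset generated by the console may differ, and the anomaly detection rule for the weekend pattern is configured separately):

Rules = [
    RowCount between min(last(10)) and max(last(20))
]
Analyzers = [
    RowCount
]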

Join the preview
This new capability is available in preview in the following AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Tokyo), and Europe (Ireland). To learn more, read Data Quality Anomaly Detection.

Stay tuned for a detailed blog post when this feature launches!

Learn more

Data Quality Anomaly Detection

Jeff;

Enhance query performance using AWS Glue Data Catalog column-level statistics

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/enhance-query-performance-using-aws-glue-data-catalog-column-level-statistics/

Today, we’re making available a new capability of AWS Glue Data Catalog that allows generating column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers (CBO) of Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings.

Data lakes are designed for storing vast amounts of raw, unstructured, or semi-structured data at a low cost, and organizations share those datasets across multiple departments and teams. The queries on these large datasets read vast amounts of data and can perform complex join operations on multiple datasets. When talking with our customers, we learned that one of the most challenging aspects of data lake performance is how to optimize these analytics queries to execute faster.

Data lake performance optimization is especially important for queries with multiple joins, which is where cost-based optimizers help the most. In order for CBO to work, column statistics need to be collected and updated based on changes in the data. We’re launching the capability to generate column-level statistics, such as the number of distinct values, number of nulls, maximum, and minimum, for AWS Glue tables stored in formats such as Parquet, ORC, JSON, Amazon ION, CSV, and XML. With this launch, customers now have an integrated end-to-end experience where statistics on Glue tables are collected, stored in the AWS Glue Data Catalog, and made available to analytics services for improved query planning and execution.

Using these statistics, cost-based optimizers improve query plans and boost the performance of queries run in Amazon Athena and Amazon Redshift Spectrum. For example, CBO can use column statistics such as the number of distinct values and the number of nulls to improve row prediction. Row prediction is the number of rows from a table that will be returned by a certain step during the query planning stage. The more accurate the row predictions are, the more efficient the query execution steps are. This leads to faster query execution and potentially reduced cost. Some of the specific optimizations that CBO can employ include join reordering and push-down of aggregations based on the statistics available for each table and column.

For customers using a data mesh with AWS Lake Formation permissions, tables from different data producers are cataloged in the centralized governance accounts. As producers generate statistics on tables in the centralized catalog and share those tables with consumers, queries on those tables in consumer accounts automatically see query performance improvements. In this post, we demonstrate the capability of the AWS Glue Data Catalog to generate column statistics for our sample tables.

Solution overview

To demonstrate the effectiveness of this capability, we employ the industry-standard TPC-DS 3 TB dataset stored in an Amazon Simple Storage Service (Amazon S3) public bucket. We’ll compare the query performance before and after generating column statistics for the tables by running queries in Amazon Athena and Amazon Redshift Spectrum. We provide the queries used in this post and encourage you to try your own queries following the workflow illustrated in the following sections.

The workflow consists of the following high level steps:

  1. Cataloging the Amazon S3 Bucket: Utilize AWS Glue Crawler to crawl the designated Amazon S3 bucket, extracting metadata, and seamlessly storing it in the AWS Glue data catalog. We’ll query these tables using Amazon Athena and Amazon Redshift Spectrum.
  2. Generating column statistics: Employ the enhanced capabilities of AWS Glue Data Catalog to generate comprehensive column statistics for the crawled data, thereby providing valuable insights into the dataset.
  3. Querying with Amazon Athena and Amazon Redshift Spectrum: Evaluate the impact of column statistics on query performance by utilizing Amazon Athena and Amazon Redshift Spectrum to execute queries on the dataset.

The following diagram illustrates the solution architecture.

Walkthrough

To implement the solution, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Run AWS Glue Crawler on Public Amazon S3 bucket to list the 3TB TPC-DS dataset.
  3. Run queries on Amazon Athena and Amazon Redshift and note down query duration
  4. Generate statistics for AWS Glue Data Catalog tables
  5. Run queries on Amazon Athena and Amazon Redshift and compare query duration with previous run
  6. Optional: Schedule AWS Glue column statistics jobs using AWS Lambda and the Amazon EventBridge Scheduler

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An Amazon Virtual Private Cloud (Amazon VPC), public subnet, private subnets and route tables.
  • An Amazon Redshift Serverless workgroup and namespace.
  • An AWS Glue crawler to crawl the public Amazon S3 bucket and create tables in the AWS Glue Data Catalog for the TPC-DS dataset.
  • AWS Glue catalog databases and tables.
  • An Amazon S3 bucket to store Athena results.
  • AWS Identity and Access Management (AWS IAM) users and policies.
  • An AWS Lambda function and an Amazon EventBridge Scheduler schedule to schedule the AWS Glue column statistics runs.

To launch the AWS CloudFormation stack, complete the following steps:

Note: The AWS Glue data catalog tables are generated using the public bucket s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/, hosted in the us-east-1 region. If you intend to deploy this AWS CloudFormation template in a different region, it is necessary to either copy the data to the corresponding region or share the data within your deployed region for it to be accessible from Amazon Redshift.

  1. Log in to the AWS Management Console as AWS Identity and Access Management (AWS IAM) administrator.
  2. Choose Launch Stack to deploy a AWS CloudFormation template.
  3. Choose Next.
  4. On the next page, keep all the options as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Run the AWS Glue Crawlers created by the AWS CloudFormation stack

To run your crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers under Data Catalog in the navigation pane.
  2. Locate and run the two crawlers tpcdsdb-without-stats and tpcdsdb-with-stats. They may take a few minutes to complete.

Once the crawlers complete successfully, they create two identical databases, tpcdsdbnostats and tpcdsdbwithstats. The tables in tpcdsdbnostats have no statistics, and we’ll use them as the reference. We generate statistics on the tables in tpcdsdbwithstats. Verify that you can see those two databases and the underlying tables on the AWS Glue console. The tpcdsdbnostats database will look like the following. At this time, there are no statistics generated on these tables.

Run provided query using Amazon Athena on no-stats tables

To run your query in Amazon Athena on tables without statistics, complete the following steps:

  1. Download the Athena queries from here.
  2. On the Amazon Athena console, choose the provided queries one at a time for the tables in the database tpcdsdbnostats.
  3. Run each query and note down its run time.

Run provided query using Amazon Redshift Spectrum on no-stats tables

To run your query in Amazon Redshift, complete the following steps:

  1. Download the Amazon Redshift queries from here.
  2. On the Redshift query editor v2, execute the queries in the Redshift Query for tables without stats section of the downloaded file.
  3. Note down the execution time of each query.

Generate statistics on AWS Glue Catalog tables

To generate statistics on AWS Glue Catalog tables, complete the following steps:

  1. Navigate to the AWS Glue console and choose Databases under Data Catalog.
  2. Choose the tpcdsdbwithstats database to list all the available tables.
  3. Select any of these tables (for example, call_center).
  4. Go to the Column statistics – new tab and choose Generate statistics.
  5. Keep the default options: under Choose columns, keep Table (All columns); under Row sampling options, keep All rows; under IAM role, choose AWSGluestats-blog; then select Generate statistics.

You’ll be able to see the status of the statistics generation run, as shown in the following illustration:

After generating statistics on the AWS Glue Data Catalog tables, you should be able to see detailed column statistics for that table:

Repeat steps 2–5 to generate statistics for all the necessary tables, such as catalog_sales, catalog_returns, warehouse, item, date_dim, store_sales, customer, customer_address, web_sales, time_dim, ship_mode, web_site, and web_returns. Alternatively, you can follow the “Schedule AWS Glue Statistics Runs” section near the end of this post to generate statistics for all tables. Once done, assess the query performance for each query.
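If you prefer to generate and inspect statistics programmatically, the AWS CLI equivalents look roughly like the following sketch (the role value and the column name are illustrative; adjust them to the resources created by the CloudFormation stack):

$ # Start a statistics generation run for one table
$ aws glue start-column-statistics-task-run \
    --database-name tpcdsdbwithstats \
    --table-name call_center \
    --role AWSGluestats-blog
$ # Retrieve the generated statistics for a specific column
$ aws glue get-column-statistics-for-table \
    --database-name tpcdsdbwithstats \
    --table-name call_center \
    --column-names cc_call_center_sk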

Run provided query using Athena Console on stats tables

  1. On the Amazon Athena console, execute the queries in the Athena Query for tables with stats section of the downloaded file.
  2. Run each query and note down its run time.

In our sample run of the queries on the tables, we observed the query execution times shown in the following table. We saw a clear improvement in query performance, ranging from 13% to 55%.

Athena query time improvement

TPC-DS 3T Queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 2           | 33.62                    | 15.17                 | 55%
Query 4           | 132.11                   | 72.94                 | 45%
Query 14          | 134.77                   | 91.48                 | 32%
Query 28          | 55.99                    | 39.36                 | 30%
Query 38          | 29.32                    | 25.58                 | 13%

Run the provided query using Amazon Redshift Spectrum on statistics tables

  1. On the Amazon Redshift query editor v2, execute the queries in the Redshift Query for tables with stats section of the downloaded file.
  2. Run each query and note down its execution time.

In our sample run of the queries on the tables, we observed the query execution times shown in the following table. We saw a clear improvement in query performance, ranging from 13% to 89%.

Amazon Redshift Spectrum query time improvement

TPC-DS 3T Queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 40          | 124.156                  | 13.12                 | 89%
Query 60          | 29.52                    | 16.97                 | 42%
Query 66          | 18.914                   | 16.39                 | 13%
Query 95          | 308.806                  | 200                   | 35%
Query 99          | 20.064                   | 16                    | 20%

Schedule AWS Glue statistics Runs

In this segment of the post, we’ll guide you through the steps of scheduling AWS Glue column statistics runs using AWS Lambda and the Amazon EventBridge Scheduler. To streamline this process, an AWS Lambda function and an Amazon EventBridge Scheduler schedule were created as part of the CloudFormation stack deployment.

  1. AWS Lambda function setup:

To begin, we utilize an AWS Lambda function to trigger the execution of the AWS Glue column statistics job. The AWS Lambda function invokes the start_column_statistics_task_run API through the boto3 (AWS SDK for Python) library. This sets the groundwork for automating the column statistics update.
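The core of such a function might look like the following sketch (illustrative only; the environment variable names and the exact shape of the function deployed by the CloudFormation stack may differ):

import os
import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # Assumed environment variables: DATABASE_NAME, TABLE_NAMES (comma separated), ROLE
    database = os.environ["DATABASE_NAME"]
    role = os.environ["ROLE"]
    for table in os.environ["TABLE_NAMES"].split(","):
        # Kick off a column statistics task run for each configured table
        glue.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role,
        )
        print(f"Started column statistics run for {table}")
    return {"status": "started"}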

Let’s explore the AWS Lambda function:

    • Go to the AWS Lambda console.
    • Select Functions and locate the GlueTableStatisticsFunctionv1.
    • For a clearer understanding of the AWS Lambda function, we recommend reviewing the code in the Code section and examining the environment variables under Configuration.
  2. Amazon EventBridge Scheduler configuration

The next step involves scheduling the AWS Lambda function invocation using the Amazon EventBridge Scheduler. The scheduler is configured to trigger the AWS Lambda function daily at a specific time – in this case, 08:00 PM. This ensures that the AWS Glue column statistics job runs on a regular and predictable basis.

Now, let’s explore how you can update the schedule:

Cleaning up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign into the AWS CloudFormation console as the AWS IAM administrator used for creating the AWS CloudFormation stack.
  2. Delete the AWS CloudFormation stack you created.

Conclusion

In this post, we showed you how you can use the AWS Glue Data Catalog to generate column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers of Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings. Refer to the documentation for support for AWS Glue Data Catalog statistics across various AWS analytical services.

If you have questions or suggestions, submit them in the comments section.


About the Authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Navnit Shukla serves as an AWS Specialist Solution Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled Data Wrangling on AWS. He can be reached via LinkedIn.

Introducing Apache Hudi support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-apache-hudi-support-with-aws-glue-crawlers/

Apache Hudi is an open table format that brings database and data warehouse capabilities to data lakes. Apache Hudi helps data engineers manage complex challenges, such as managing continuously evolving datasets with transactions while maintaining query performance. Data engineers use Apache Hudi for streaming workloads as well as to create efficient incremental data pipelines. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering and compaction optimizations, and concurrency control, all while keeping your data in open source file formats. Hudi’s advanced performance optimizations make analytical workloads faster with any of the popular query engines including Apache Spark, Presto, Trino, Hive, and so on.

Many AWS customers adopted Apache Hudi on their data lakes built on top of Amazon S3 using AWS Glue, a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. AWS Glue Crawler is a component of AWS Glue, which allows you to create table metadata from data content automatically without requiring manual definition of the metadata.

AWS Glue crawlers now support Apache Hudi tables, simplifying the adoption of the AWS Glue Data Catalog as the catalog for Hudi tables. One typical use case is to register Hudi tables that do not yet have a catalog table definition. Another typical use case is migration from other Hudi catalogs, such as Hive metastore. When migrating from other Hudi catalogs, you can create and schedule an AWS Glue crawler and provide one or more Amazon S3 paths where the Hudi table files are located. You have the option to provide the maximum depth of Amazon S3 paths that the AWS Glue crawler can traverse. With each run, AWS Glue crawlers will extract schema and partition information and update the AWS Glue Data Catalog with the schema and partition changes. AWS Glue crawlers also update the latest metadata file location in the AWS Glue Data Catalog, which AWS analytical engines can directly use.

With this launch, you can create and schedule an AWS Glue crawler to register Hudi tables in AWS Glue Data Catalog. You can then provide one or multiple Amazon S3 paths where the Hudi tables are located. You have the option to provide the maximum depth of Amazon S3 paths that crawlers can traverse. With each crawler run, the crawler inspects each of the S3 paths and catalogs the schema information, such as new tables, deletes, and updates to schemas in the AWS Glue Data Catalog. Crawlers inspect partition information and add newly added partitions to AWS Glue Data Catalog. Crawlers also update the latest metadata file location in the AWS Glue Data Catalog that AWS analytical engines can directly use.

This post demonstrates how this new capability to crawl Hudi tables works.

How AWS Glue crawler works with Hudi tables

Hudi tables have two categories, with specific implications for each:

  • Copy on write (CoW) – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write.
  • Merge on read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.

With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that change less frequently.

Hudi provides three query types for accessing the data:

  • Snapshot queries – Queries that see the latest snapshot of the table as of a given commit or compaction action. For MoR tables, snapshot queries expose the most recent state of the table by merging the base and delta files of the latest file slice at the time of the query.
  • Incremental queries – Queries only see new data written to the table, since a given commit or compaction. This effectively provides change streams to enable incremental data pipelines.
  • Read optimized queries – For MoR tables, queries see the latest data compacted. For CoW tables, queries see the latest data committed.

For copy-on-write tables, crawlers create a single table in the AWS Glue Data Catalog with the ReadOptimized Serde  org.apache.hudi.hadoop.HoodieParquetInputFormat.

For merge-on-read tables, crawlers create two tables in AWS Glue Data Catalog for the same table location:

  • A table with suffix _ro, which uses the ReadOptimized Serde org.apache.hudi.hadoop.HoodieParquetInputFormat
  • A table with suffix _rt, which uses the RealTime Serde allowing for Snapshot queries: org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat

During each crawl, for each Hudi path provided, crawlers make an Amazon S3 list API call, filter based on the .hoodie folders, and find the most recent metadata file under that Hudi table metadata folder.

Crawl a Hudi CoW table using AWS Glue crawler

In this section, let’s go through how to crawl a Hudi CoW table using AWS Glue crawlers.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue if you do not have it. You need s3:GetObject for s3://your_s3_bucket/data/sample_hudi_cow_table/.
  4. Run the following command to copy the sample Hudi table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/hudi-crawler/product_cow/ s3://your_s3_bucket/data/sample_hudi_cow_table/

This instruction guides you to copy sample data, but you can create any Hudi tables easily using AWS Glue. Learn more in Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 2: AWS Glue Studio Visual Editor.

Create a Hudi crawler

In this instruction, create the crawler through the console. Complete the following steps to create a Hudi crawler:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Create crawler.
  3. For Name, enter hudi_cow_crawler. Choose Next.
  4. Under Data source configuration,  choose Add data source.
    1. For Data source, choose Hudi.
    2. For Include hudi table paths, enter s3://your_s3_bucket/data/sample_hudi_cow_table/. (Replace your_s3_bucket with your S3 bucket name.)
    3. Choose Add Hudi data source.
  5. Choose Next.
  6. For Existing IAM role, choose your IAM role, then choose Next.
  7. For Target database, choose Add database, then the Add database dialog appears. For Database name, enter hudi_crawler_blog, then choose Create. Choose Next.
  8. Choose Create crawler.

Now a new Hudi crawler has been successfully created. The crawler can be triggered to run through the console, or through the SDK or AWS CLI using the StartCrawler API (see the CLI sketch after the following steps). It can also be scheduled through the console to run at specific times. In this walkthrough, run the crawler through the console.

  1. Choose Run crawler.
  2. Wait for the crawler to complete.
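If you prefer the AWS CLI, the following sketch starts the same crawler and checks its state:

$ aws glue start-crawler --name hudi_cow_crawler
$ aws glue get-crawler --name hudi_cow_crawler --query "Crawler.State"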

After the crawler has run, you can see the Hudi table definition in the AWS Glue console:

You have successfully crawled the Hudi CoW table with data on Amazon S3 and created an AWS Glue Data Catalog table with the schema populated. After you create the table definition in the AWS Glue Data Catalog, AWS analytics services such as Amazon Athena are able to query the Hudi table.

Complete the following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "hudi_crawler_blog"."sample_hudi_cow_table" limit 10;

The following screenshot shows our output:

Crawl a Hudi MoR table using AWS Glue crawler with AWS Lake Formation data permissions

In this section, let’s go through how to crawl a Hudi MoR table using AWS Glue. This time, you use AWS Lake Formation data permissions for crawling Amazon S3 data sources instead of IAM and Amazon S3 permissions. This is optional, but it simplifies permission configuration when your data lake is managed by AWS Lake Formation permissions.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue if you do not have it. You need lakeformation:GetDataAccess. But you do not need s3:GetObject for s3://your_s3_bucket/data/sample_hudi_mor_table/ because we use Lake Formation data permission to access the files.
  4. Run the following command to copy the sample Hudi table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/hudi-crawler/product_mor/ s3://your_s3_bucket/data/sample_hudi_mor_table/

In addition to the preceding steps, complete the following steps to update the AWS Glue Data Catalog settings to use Lake Formation permissions to control catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as a data lake administrator.
    1. If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
  2. Under Administration, choose Data catalog settings.
  3. For Default permissions for newly created databases and tables, deselect Use only IAM access control for new databases and Use only IAM access control for new tables in new databases.
  4. For Cross account version setting, choose Version 3.
  5. Choose Save.

The next step is to register your S3 bucket in Lake Formation data lake locations:

  1. On the Lake Formation console, choose Data lake locations, and choose Register location.
  2. For Amazon S3 path, enter s3://your_s3_bucket/. (Replace your_s3_bucket with your S3 bucket name.)
  3. Choose Register location.

Then, grant the AWS Glue crawler role access to the data location so that the crawler can use Lake Formation permissions to access the data and create tables in the location:

  1. On the Lake Formation console, choose Data locations and choose Grant.
  2. For IAM users and roles, select the IAM role you used for the crawler.
  3. For Storage location, enter s3://your_s3_bucket/data/. (Replace your_s3_bucket with your S3 bucket name.)
  4. Choose Grant.

Then, grant the crawler role permission to create tables under the database hudi_crawler_blog:

  1. On the Lake Formation console, choose Data lake permissions.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles, and choose the crawler role.
  4. For LF tags or catalog resources, choose Named data catalog resources.
  5. For Database, choose the database hudi_crawler_blog.
  6. Under Database permissions, select Create table.
  7. Choose Grant.

Create a Hudi crawler with Lake Formation data permissions

Complete the following steps to create a Hudi crawler:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Create crawler.
  3. For Name, enter hudi_mor_crawler. Choose Next.
  4. Under Data source configuration,  choose Add data source.
    1. For Data source, choose Hudi.
    2. For Include hudi table paths, enter s3://your_s3_bucket/data/sample_hudi_mor_table/. (Replace your_s3_bucket with your S3 bucket name.)
    3. Choose Add Hudi data source.
  5. Choose Next.
  6. For Existing IAM role, choose your IAM role.
  7. Under Lake Formation configuration – optional, select Use Lake Formation credentials for crawling S3 data source.
  8. Choose Next.
  9. For Target database, choose hudi_crawler_blog. Choose Next.
  10. Choose Create crawler.

Now a new Hudi crawler has been successfully created. The crawler uses Lake Formation credentials for crawling Amazon S3 files. Let’s run the new crawler:

  1. Choose Run crawler.
  2. Wait for the crawler to complete.

After the crawler has run, you can see two tables of the Hudi table definition in the AWS Glue console:

  • sample_hudi_mor_table_ro (read optimized table)
  • sample_hudi_mor_table_rt (real time table)

You registered the data lake bucket with Lake Formation and enabled crawling access to the data lake using Lake Formation permissions. You have successfully crawled the Hudi MoR table with data on Amazon S3 and created an AWS Glue Data Catalog table with the schema populated. After you create the table definitions on AWS Glue Data Catalog, AWS analytics services such as Amazon Athena are able to query the Hudi table.

Complete the following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_rt" limit 10;

The following screenshot shows our output:

  3. Run the following query.
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_ro" limit 10;

The following screenshot shows our output:

Fine-grained access control using AWS Lake Formation permissions

To apply fine-grained access control on the Hudi table, you can benefit from AWS Lake Formation permissions. Lake Formation permissions allow you to restrict access to specific tables, columns, or rows and then query the Hudi tables through Amazon Athena with fine-grained access control. Let’s configure Lake Formation permission for the Hudi MoR table.

Prerequisites

Here are the prerequisites for this tutorial:

  1. Complete the previous section Crawl a Hudi MoR table using AWS Glue crawler with AWS Lake Formation data permissions.
  2. Create an IAM user DataAnalyst, who has AWS managed policy AmazonAthenaFullAccess.

Create a Lake Formation data cell filter

Let’s first set up a filter for the MoR read optimized table.

  1. Sign in to the Lake Formation console as a data lake administrator.
  2. Choose Data filters.
  3. Choose Create new filter.
  4. For Data filter name, enter exclude_product_price.
  5. For Target database, choose the database hudi_crawler_blog.
  6. For Target table, choose the table sample_hudi_mor_table_ro.
  7. For Column-level access, select Exclude columns, and choose the column price.
  8. For Row filter expression, enter true.
  9. Choose Create filter.

Grant Lake Formation permissions to the DataAnalyst user

Complete the following steps to grant Lake Formation permissions to the DataAnalyst user:

  1. On the Lake Formation console, choose Data lake permissions.
  2. Choose Grant.
  3. For Principals, choose IAM users and roles, and choose the user DataAnalyst.
  4. For LF tags or catalog resources, choose Named data catalog resources.
  5. For Database, choose the database hudi_crawler_blog.
  6. For Table – optional, choose the table sample_hudi_mor_table_ro.
  7. For Data filters – optional, select exclude_product_price.
  8. For Data filter permissions, select Select.
  9. Choose Grant.

You granted the DataAnalyst user Lake Formation permissions on the database hudi_crawler_blog and the table sample_hudi_mor_table_ro, excluding the column price. Now let’s validate user access to the data using Athena.

  1. Sign in to the Athena console as a DataAnalyst user.
  2. On the query editor, run the following query:
    SELECT * FROM "hudi_crawler_blog"."sample_hudi_mor_table_ro" limit 10;

The following screenshot shows our output:

Now you validated that the column price is not shown, but the other columns product_id, product_name, update_at, and category are shown.

Clean up

To avoid unwanted charges to your AWS account, delete the following AWS resources, either from the console or with the AWS CLI commands sketched after this list:

  1. Delete AWS Glue database hudi_crawler_blog.
  2. Delete AWS Glue crawlers hudi_cow_crawler and hudi_mor_crawler.
  3. Delete Amazon S3 files under s3://your_s3_bucket/data/sample_hudi_cow_table/ and s3://your_s3_bucket/data/sample_hudi_mor_table/.
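If you use the AWS CLI, the following commands (a sketch; replace your_s3_bucket with your S3 bucket name) remove the same resources:

$ aws glue delete-crawler --name hudi_cow_crawler
$ aws glue delete-crawler --name hudi_mor_crawler
$ aws glue delete-database --name hudi_crawler_blog
$ aws s3 rm s3://your_s3_bucket/data/sample_hudi_cow_table/ --recursive
$ aws s3 rm s3://your_s3_bucket/data/sample_hudi_mor_table/ --recursive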

Conclusion

This post demonstrated how AWS Glue crawlers work for Hudi tables. With the support for Hudi crawler, you can quickly move to using AWS Glue Data Catalog as your primary Hudi table catalog. You can start building your serverless transactional data lake using Hudi on AWS using AWS Glue, AWS Glue Data Catalog, and Lake Formation fine-grained access controls for tables and formats supported by AWS analytical engines.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Enhance monitoring and debugging for AWS Glue jobs using new job observability metrics

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/enhance-monitoring-and-debugging-for-aws-glue-jobs-using-new-job-observability-metrics/

For any modern data-driven company, having smooth data integration pipelines is crucial. These pipelines pull data from various sources, transform it, and load it into destination systems for analytics and reporting. When running properly, they provide timely and trustworthy information. However, without vigilance, varying data volumes, characteristics, and application behavior can cause data pipelines to become inefficient and problematic. Performance can slow down or pipelines can become unreliable. Undetected errors result in bad data and impact downstream analysis. That's why robust monitoring and troubleshooting for data pipelines are essential across the following four areas:

  • Reliability
  • Performance
  • Throughput
  • Resource utilization

Together, these four aspects of monitoring provide end-to-end visibility and control over a data pipeline and its operations.

Today we are pleased to announce a new class of Amazon CloudWatch metrics reported with your pipelines built on top of AWS Glue for Apache Spark jobs. The new metrics provide aggregate and fine-grained insights into the health and operations of your job runs and the data being processed. In addition to providing insightful dashboards, the metrics provide classification of errors, which helps with root cause analysis of performance bottlenecks and error diagnosis. With this analysis, you can evaluate and apply the recommended fixes and best practices for architecting your jobs and pipelines. As a result, you gain the benefit of higher availability, better performance, and lower cost for your AWS Glue for Apache Spark workload.

This post demonstrates how the new enhanced metrics help you monitor and debug AWS Glue jobs.

Enable the new metrics

The new metrics can be configured through the job parameter --enable-observability-metrics.

The new metrics are enabled by default on the AWS Glue Studio console. To configure the metrics on the AWS Glue Studio console, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Under Your jobs, choose your job.
  3. On the Job details tab, expand Advanced properties.
  4. Under Job observability metrics, select Enable the creation of additional observability CloudWatch metrics when this job runs.

To enable the new metrics in the AWS Glue CreateJob and StartJobRun APIs, set the following parameters in the DefaultArguments property:

  • Key: --enable-observability-metrics
  • Value: true

To enable the new metrics in the AWS Command Line Interface (AWS CLI), set the same job parameters in the --default-arguments argument.
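
For example, the following boto3 sketch starts a job run with the metrics enabled; the job name is a placeholder.

    import boto3

    glue = boto3.client("glue")

    # Start a run of an existing job with observability metrics turned on
    response = glue.start_job_run(
        JobName="my-glue-job",  # placeholder job name
        Arguments={"--enable-observability-metrics": "true"},
    )
    print(response["JobRunId"])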

Use case

A typical workload for AWS Glue for Apache Spark jobs is to load data from a relational database to a data lake with SQL-based transformations. The following is a visual representation of an example job where the number of workers is 10.

When the example job ran, the workerUtilization metrics showed the following trend.

Note that workerUtilization showed values between 0.20 (20%) and 0.40 (40%) for the entire duration. This typically happens when the job capacity is over-provisioned and many Spark executors were idle, resulting in unnecessary cost. To improve resource utilization efficiency, it’s a good idea to enable AWS Glue Auto Scaling. The following screenshot shows the same workerUtilization metrics graph when AWS Glue Auto Scaling is enabled for the same job.

workerUtilization showed 1.0 in the beginning because of AWS Glue Auto Scaling and it trended between 0.75 (75%) and 1.0 (100%) based on the workload requirements.

Query and visualize metrics in CloudWatch

Complete the following steps to query and visualize metrics on the CloudWatch console:

  1. On the CloudWatch console, choose All metrics in the navigation pane.
  2. Under Custom namespaces, choose Glue.
  3. Choose Observability Metrics (or Observability Metrics Per Source, or Observability Metrics Per Sink).
  4. Search for and select the specific metric name, job name, job run ID, and observability group.
  5. On the Graphed metrics tab, configure your preferred statistic, period, and so on.

Query metrics using the AWS CLI

Complete the following steps for querying using the AWS CLI (for this example, we query the worker utilization metric):

  1. Create a metric definition JSON file (provide your AWS Glue job name and job run ID):
    $ cat multiplequeries.json
    [
      {
        "Id": "avgWorkerUtil_0",
        "MetricStat" : {
          "Metric" : {
            "Namespace": "Glue",
            "MetricName": "glue.driver.workerUtilization",
            "Dimensions": [
              {
                  "Name": "JobName",
                  "Value": "<your-Glue-job-name-A>"
              },
              {
                "Name": "JobRunId",
                "Value": "<your-Glue-job-run-id-A>"
              },
              {
                "Name": "Type",
                "Value": "gauge"
              },
              {
                "Name": "ObservabilityGroup",
                "Value": "resource_utilization"
              }
            ]
          },
          "Period": 1800,
          "Stat": "Minimum",
          "Unit": "None"
        }
      },
      {
          "Id": "avgWorkerUtil_1",
          "MetricStat" : {
          "Metric" : {
            "Namespace": "Glue",
            "MetricName": "glue.driver.workerUtilization",
            "Dimensions": [
               {
                 "Name": "JobName",
                 "Value": "<your-Glue-job-name-B>"
               },
               {
                 "Name": "JobRunId",
                 "Value": "<your-Glue-job-run-id-B>"
               },
               {
                 "Name": "Type",
                 "Value": "gauge"
               },
               {
                 "Name": "ObservabilityGroup",
                 "Value": "resource_utilization"
               }
            ]
          },
          "Period": 1800,
          "Stat": "Minimum",
          "Unit": "None"
        }
      }
    ]

  2. Run the get-metric-data command:
    $ aws cloudwatch get-metric-data --metric-data-queries file://multiplequeries.json \
         --start-time '2023-10-28T18:20' \
         --end-time '2023-10-28T19:10'  \
         --region us-east-1
    {
        "MetricDataResults": [
          {
             "Id": "avgWorkerUtil_0",
             "Label": "<your label A>",
             "Timestamps": [
                   "2023-10-28T18:20:00+00:00"
             ], 
             "Values": [
                   0.06718750000000001
             ],
             "StatusCode": "Complete"
          },
          {
             "Id": "avgWorkerUtil_1",
             "Label": "<your label B>",
             "Timestamps": [
                  "2023-10-28T18:20:00+00:00"
              ],
              "Values": [
                  0.5959183673469387
              ],
              "StatusCode": "Complete"
           }
        ],
        "Messages": []
    }

Create a CloudWatch alarm

You can create static threshold-based alarms for the different metrics. For instructions, refer to Create a CloudWatch alarm based on a static threshold.

For example, you can set an alarm for skewness.stage with a threshold of 1.0, and for skewness.job with a threshold of 0.5. These thresholds are just recommendations; you can adjust them based on your specific use case (for example, some jobs are expected to be skewed, and that isn't an issue worth alarming on). Our recommendation is to evaluate the metric values of your job runs for some time before deciding what counts as anomalous and configuring the alarm thresholds.
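
The following boto3 sketch creates such an alarm. The metric name and dimensions are assumptions based on the naming shown earlier for glue.driver.workerUtilization; confirm the exact names for your account on the CloudWatch console before relying on them.

    import boto3

    cloudwatch = boto3.client("cloudwatch")

    # Alarm when per-stage skewness for a job exceeds 1.0
    cloudwatch.put_metric_alarm(
        AlarmName="glue-job-skewness-stage",
        Namespace="Glue",
        MetricName="glue.driver.skewness.stage",   # assumed metric name
        Dimensions=[
            {"Name": "JobName", "Value": "my-glue-job"},                 # placeholder job name
            {"Name": "ObservabilityGroup", "Value": "job_performance"},  # assumed dimension value
        ],
        Statistic="Maximum",
        Period=300,
        EvaluationPeriods=1,
        Threshold=1.0,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="notBreaching",
    )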

Other enhanced metrics

For a full list of other enhanced metrics available with AWS Glue jobs, refer to Monitoring with AWS Glue Observability metrics. These metrics allow you to capture the operational insights of your jobs, such as resource utilization (memory and disk), normalized error classes such as compilation and syntax, user or service errors, and throughput for each source or sink (records, files, partitions, and bytes read or written).

Job observability dashboards

You can further simplify observability for your AWS Glue jobs by building dashboards on these metrics: use Amazon Managed Grafana for real-time monitoring, and Amazon QuickSight for visualization and analysis of trends.

Conclusion

This post demonstrated how the new enhanced CloudWatch metrics help you monitor and debug AWS Glue jobs. With these enhanced metrics, you can more easily identify and troubleshoot issues in real time. This results in AWS Glue jobs that experience higher uptime, faster processing, and reduced expenditures. The end benefit for you is more effective and optimized AWS Glue for Apache Spark workloads. The metrics are available in all AWS Glue supported Regions. Check it out!


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Shenoda Guirguis is a Senior Software Development Engineer on the AWS Glue team. His passion is in building scalable and distributed Data Infrastructure/Processing Systems. When he gets a chance, Shenoda enjoys reading and playing soccer.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple to use interfaces to efficiently manage and transform petabytes of data seamlessly across data lakes on Amazon S3, databases and data-warehouses on cloud.

Introducing AWS Glue serverless Spark UI for better monitoring and troubleshooting

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-studio-serverless-spark-ui-for-better-monitoring-and-troubleshooting/

In AWS, hundreds of thousands of customers use AWS Glue, a serverless data integration service, to discover, combine, and prepare data for analytics and machine learning. When you have complex datasets and demanding Apache Spark workloads, you may experience performance bottlenecks or errors during Spark job runs. Troubleshooting these issues can be difficult and delay getting jobs working in production. Customers often use the Apache Spark Web UI, a popular debugging tool that is part of open source Apache Spark, to help fix problems and optimize job performance. AWS Glue supports the Spark UI in two different ways, but you need to set it up yourself. This requires time and effort spent managing networking and Amazon EC2 instances, or trial and error with Docker containers.

Today, we are pleased to announce serverless Spark UI built into the AWS Glue console. You can now use Spark UI easily as it’s a built-in component of the AWS Glue console, enabling you to access it with a single click when examining the details of any given job run. There’s no infrastructure setup or teardown required. AWS Glue serverless Spark UI is a fully-managed serverless offering and generally starts up in a matter of seconds. Serverless Spark UI makes it significantly faster and easier to get jobs working in production because you have ready access to low level details for your job runs.

This post describes how the AWS Glue serverless Spark UI helps you to monitor and troubleshoot your AWS Glue job runs.

Getting started with serverless Spark UI

You can access the serverless Spark UI for a given AWS Glue job run by navigating from your job's page on the AWS Glue console.

  1. On the AWS Glue console, choose ETL jobs.
  2. Choose your job.
  3. Choose the Runs tab.
  4. Select the job run you want to investigate, then choose Spark UI.

The Spark UI will display in the lower pane, as shown in the following screen capture:

Alternatively, you can get to the serverless Spark UI for a specific job run by navigating from Job run monitoring in AWS Glue.

  1. On the AWS Glue console, choose job run monitoring under ETL jobs.
  2. Select your job run, and choose View run details.

Scroll down to the bottom to view the Spark UI for the job run.

Prerequisites

Complete the following prerequisite steps:

  1. Enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console, and once enabled, Spark event log files are created during the job run and stored in your S3 bucket. The serverless Spark UI parses a Spark event log file generated in your S3 bucket to visualize detailed information for both running and completed job runs. A progress bar shows the percentage to completion, with a typical parsing time of less than a minute. If you configure jobs through the API, see the sketch after this list for the relevant job parameters.
  2. When the logs are parsed, you can use the built-in Spark UI to debug, troubleshoot, and optimize your jobs.
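
As noted above, if you configure jobs through the API rather than the console, the Spark UI event logs are controlled by job parameters. The following boto3 sketch is a minimal example; the job name and S3 path are placeholders.

    import boto3

    glue = boto3.client("glue")

    # Start a run with Spark UI event logs enabled and written to your S3 bucket
    glue.start_job_run(
        JobName="my-glue-job",  # placeholder job name
        Arguments={
            "--enable-spark-ui": "true",
            "--spark-event-logs-path": "s3://your_s3_bucket/sparkHistoryLogs/",  # placeholder path
        },
    )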

For more information about Apache Spark UI, refer to Web UI in Apache Spark.

Monitor and Troubleshoot with Serverless Spark UI

A typical workload for AWS Glue for Apache Spark jobs is loading data from relational databases to S3-based data lakes. This section demonstrates how to monitor and troubleshoot an example job run for this workload with the serverless Spark UI. The sample job reads data from a MySQL database and writes to S3 in Parquet format. The source table has approximately 70 million records.

The following screen capture shows a sample visual job authored in AWS Glue Studio visual editor. In this example, the source MySQL table has already been registered in the AWS Glue Data Catalog in advance. It can be registered through AWS Glue crawler or AWS Glue catalog API. For more information, refer to Data Catalog and crawlers in AWS Glue.

Now it’s time to run the job! The first job run finished in 30 minutes and 10 seconds as shown:

Let’s use Spark UI to optimize the performance of this job run. Open Spark UI tab in the Job runs page. When you drill down to Stages and view the Duration column, you will notice that Stage Id=0 spent 27.41 minutes to run the job, and the stage had only one Spark task in the Tasks:Succeeded/Total column. That means there was no parallelism to load data from the source MySQL database.

To optimize the data load, introduce the parameters hashfield and hashpartitions to the source table definition. For more information, refer to Reading from JDBC tables in parallel. On the Data Catalog table, add two properties under Table properties: hashfield=emp_no and hashpartitions=18.

With these properties, new job runs parallelize the data load from the source MySQL table.
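
You can add the two properties on the console as described above; if you prefer to script it, the following boto3 sketch copies the current table definition and adds them. The database and table names are placeholders for the cataloged MySQL source table.

    import boto3

    glue = boto3.client("glue")
    database, table = "jdbc_source_db", "employees"  # placeholder Data Catalog names

    # Fetch the current definition, add the JDBC parallelism properties, and write it back
    current = glue.get_table(DatabaseName=database, Name=table)["Table"]
    table_input = {
        key: value
        for key, value in current.items()
        if key in ("Name", "Description", "Owner", "Retention", "StorageDescriptor",
                   "PartitionKeys", "TableType", "Parameters")
    }
    table_input.setdefault("Parameters", {})
    table_input["Parameters"].update({"hashfield": "emp_no", "hashpartitions": "18"})
    glue.update_table(DatabaseName=database, TableInput=table_input)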

Let’s try running the same job again! This time, the job run finished in 9 minutes and 9 seconds. It saved 21 minutes from the previous job run.

As a best practice, view the Spark UI and compare the job runs before and after the optimization. Drilling down to Completed stages, you will notice that there was one stage with 18 tasks instead of one task.

In the first job run, AWS Glue automatically shuffled data across multiple executors before writing to destination because there were too few tasks. On the other hand, in the second job run, there was only one stage because there was no need to do extra shuffling, and there were 18 tasks for loading data in parallel from source MySQL database.

Considerations

Keep in mind the following considerations:

  • Serverless Spark UI is supported in AWS Glue 3.0 and later
  • Serverless Spark UI is available for jobs that ran after November 20, 2023, due to a change in how AWS Glue emits and stores Spark logs
  • Serverless Spark UI can visualize Spark event logs up to 1 GB in size
  • There is no retention limit, because serverless Spark UI scans the Spark event log files in your S3 bucket
  • Serverless Spark UI is not available for Spark event logs stored in an S3 bucket that can only be accessed from your VPC

Conclusion

This post described how the AWS Glue serverless Spark UI helps you monitor and troubleshoot your AWS Glue jobs. By providing instant access to the Spark UI directly within the AWS Management Console, you can now inspect the low-level details of job runs to identify and resolve issues. With the serverless Spark UI, there is no infrastructure to manage—the UI spins up automatically for each job run and tears down when no longer needed. This streamlined experience saves you time and effort compared to manually launching Spark UIs yourself.

Give the serverless Spark UI a try today. We think you’ll find it invaluable for optimizing performance and quickly troubleshooting errors. We look forward to hearing your feedback as we continue improving the AWS Glue console experience.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Alexandra Tello is a Senior Front End Engineer with the AWS Glue team in New York City. She is a passionate advocate for usability and accessibility. In her free time, she’s an espresso enthusiast and enjoys building mechanical keyboards.

Matt Sampson is a Software Development Manager on the AWS Glue team. He loves working with his other Glue team members to make services that our customers benefit from. Outside of work, he can be found fishing and maybe singing karaoke.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytic services. In his spare time, he enjoys skiing and gardening.

AWS Weekly Roundup – EC2 DL2q instances, PartyRock, Amplify’s 6th birthday, and more – November 20, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-ec2-dl2q-instances-partyrock-amplifys-6th-birthday-and-more-november-20-2023/

Last week I saw an astonishing 160+ new service launches. There were so many updates that we decided to publish a weekly roundup again. This continues the same innovative pace of the previous week as we are getting closer to AWS re:Invent 2023.

Our News Blog team is also finalizing new blog posts for re:Invent to introduce awesome launches with service teams for your reading pleasure. Jeff Barr shared The Road to AWS re:Invent 2023 to explain our blogging journey and process. Please stay tuned in the next week!

Last week’s launches
Here are some of the launches that caught my attention last week:

Amazon EC2 DL2q instances – New DL2q instances are powered by Qualcomm AI 100 Standard accelerators and are the first to feature Qualcomm’s AI technology in the public cloud. With eight Qualcomm AI 100 Standard accelerators and 128 GiB of total accelerator memory, you can run popular generative artificial intelligence (AI) applications and extend to edge devices across smartphones, autonomous driving, personal compute, and extended reality headsets to develop and validate these AI workloads before deploying.

PartyRock for Amazon Bedrock – We introduced PartyRock, a fun and intuitive hands-on, generative AI app-building playground powered by Amazon Bedrock. You can experiment, learn all about prompt engineering, build mini-apps, and share them with your friends—all without writing any code or creating an AWS account.

You also can now access the Meta Llama 2 Chat 13B foundation model and Cohere Command Light, Embed English, and multilingual models for Amazon Bedrock.

AWS Amplify celebrates its sixth birthday – We announced six new launches: a new documentation site, support for Next.js 14 with our hosting and JavaScript library, custom token providers and an automatic React Native social sign-in update for Amplify Auth, new ChangePassword and DeleteUser account settings components, and updates to all Amplify UI packages to use the new Amplify JavaScript v6. You can also use wildcard subdomains when using a custom domain with your Amplify application deployed to AWS Amplify Hosting.

Amplify docs site UI

Also check out other News Blog posts about major launches published in the past week:

Other AWS service launches
Here are some other bundled feature launches per AWS service:

Amazon Athena – You can use a new cost-based optimizer (CBO) to enhance query performance based on table and column statistics collected by the AWS Glue Data Catalog, and the Athena JDBC 3.x driver, a new alternative driver that supports almost all authentication plugins. You can also use Amazon EMR Studio to develop and run interactive queries on Amazon Athena.

Amazon CloudWatch – You can use a new CloudWatch metric called EBS Stalled I/O Check to monitor the health of your Amazon EBS volumes, the regular expression for Amazon CloudWatch Logs Live Tail filter pattern syntax to search and match relevant log events, observability of SAP Sybase ASE database in CloudWatch Application Insights, and up to two stats commands in a Log Insights query to perform aggregations on the results.

Amazon CodeCatalyst – You can connect to a Amazon Virtual Private Cloud (Amazon VPC) from CodeCatalyst Workflows, provision infrastructure using Terraform within CodeCatalyst Workflows, access CodeCatalyst with your workforce identities configured in IAM Identity Center, and create teams made up of members of the CodeCatalyst space.

Amazon Connect – You can use a pre-built queue performance dashboard and Contact Lens conversational analytics dashboard to view and compare real-time and historical aggregated queue performance. You can use quick responses for chats (previously written responses, such as typing '/#greet' to insert a personalized greeting) and scan attachments to detect malware or other unwanted content.

AWS Glue – AWS Glue for Apache Spark added six new database connectors: Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB, as well as native connectivity to Amazon OpenSearch Service.

AWS Lambda – You can see a single-pane view of metrics, logs, and traces in the AWS Lambda console and use advanced logging controls to natively capture logs in JSON structured format. You can view the SAM template on the Lambda console and export the function's configuration to AWS Application Composer. AWS Lambda also supports Java 21 and Node.js 20 runtimes built on the new Amazon Linux 2023 runtime.

AWS Local Zones in Dallas – You can enable the new Local Zone in Dallas, Texas, us-east-1-dfw-2a, with Amazon EC2 C6i, M6i, R6i, C6gn, and M6g instances and Amazon EBS volume types gp2, gp3, io1, sc1, and st1. You can also access Amazon ECS, Amazon EKS, Application Load Balancer, and AWS Direct Connect in this new Local Zone to support a broad set of workloads at the edge.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) – You can standardize access control to Kafka resources using AWS Identity and Access Management (IAM) and build Kafka clients for Amazon MSK Serverless in all popular programming languages, using open source client helper libraries and code samples for languages including Java, Python, Go, and JavaScript. Also, Amazon MSK now supports an enhanced version of Apache Kafka 3.6.0 that offers generally available Tiered Storage and automatically sends you storage capacity alerts when you are at risk of exhausting your storage.

Amazon OpenSearch Service Ingestion – You can migrate your data from Elasticsearch version 7.x clusters to the latest versions of Amazon OpenSearch Service and use persistent buffering to protect the durability of incoming data.

Amazon RDS – Amazon RDS for MySQL now supports creating active-active clusters using the Group Replication plugin, upgrading MySQL 5.7 snapshots to MySQL 8.0, and the Innovation Release version of MySQL 8.1.

Amazon RDS Custom for SQL Server extends point-in-time recovery support for up to 1,000 databases; supports Service Master Key Retention to use transparent data encryption (TDE), table- and column-level encryption, DBMail, and linked servers; and supports using SQL Server Developer edition with bring your own media (BYOM).

Additionally, Amazon RDS Multi-AZ deployments with two readable standbys now supports minor version upgrades and system maintenance updates with typically less than one second of downtime when using Amazon RDS Proxy.

AWS Partner Central – You can use an improved user experience in AWS Partner Central to build and promote your offerings and the new Investments tab in the Partner Analytics Dashboard to gain actionable insights. You can now link accounts and associated users between Partner Central and AWS Marketplace and use an enhanced co-sell experience with APN Customer Engagements (ACE) manager.

Amazon QuickSight – You can programmatically manage user access and custom permissions support for roles to restrict QuickSight functionality to the QuickSight account for IAM Identity Center and Active Directory using APIs. You can also use shared restricted folders, a Contributor role, support for data source asset types in folders, and the Custom Week Start feature, an addition designed to enhance the data analysis experience for customers across diverse industries and social contexts.

AWS Trusted Advisor – You can use new APIs to programmatically access Trusted Advisor best practices checks, recommendations, and prioritized recommendations and 37 new Amazon RDS checks that provide best practices guidance by analyzing DB instance configuration, usage, and performance data.

There’s a lot more launch news that I haven’t covered. See AWS What’s New for more details.

See you virtually in AWS re:Invent
Next week we'll hear the latest from AWS, learn from experts, and connect with the global cloud community in Las Vegas. If you come, check out the agenda, session catalog, and attendee guides before your departure.

If you’re not able to attend re:Invent in person this year, we’re offering the option to livestream our Keynotes and Innovation Talks. With the registration for online pass, you will have access to on-demand keynote, Innovation Talks, and selected breakout sessions after the event.

Channy

Visualize Amazon DynamoDB insights in Amazon QuickSight using the Amazon Athena DynamoDB connector and AWS Glue

Post Syndicated from Antonio Samaniego Jurado original https://aws.amazon.com/blogs/big-data/visualize-amazon-dynamodb-insights-in-amazon-quicksight-using-the-amazon-athena-dynamodb-connector-and-aws-glue/

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB offers built-in security, continuous backups, automated multi-Region replication, in-memory caching, and data import and export tools. The scalability and flexible data schema of DynamoDB make it well-suited for a variety of use cases. These include internet-scale web and mobile applications, low-latency metadata stores, high-traffic retail websites, Internet of Things (IoT) and time series data, online gaming, and more.

Data stored in DynamoDB is the basis for valuable business intelligence (BI) insights. To make this data accessible to data analysts and other consumers, you can use Amazon Athena. Athena is a serverless, interactive service that allows you to query data from a variety of sources in heterogeneous formats, with no provisioning effort. Athena accesses data stored in DynamoDB via the open source Amazon Athena DynamoDB connector. Table metadata, such as column names and data types, is stored using the AWS Glue Data Catalog.

Finally, to visualize BI insights, you can use Amazon QuickSight, a cloud-powered business analytics service. QuickSight makes it straightforward for organizations to build visualizations, perform ad hoc analysis, and quickly get business insights from their data, anytime, on any device. Its generative BI capabilities enable you to ask questions about your data using natural language, without having to write SQL queries or learn a BI tool.

This post shows how you can use the Athena DynamoDB connector to easily query data in DynamoDB with SQL and visualize insights in QuickSight.

Solution overview

The following diagram illustrates the solution architecture.

Architecture Diagram

  1. The Athena DynamoDB connector runs in a pre-built, serverless AWS Lambda function. You don’t need to write any code.
  2. AWS Glue provides supplemental metadata from the DynamoDB table. In particular, an AWS Glue crawler is run to infer and store the DynamoDB table format, schema, and associated properties in the Glue Data Catalog.
  3. The Athena editor is used to test the connector and perform analysis via SQL queries.
  4. QuickSight uses the Athena connector to visualize BI insights from DynamoDB.

This walkthrough uses data from the ProductCatalog table, part of the DynamoDB developer guide sample data files.

Prerequisites

Before you get started, you should meet the following prerequisites:

Set up the Athena DynamoDB connector

The Athena DynamoDB connector comprises a pre-built, serverless Lambda function provided by AWS that communicates with DynamoDB so you can query your tables with SQL using Athena. The connector is available in the AWS Serverless Application Repository, and is used to create the Athena data source for later use in data analysis and visualization. To set up the connector, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. In the search bar, search for and choose Amazon DynamoDB.
  4. Choose Next.
  5. Under Data source details, enter a name. Note that this name should be unique and will be referenced in your SQL statements when you query your Athena data source.
  6. Under Connection details, choose Create Lambda function.

This will take you to the Lambda applications page on the Lambda console. Do not close the Athena data source creation tab; you will return to it in a later step.

  1. Scroll down to Application settings and enter a value for the following parameters (leave the other parameters as default):
    • SpillBucket – Specifies the Amazon Simple Storage Service (Amazon S3) bucket name for storing data that exceeds Lambda function response size limits. To create an S3 bucket, refer to Creating a bucket.
    • AthenaCatalogName – A lowercase name for the Lambda function to be created.
  2. Select the acknowledgement check box and choose Deploy.

Wait for deployment to complete before moving to the next step.

  1. Return to the Athena data source creation tab.
  2. Under Connection details, choose the refresh icon and choose the Lambda function you created.
  3. Choose Next.
  4. Review and choose Create data source.
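
If you prefer to register the connector programmatically instead of using the console steps above, the Athena CreateDataCatalog API accepts the Lambda function you deployed. The following is a minimal boto3 sketch with a placeholder function ARN and data source name.

    import boto3

    athena = boto3.client("athena")

    # Register the connector Lambda function as an Athena data source
    athena.create_data_catalog(
        Name="dynamodb_datasource",  # the data source name you reference in SQL
        Type="LAMBDA",
        Description="Athena DynamoDB connector",
        Parameters={
            # A single composite Lambda function handles metadata and record requests
            "function": "arn:aws:lambda:us-east-1:111122223333:function:dynamodb-connector"  # placeholder ARN
        },
    )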

Provide supplemental metadata via AWS Glue

The Athena connector already comes with a built-in inference capability to discover the schema and table properties of your data source. However, this capability is limited. To accurately discover the metadata of your DynamoDB table and centralize schema management as your data evolves over time, the connector integrates with AWS Glue.

To achieve this, an AWS Glue crawler is run to automatically determine the format, schema, and associated properties of the raw data stored in your DynamoDB table, writing the resulting metadata to a Glue database. Glue databases contain tables, which hold metadata from different data stores, independent from the actual location of the data. The Athena connector then references the Glue table and retrieves the corresponding DynamoDB metadata to enable queries.

Create the AWS Glue database

Complete the following steps to create the Glue database:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
  2. Choose Add database (you can also edit an existing database if you already have one).
  3. For Name, enter a database name.
  4. For Location, enter the string literal dynamo-db-flag. This keyword indicates that the database contains tables that the connector can use for supplemental metadata.
  5. Choose Create database.

Following security best practices, it is also recommended that you enable encryption at rest for your Data Catalog. For details, refer to Encrypting your Data Catalog.

Create the AWS Glue crawler

Complete the following steps to create and run the Glue crawler:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Crawlers.
  2. Choose Create crawler.
  3. Enter a crawler name and choose Next.
  4. For Data sources, choose Add a data source.
  5. On the Data source drop-down menu, choose DynamoDB. For Table name, enter the name of your DynamoDB table (string literal).
  6. Choose Add a DynamoDB data source.
  7. Choose Next.
  8. For IAM Role, choose Create new IAM role.
  9. Enter a role name and choose Create. This will automatically create an IAM role that trusts AWS Glue and has permissions to access the crawler targets.
  10. Choose Next.
  11. For Target database, choose the database previously created.
  12. Choose Next.
  13. Review and choose Create crawler.
  14. On the newly created crawler page, choose Run crawler.

Crawler runtimes depend on your DynamoDB table size and properties. You can find crawler run details under Crawler runs.
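
The same database and crawler can also be created with the AWS SDK. The following boto3 sketch uses placeholder names and assumes an existing IAM role with access to the crawler target.

    import boto3

    glue = boto3.client("glue")

    # Database whose location flags it as holding supplemental connector metadata
    glue.create_database(
        DatabaseInput={"Name": "dynamodb_metadata_db", "LocationUri": "dynamo-db-flag"}  # placeholder name
    )

    # Crawler that infers the DynamoDB table schema into that database
    glue.create_crawler(
        Name="dynamodb-productcatalog-crawler",                 # placeholder crawler name
        Role="arn:aws:iam::111122223333:role/GlueCrawlerRole",  # placeholder IAM role
        DatabaseName="dynamodb_metadata_db",
        Targets={"DynamoDBTargets": [{"Path": "ProductCatalog"}]},
    )
    glue.start_crawler(Name="dynamodb-productcatalog-crawler")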

Validate the output metadata

When your crawler run status shows as Completed, complete the following steps to validate the output metadata:

  1. On the AWS Glue console, choose Tables in the navigation pane. Here, you can confirm a new table has been added to the database as a result of the crawler run.
  2. Navigate to the newly created table and take a look at the Schema tab. This tab shows the column names, data types, and other parameters inferred from your DynamoDB table.
  3. If needed, edit the schema by choosing Edit schema.
  4. Choose Advanced properties.
  5. Under Table properties, verify the crawler automatically created and set the classification key to dynamodb. This indicates to the Athena connector that the table can be used for supplemental metadata.
  6. Optionally, add the following properties to correctly catalog and reference DynamoDB data in AWS Glue and Athena queries. This is due to capital letters not being permitted in AWS Glue table and column names, but being permitted in DynamoDB table and attribute names.
    1. If your DynamoDB table name contains any capital letters, choose Actions and Edit Table and add an extra table property as follows:
      • Key: sourceTable
      • Value: YourDynamoDBTableName
    2. If your DynamoDB table has attributes that contain any capital letters, add an extra table property as follows:
      • Key: columnMapping
      • Value: yourcolumn1=YourColumn1, yourcolumn2=YourColumn2, …

Test the connector with the Athena SQL editor

After the Athena DynamoDB connector is deployed and the AWS Glue table is populated with supplemental metadata, the DynamoDB table is ready for analysis. The example in this post uses the Athena editor to make SQL queries to the ProductCatalog table. For further options to interact with Athena, see Accessing Athena.

Complete the following steps to test the connector:

  1. Open the Athena query editor.
  2. If this is your first time visiting the Athena console in your current AWS Region, complete the following steps. This is a prerequisite before you can run Athena queries. See Getting Started for more details.
    1. Choose Query editor in the navigation pane to open the editor.
    2. Navigate to Settings and choose Manage to set up a query result location in Amazon S3.
  3. Under Data, select the data source and database you created (you may need to choose the refresh icon for them to sync up with Athena).
  4. Tables belonging to the selected database appear under Tables. You can choose a table name for Athena to show the table column list and data types.
  5. Test the connector by pulling data from your table via a SELECT statement. When you run Athena queries, you can reference Athena data sources, databases, and tables as <datasource_name>.<database>.<table_name>. Retrieved records are shown under Results.

For increased security, refer to Encrypting Athena query results stored in Amazon S3 to encrypt query results at rest.

Athena Query Results

For this post, we run a SELECT statement to validate the process. You can refer to the SQL reference for Athena to build more complex queries and analyses.
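
You can also run the same validation query outside the console. The following boto3 sketch submits it through the Athena API and polls for the result; the data source, database, table, and results bucket names are placeholders.

    import boto3
    import time

    athena = boto3.client("athena")

    # Reference the table as <datasource_name>.<database>.<table_name>
    query = 'SELECT * FROM "dynamodb_datasource"."dynamodb_metadata_db"."productcatalog" LIMIT 10'
    execution = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},  # placeholder bucket
    )
    query_id = execution["QueryExecutionId"]

    # Poll until the query completes, then print the first rows
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)

    if state == "SUCCEEDED":
        rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
        print(rows[:3])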

Visualize in QuickSight

QuickSight allows for building modern interactive dashboards, paginated reports, embedded analytics, and natural language queries through a unified BI solution. In this step, we use QuickSight to generate visual insights from the DynamoDB table by connecting to the Athena data source previously created.

Allow QuickSight to access resources

Complete the following steps to grant QuickSight access to resources:

  1. On the QuickSight console, choose the profile icon and choose Manage QuickSight.
  2. In the navigation pane, choose Security & Permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. QuickSight may ask you to switch to the Region in which users and groups in your account are managed. To change the current Region, navigate to the profile icon on the QuickSight console and choose the Region you want to switch to.
  5. For IAM Role, choose Use QuickSight-managed role (default).

Subsequent instructions assume that the default QuickSight-managed role is being used. If this is not the case, make sure to update the existing role to the same effect.

  1. Under Allow access and autodiscovery for these resources, select IAM and Amazon S3.
  2. For Amazon S3, choose Select S3 buckets.
  3. Choose the spill bucket you specified earlier when deploying the Lambda function for the connector, and the bucket you specified as the Athena query result location in Amazon S3.
  4. For both buckets, select Write permission for Athena Workgroup.
  5. Choose Amazon Athena.
  6. In the pop-up window, choose Next.
  7. Choose Lambda and choose the Amazon Resource Name (ARN) of the Lambda function previously used for the Athena data source connector.
  8. Choose Finish.
  9. Choose Save.

Create the Athena dataset

To create the Athena dataset, complete the following steps:

  1. On the QuickSight console, choose the user profile and switch to the Region you deployed the Athena data source to.
  2. Return to the QuickSight home page.
  3. In the navigation pane, choose Datasets.
  4. Choose New dataset.
  5. For Create a Dataset, select Athena.
  6. For Data source name, enter a name and choose Validate connection.
  7. When the connection shows as Validated, choose Create data source.
  8. Under Catalog, Database, and Tables, select the Athena data source, AWS Glue database, and AWS Glue table previously created.
  9. Choose Select.
  10. On the Finish dataset creation page, select Import to SPICE for quicker analytics.
  11. Choose Visualize.

For additional information on QuickSight query modes, see Importing data into SPICE and Using SQL to customize data.

Build QuickSight visualizations

Once the DynamoDB data is available in QuickSight via the Athena DynamoDB connector, it is ready to be visualized. The QuickSight analysis in the following example shows a vertical stacked bar chart with the average price per product category for the ProductCatalog sample dataset. In addition, it shows a donut chart with the proportion of products by product category, and a tree map containing the count of bicycles per bicycle type.

If you use data imported to SPICE in a QuickSight analysis, the dataset will only be available after the import is complete. For further details, see Using SPICE data in an analysis.

Quicksight Analysis

For comprehensive information on how to create and share visualizations in QuickSight, refer to Visualizing data in Amazon QuickSight and Sharing and subscribing to data in Amazon QuickSight.

Clean up

To avoid incurring continued AWS usage charges, make sure you delete all resources created as part of this walkthrough.

  • Delete the Athena data source:
    1. On the Athena console, switch to the Region you deployed your resources in.
    2. Choose Data sources in the navigation pane.
    3. Select the data source you created and on the Actions menu, choose Delete.
  • Delete the Lambda application:
    1. On the AWS CloudFormation console, switch to the Region you deployed your resources in.
    2. Choose Stacks in the navigation pane.
    3. Select serverlessrepo-AthenaDynamoDBConnector and choose Delete.
  • Delete the AWS Glue resources:
    1. On the AWS Glue console, switch to the Region you deployed your resources in.
    2. Choose Databases in the navigation pane.
    3. Select the database you created and choose Delete.
    4. Choose Crawlers in the navigation pane.
    5. Select the crawler you created and on the Action menu, choose Delete crawler.
  • Delete the QuickSight resources:
    1. On the QuickSight console, switch to the Region you deployed your resources in.
    2. Delete the analysis created for this walkthrough.
    3. Delete the Athena dataset created for this walkthrough.
    4. If you no longer need the Athena data source to create other datasets, delete the data source.

Summary

This post demonstrated how you can use the Athena DynamoDB connector to query data in DynamoDB with SQL and build visualizations in QuickSight.

Learn more about the Athena DynamoDB connector in the Amazon Athena User Guide. Discover more available data source connectors to query and visualize a variety of data sources without setting up or managing any infrastructure while only paying for the queries you run.

For advanced QuickSight capabilities powered by AI, see Gaining insights with machine learning (ML) in Amazon QuickSight and Answering business questions with Amazon QuickSight Q.


About the Authors

Antonio Samaniego Jurado is a Solutions Architect at Amazon Web Services. With a strong passion for modern technology, Antonio helps customers build state-of-the-art applications on AWS. A creator at heart, he loves community-driven learning and sharing of best practices across the AWS service portfolio to make the best of customers' cloud journeys.

Pascal Vogel is a Solutions Architect at Amazon Web Services. Pascal helps startups and enterprises build cloud-native solutions. As a cloud enthusiast, Pascal loves learning new technologies and connecting with like-minded customers who want to make a difference in their cloud journey.

Use generative AI with Amazon EMR, Amazon Bedrock, and English SDK for Apache Spark to unlock insights

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/use-generative-ai-with-amazon-emr-amazon-bedrock-and-english-sdk-for-apache-spark-to-unlock-insights/

In this era of big data, organizations worldwide are constantly searching for innovative ways to extract value and insights from their vast datasets. Apache Spark offers the scalability and speed needed to process large amounts of data efficiently.

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR is the best place to run Apache Spark. You can quickly and effortlessly create managed Spark clusters from the AWS Management Console, AWS Command Line Interface (AWS CLI), or Amazon EMR API. You can also use additional Amazon EMR features, including fast Amazon Simple Storage Service (Amazon S3) connectivity using the Amazon EMR File System (EMRFS), integration with the Amazon EC2 Spot market and the AWS Glue Data Catalog, and EMR Managed Scaling to add or remove instances from your cluster. Amazon EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks, and tools like Spark UI and YARN Timeline Service to simplify debugging.

To unlock the potential hidden within the data troves, it’s essential to go beyond traditional analytics. Enter generative AI, a cutting-edge technology that combines ML with creativity to generate human-like text, art, and even code. Amazon Bedrock is the most straightforward way to build and scale generative AI applications with foundation models (FMs). Amazon Bedrock is a fully managed service that makes FMs from Amazon and leading AI companies available through an API, so you can quickly experiment with a variety of FMs in the playground, and use a single API for inference regardless of the models you choose, giving you the flexibility to use FMs from different providers and keep up to date with the latest model versions with minimal code changes.

In this post, we explore how you can supercharge your data analytics with generative AI using Amazon EMR, Amazon Bedrock, and the pyspark-ai library. The pyspark-ai library is an English SDK for Apache Spark. It takes instructions in English language and compiles them into PySpark objects like DataFrames. This makes it straightforward to work with Spark, allowing you to focus on extracting value from your data.

Solution overview

The following diagram illustrates the architecture for using generative AI with Amazon EMR and Amazon Bedrock.

Solution Overview

EMR Studio is a web-based IDE for fully managed Jupyter notebooks that run on EMR clusters. We interact with EMR Studio Workspaces connected to a running EMR cluster and run the notebook provided as part of this post. We use the New York City Taxi data to garner insights into various taxi rides taken by users. We ask the questions in natural language on top of the data loaded in Spark DataFrame. The pyspark-ai library then uses the Amazon Titan Text FM from Amazon Bedrock to create a SQL query based on the natural language question. The pyspark-ai library takes the SQL query, runs it using Spark SQL, and provides results back to the user.

In this solution, you can create and configure the required resources in your AWS account with an AWS CloudFormation template. The template creates the AWS Glue database and tables, S3 bucket, VPC, and other AWS Identity and Access Management (IAM) resources that are used in the solution.

The template is designed to demonstrate how to use EMR Studio with the pyspark-ai package and Amazon Bedrock, and is not intended for production use without modification. Additionally, the template uses the us-east-1 Region and may not work in other Regions without modification. The template creates resources that incur costs while they are in use. Follow the cleanup steps at the end of this post to delete the resources and avoid unnecessary charges.

Prerequisites

Before you launch the CloudFormation stack, ensure you have the following:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS CLI, and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation
  • The Titan Text G1 – Express model is currently in preview, so you need to have preview access to use it as part of this post

Create resources with AWS CloudFormation

The CloudFormation template creates the following AWS resources:

  • A VPC stack with private and public subnets to use with EMR Studio, route tables, and NAT gateway.
  • An EMR cluster with Python 3.9 installed. We are using a bootstrap action to install Python 3.9 and other relevant packages like pyspark-ai and Amazon Bedrock dependencies. (For more information, refer to the bootstrap script.)
  • An S3 bucket for the EMR Studio Workspace and notebook storage.
  • IAM roles and policies for EMR Studio setup, Amazon Bedrock access, and running notebooks.

To get started, complete the following steps:

  1. Choose Launch Stack:
    Launch Button
  2. Select I acknowledge that this template may create IAM resources.

The CloudFormation stack takes approximately 20–30 minutes to complete. You can monitor its progress on the AWS CloudFormation console. When its status reads CREATE_COMPLETE, your AWS account will have the resources necessary to implement this solution.

Create EMR Studio

Now you can create an EMR Studio and Workspace to work with the notebook code. Complete the following steps:

  1. On the EMR Studio console, choose Create Studio.
  2. Enter the Studio Name as GenAI-EMR-Studio and provide a description.
  3. In the Networking and security section, specify the following:
    • For VPC, choose the VPC you created as part of the CloudFormation stack that you deployed. Get the VPC ID using the CloudFormation outputs for the VPCID key.
    • For Subnets, choose all four subnets.
    • For Security and access, select Custom security group.
    • For Cluster/endpoint security group, choose EMRSparkAI-Cluster-Endpoint-SG.
    • For Workspace security group, choose EMRSparkAI-Workspace-SG.
  4. In the Studio service role section, specify the following:
    • For Authentication, select AWS Identity and Access Management (IAM).
    • For AWS IAM service role, choose EMRSparkAI-StudioServiceRole.
  5. In the Workspace storage section, browse and choose the S3 bucket for storage starting with emr-sparkai-<account-id>.
  6. Choose Create Studio.
  7. When the EMR Studio is created, choose the link under Studio Access URL to access the Studio.
  8. When you’re in the Studio, choose Create workspace.
  9. Add emr-genai as the name for the Workspace and choose Create workspace.
  10. When the Workspace is created, choose its name to launch the Workspace (make sure you’ve disabled any pop-up blockers).

Big data analytics using Apache Spark with Amazon EMR and generative AI

Now that we have completed the required setup, we can start performing big data analytics using Apache Spark with Amazon EMR and generative AI.

As a first step, we load a notebook that has the required code and examples to work with the use case. We use the NY Taxi dataset, which contains details about taxi rides.

  1. Download the notebook file NYTaxi.ipynb and upload it to your Workspace by choosing the upload icon.
  2. After the notebook is imported, open the notebook and choose PySpark as the kernel.

PySpark AI uses OpenAI's GPT-4 (through the ChatOpenAI interface) as its default LLM, but you can also plug in models from Amazon Bedrock, Amazon SageMaker JumpStart, and other third-party providers. For this post, we show how to integrate the Amazon Bedrock Titan model for SQL query generation and run it with Apache Spark in Amazon EMR.

  1. To get started with the notebook, you need to associate the Workspace to a compute layer. To do so, choose the Compute icon in the navigation pane and choose the EMR cluster created by the CloudFormation stack.
  2. Configure the Python parameters to use the updated Python 3.9 package with Amazon EMR:
    %%configure -f
    {
      "conf": {
        "spark.executorEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9",
        "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9"
      }
    }

  3. Import the necessary libraries:
    from pyspark_ai import SparkAI
    from pyspark.sql import SparkSession
    from langchain.chat_models import ChatOpenAI
    from langchain.llms.bedrock import Bedrock
    import boto3
    import os

  4. After the libraries are imported, you can define the LLM model from Amazon Bedrock. In this case, we use amazon.titan-text-express-v1. You need to enter the Region and Amazon Bedrock endpoint URL based on your preview access for the Titan Text G1 – Express model.
    boto3_bedrock = boto3.client('bedrock-runtime', '<region>', endpoint_url='<bedrock endpoint url>')
    llm = Bedrock(
        model_id="amazon.titan-text-express-v1",
        client=boto3_bedrock
    )

  5. Connect Spark AI to the Amazon Bedrock LLM model for SQL query generation based on questions in natural language:
    #Connecting Spark AI to the Bedrock Titan LLM
    spark_ai = SparkAI(llm = llm, verbose=False)
    spark_ai.activate()

Here, we have initialized Spark AI with verbose=False; you can also set verbose=True to see more details.

Now you can read the NYC Taxi data in a Spark DataFrame and use the power of generative AI in Spark.

  1. For example, you can ask the count of the number of records in the dataset:
    taxi_records.ai.transform("count the number of records in this dataset").show()

We get the following response:

> Entering new AgentExecutor chain...
Thought: I need to count the number of records in the table.
Action: query_validation
Action Input: SELECT count(*) FROM spark_ai_temp_view_ee3325
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT count(*) FROM spark_ai_temp_view_ee3325
> Finished chain.
+----------+
| count(1)|
+----------+
|2870781820|
+----------+

Spark AI internally uses LangChain and SQL chain, which hide the complexity from end-users working with queries in Spark.

The notebook has a few more example scenarios to explore the power of generative AI with Apache Spark and Amazon EMR.

Clean up

Empty the contents of the S3 bucket emr-sparkai-<account-id>, delete the EMR Studio Workspace created as part of this post, and then delete the CloudFormation stack that you deployed.

Conclusion

This post showed how you can supercharge your big data analytics with Apache Spark on Amazon EMR and Amazon Bedrock. The PySpark AI package allows you to derive meaningful insights from your data. It helps reduce development and analysis time by eliminating the need to write queries by hand, allowing you to focus on your business use case.


About the Authors

Saurabh Bhutyani is a Principal Analytics Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running generative AI use cases, scalable analytics solutions and data mesh architectures using AWS services like Amazon Bedrock, Amazon SageMaker, Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Harsh Vardhan is an AWS Senior Solutions Architect, specializing in analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Clean up your Excel and CSV files without writing code using AWS Glue DataBrew

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/clean-up-your-excel-and-csv-files-without-writing-code-using-aws-glue-databrew/

Managing data within an organization is complex. Handling data from outside the organization adds even more complexity. As the organization receives data from multiple external vendors, it often arrives in different formats, typically Excel or CSV files, with each vendor using their own unique data layout and structure. In this blog post, we’ll explore a solution that streamlines this process by leveraging the capabilities of AWS Glue DataBrew.

DataBrew is an excellent tool for data quality and preprocessing. You can use its built-in transformations and recipes, as well as its integrations with the AWS Glue Data Catalog and Amazon Simple Storage Service (Amazon S3), to preprocess the data in your landing zone, clean it up, and send it downstream for analytical processing.

In this post, we demonstrate the following:

  • Extracting non-transactional metadata from the top rows of a file and merging it with transactional data
  • Combining multi-line rows into single-line rows
  • Extracting unique identifiers from within strings or text

Solution overview

For this use case, imagine you’re a data analyst working at your organization. The sales leadership have requested a consolidated view of the net sales they are making from each of the organization’s suppliers. Unfortunately, this information is not available in a database. The sales data comes from each supplier in layouts like the following example.

However, with hundreds of resellers, manually extracting the information at the top is not feasible. Your goal is to clean up and flatten the data into the following output layout.


To achieve this, you can use pre-built transformations in DataBrew to quickly get the data in the layout you want.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Connect to the dataset

The first thing we need to do is upload the input dataset to Amazon S3. Create an S3 bucket for the project and create a folder to upload the raw input data. The output data will be stored in another folder in a later step.

Next, we need to connect DataBrew to our CSV file. We do this by creating a dataset, an artifact that points to the data source we will be using. Navigate to Datasets in the navigation pane.

Ensure the Column header values field is set to Add default header. The input CSV has an irregular format, so the first row will not have the needed column values.

Create a project

To create a new project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter FoodMartSales-AllUpProject.
  4. For Attached recipe, choose Create new recipe.
  5. For Recipe name, enter FoodMartSales-AllUpProject-recipe.
  6. For Select a dataset, select My datasets.
  7. Select the FoodMartSales-AllUp dataset.
  8. Under Permissions, for Role name, choose the IAM role you created as a prerequisite or create a new role.
  9. Choose Create project.

After the project is opened, an interactive session is created where you can author transformations on a sample of the data.

Extract non-transactional metadata from within the contents of the file and merge it with transactional data

In this section, we consider data that has metadata on the first few rows of the file, followed by transactional data. We walk through how to extract data relevant to the whole file from the top of the document and combine it with the transactional data into one flat table.

Extract metadata from the header and remove invalid rows

Complete the following steps to extract metadata from the header:

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Column_1.
  4. For Logical condition, choose Is exactly.
  5. For Enter a value, choose Enter custom value and enter RESELLER NAME.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column and set Value of to Column_2.
  8. For Value if false, choose Enter custom value and enter INVALID.
  9. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Name value extracted to a column by itself.

Next, you remove invalid rows and fill the rows with the Reseller Name value.

  1. Choose Clean and then choose Custom values.
  2. For Source column, choose ResellerName.
  3. For Specify values to remove, choose Custom value.
  4. For Values to remove, choose INVALID.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.
  7. Choose Missing and then choose Fill with most frequent value.
  8. For Source column, choose ResellerName.
  9. For Missing value action, choose Fill with most frequent value.
  10. For Apply transform to, choose All rows.
  11. Choose Apply.

Your dataset should now look like the following screenshot, with the invalid values removed and the Reseller Name value filled in for all rows.

Repeat the same steps in this section for the rest of the metadata, including Reseller Email Address, Reseller ID, and First Transaction Date.

Promote column headers and clean up data

To promote column headers, complete the following steps:

  1. Reorder the columns to put the metadata columns to the left of the dataset by choosing Column, Move column, and Start of the table.
  2. Rename the columns with the appropriate names.

Now you can clean up some columns and rows.

  1. Delete unnecessary columns, such as Column_7.

You can also delete invalid rows by filtering out records that don’t have a transaction date value.

  1. Choose the ABC icon on the menu of the Transaction_Date column and choose date.

  2. For Handle invalid values, select Delete rows, then choose Apply.

The dataset should now have the metadata extracted and the column headers promoted.

Combine multi-line rows into single-line rows

The next issue to address is data pertaining to the same transaction that is split across multiple lines. In the following steps, we extract the needed data from the rows and merge it into single-line transactions. For this example specifically, the Reseller Margin data is split across two lines.


Complete the following steps to get the Reseller Margin value on the same line as the corresponding transaction. First, we identify the Reseller Margin rows and store them in a temporary column.

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Transaction_ID.
  4. For Logical condition, choose Contains.
  5. For Enter a value, choose Enter custom value and enter Reseller Margin.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column and set Value of to TransactionAmount.
  8. For Value if false, choose Enter custom value and enter Invalid.
  9. For Destination column, choose ResellerMargin_Temp.
  10. Choose Apply.

Next, you shift the Reseller Margin value up one row.

  1. Choose Functions and then choose NEXT.
  2. For Source column, choose ResellerMargin_Temp.
  3. For Number of rows, enter 1.
  4. For Destination column, choose ResellerMargin.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.

Next, delete the invalid rows.

  1. Choose Missing and then choose Remove missing rows.
  2. For Source column, choose TransactionDate.
  3. For Missing value action, choose Delete rows with missing values.
  4. For Apply transform to, choose All rows.
  5. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Margin value extracted to a column by itself.
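If you want to prototype this shift-up logic outside DataBrew, the following is a minimal pandas sketch of the same idea. The column names and sample values are hypothetical, and the pandas shift(-1) method plays the role of the NEXT function:

import pandas as pd

# Hypothetical sample: Reseller Margin rows carry their value in TransactionAmount
df = pd.DataFrame({
    "Transaction_ID": ["TX-1001", "Reseller Margin", "TX-1002", "Reseller Margin"],
    "TransactionAmount": [120.0, 12.0, 250.0, 25.0],
    "TransactionDate": ["2023-01-05", None, "2023-01-06", None],
})

# Flag the Reseller Margin rows and stage their amount in a temporary column
df["ResellerMargin_Temp"] = df["TransactionAmount"].where(
    df["Transaction_ID"].str.contains("Reseller Margin")
)

# Shift the staged value up one row so it lands on the transaction it belongs to
df["ResellerMargin"] = df["ResellerMargin_Temp"].shift(-1)

# Drop the helper rows (no transaction date) and the temporary column
df = df.dropna(subset=["TransactionDate"]).drop(columns=["ResellerMargin_Temp"])
print(df)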

With the data structured properly, we can move on to mining the cleaned data.

Extract unique identifiers from within strings and text

Many types of data contain important information stored as unstructured text in a cell. In this section, we look at how to extract this data. Within the sample dataset, the BankTransferText column has valuable information around our resellers’ registered bank account numbers as well as the currency of the transaction, namely IBAN, SWIFT Code, and Currency.

Complete the following steps to extract IBAN, SWIFT code, and Currency into separate columns. First, you extract the IBAN number from the text using a regular expression (regex).

  1. Choose Extract and then choose Custom value or pattern.
  2. For Create column options, choose Extract values.
  3. For Source column, choose BankTransferText.
  4. For Extract options, choose Custom value or pattern.
  5. For Values to extract, enter [a-zA-Z][a-zA-Z][0-9]{2}[A-Z0-9]{1,30}.
  6. For Destination column, choose IBAN.
  7. For Apply transform to, choose All rows.
  8. Choose Apply.
  9. Extract the SWIFT code from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(SWIFT Code: )([A-Z]{2}[A-Z0-9]+).

Next, remove the SWIFT Code: label from the extracted text.

  1. Choose Remove and then choose Custom values.
  2. For Source column, choose SWIFT Code.
  3. For Specify values to remove, choose Custom value and enter SWIFT Code: as the value to remove.
  4. For Apply transform to, choose All rows, then choose Apply.
  5. Extract the currency from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(Currency: )([A-Z]{3}).
  6. Remove the Currency: label from the extracted text following the same steps used to remove the SWIFT Code: label.

You can clean up by deleting any unnecessary columns.

  1. Choose Column and then choose Delete.
  2. For Source columns, choose BankTransferText.
  3. Choose Apply.
  4. Repeat for any remaining columns.

Your dataset should now look like the following screenshot, with IBAN, SWIFT Code, and Currency extracted to separate columns.
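If you want to sanity-check these regular expressions before adding them to the recipe, you can run them through any regex engine. The following is a minimal Python sketch using the same patterns as the steps above; the sample BankTransferText value is hypothetical:

import re

# Hypothetical example of an unstructured BankTransferText value
sample = "Paid via IBAN GB29NWBK60161331926819 SWIFT Code: NWBKGB2L Currency: GBP"

iban_pattern = r"[a-zA-Z][a-zA-Z][0-9]{2}[A-Z0-9]{1,30}"
swift_pattern = r"(?!^)(SWIFT Code: )([A-Z]{2}[A-Z0-9]+)"
currency_pattern = r"(?!^)(Currency: )([A-Z]{3})"

iban = re.search(iban_pattern, sample)
swift = re.search(swift_pattern, sample)
currency = re.search(currency_pattern, sample)

print(iban.group(0) if iban else None)          # GB29NWBK60161331926819
print(swift.group(2) if swift else None)        # NWBKGB2L
print(currency.group(2) if currency else None)  # GBP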

Write the transformed data to Amazon S3

With all the steps captured in the recipe, the last step is to write the transformed data to Amazon S3.

  1. On the DataBrew console, choose Run job.
  2. For Job name, enter FoodMartSalesToDataLake.
  3. For Output to, choose Amazon S3.
  4. For File type, choose CSV.
  5. For Delimiter, choose Comma (,).
  6. For Compression, choose None.
  7. For S3 bucket owners’ account, select Current AWS account.
  8. For S3 location, enter s3://{name of S3 bucket}/clean/.
  9. For Role name, choose the IAM role created as a prerequisite or create a new role.
  10. Choose Create and run job.
  11. Go to the Jobs tab and wait for the job to complete.
  12. Navigate to the job output folder on the Amazon S3 console.
  13. Download the CSV file and view the transformed output.

Your dataset should look similar to the following screenshot.

Clean up

To optimize cost, make sure to clean up the resources deployed for this project by completing the following steps:

  1. Delete every DataBrew project along with their linked recipes.
  2. Delete all the DataBrew datasets.
  3. Delete the contents in your S3 bucket.
  4. Delete the S3 bucket.

Conclusion

The reality of exchanging data with suppliers is that we can’t always control the shape of the input data. With DataBrew, we can use a list of pre-built transformations and repeatable steps to transform incoming data into a desired layout and extract relevant data and insights from Excel or CSV files. Start using DataBrew today and transform third-party files into structured datasets ready for consumption by your business.


About the Author

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with direct-to-consumer platform companies in the ecommerce, FinTech, PropTech, and HealthTech space to achieve their business objectives with well-architected data platforms.

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/converting-apache-kafka-events-from-avro-to-json-using-eventbridge-pipes/

This post is written by Pascal Vogel, Solutions Architect, and Philipp Klose, Global Solutions Architect.

Event streaming with Apache Kafka has become an important element of modern data-oriented and event-driven architectures (EDAs), unlocking use cases such as real-time analytics of user behavior, anomaly and fraud detection, and Internet of Things event processing. Stream producers and consumers in Kafka often use schema registries to ensure that all components follow agreed-upon event structures when sending (serializing) and processing (deserializing) events to avoid application bugs and crashes.

A common schema format in Kafka is Apache Avro, which supports rich data structures in a compact binary format. To integrate Kafka with other AWS and third-party services more easily, AWS offers Amazon EventBridge Pipes, a serverless point-to-point integration service. However, many downstream services expect JSON-encoded events, requiring custom and repetitive schema validation and conversion logic from Avro to JSON in each downstream service.

This blog post shows how to reliably consume, validate, convert, and send Avro events from Kafka to AWS and third-party services using EventBridge Pipes, allowing you to reduce custom deserialization logic in downstream services. You can also use EventBridge event buses as targets in Pipes to filter and distribute events from Pipes to multiple targets, including cross-account and cross-Region delivery.

This blog describes two scenarios:

  1. Using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Glue Schema Registry.
  2. Using Confluent Cloud and the Confluent Schema Registry.

See the associated GitHub repositories for Glue Schema Registry or Confluent Schema Registry for full source code and detailed deployment instructions.

Kafka event streaming and schema validation on AWS

To build event streaming applications with Kafka on AWS, you can use Amazon MSK, offerings such as Confluent Cloud, or self-hosted Kafka on Amazon Elastic Compute Cloud (Amazon EC2) instances.

To avoid common issues in event streaming and event-driven architectures, such as data inconsistencies and incompatibilities, it is a recommended practice to define and share event schemas between event producers and consumers. In Kafka, schema registries are used to manage, evolve, and enforce schemas for event producers and consumers. The AWS Glue Schema Registry provides a central location to discover, manage, and evolve schemas. In the case of Confluent Cloud, the Confluent Schema Registry serves the same role. Both the Glue Schema Registry and the Confluent Schema Registry support common schema formats such as Avro, Protobuf, and JSON.

To integrate Kafka with AWS services, third-party services, and your own applications, you can use EventBridge Pipes. EventBridge Pipes helps you create point-to-point integrations between event sources and targets with optional filtering, transformation, and enrichment. EventBridge Pipes reduces the amount of integration code that you have to write and maintain when building EDAs.

Many AWS and third-party services expect JSON-encoded payloads (events) as input, meaning they cannot directly consume Avro or Protobuf payloads. To replace repetitive Avro-to-JSON validation and conversion logic in each consumer, you can use the EventBridge Pipes enrichment step. This solution uses an AWS Lambda function in the enrichment step to deserialize and validate Kafka events with a schema registry, including error handling with dead-letter queues, and convert events to JSON before passing them to downstream services.

Solution overview

Architecture overview of the solution

The solution presented in this blog post consists of the following key elements:

  1. The source of the pipe is a Kafka cluster deployed using MSK or Confluent Cloud. EventBridge Pipes reads events from the Kafka stream in batches and sends them to the enrichment function (see here for an example event).
  2. The enrichment step (Lambda function) deserializes and validates the events against the configured schema registry (Glue or Confluent), converts events from Avro to JSON with integrated error handling, and returns them to the pipe.
  3. The target of this example solution is an EventBridge custom event bus that is invoked by EventBridge Pipes with JSON-encoded events returned by the enrichment Lambda function. EventBridge Pipes supports a variety of other targets, including Lambda, AWS Step Functions, Amazon API Gateway, API destinations, and more, enabling you to build EDAs without writing integration code.
  4. In this sample solution, the event bus sends all events to Amazon CloudWatch Logs via an EventBridge rule. You can extend the example to invoke additional EventBridge targets.

Optionally, you can add OpenAPI 3 or JSONSchema Draft 4 schemas for your events in the EventBridge schema registry by either manually generating them from the Avro schema or by using EventBridge schema discovery. This allows you to download code bindings for the JSON-converted events for various programming languages, such as JavaScript, Python, and Java, to correctly use them in your EventBridge targets.

The remainder of this blog post describes this solution for the Glue and Confluent schema registries with code examples.

EventBridge Pipes with the Glue Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Glue Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

You need an Amazon MSK Serverless cluster running and the Glue Schema Registry configured. This example includes an Avro schema and a Glue Schema Registry. See the following AWS blog post for an introduction to schema validation with the Glue Schema Registry: Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry.

EventBridge Pipes configuration

Use the AWS Cloud Development Kit (AWS CDK) template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Amazon MSK Serverless Kafka topic as the source via AWS Identity and Access Management (IAM) authentication.
  2. EventBridge Pipes reads events from your Kafka topic using the Amazon MSK source type.
  3. An enrichment Lambda function in Java to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An Amazon Simple Queue Service (Amazon SQS) dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule sends all incoming events into a CloudWatch Logs log group.

For MSK-based sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.
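For reference, the following is a minimal sketch in the Python flavor of the AWS CDK showing where these source parameters (batch size, batch window, and starting position) live on the CfnPipe construct. The ARNs, names, and topic below are placeholders; the actual stack in the GitHub repository differs:

from aws_cdk import App, Stack
from aws_cdk import aws_pipes as pipes


class PipeTuningStack(Stack):
    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)
        pipes.CfnPipe(
            self, "AvroToJsonPipe",
            name="avro-to-json-pipe",
            role_arn="arn:aws:iam::123456789012:role/placeholder-pipe-role",
            source="arn:aws:kafka:us-east-1:123456789012:cluster/placeholder/uuid",
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                managed_streaming_kafka_parameters=pipes.CfnPipe.PipeSourceManagedStreamingKafkaParametersProperty(
                    topic_name="placeholder-topic",
                    batch_size=10,                         # matches the event bus maximum batch size
                    maximum_batching_window_in_seconds=5,  # batch window
                    starting_position="TRIM_HORIZON",      # starting position
                ),
            ),
            enrichment="arn:aws:lambda:us-east-1:123456789012:function:placeholder-enrichment",
            target="arn:aws:events:us-east-1:123456789012:event-bus/placeholder-bus",
        )


app = App()
PipeTuningStack(app, "PipeTuningStack")
app.synth()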

EventBridge Pipes with the Confluent Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Confluent Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

To set up this solution, you need a Kafka stream running on Confluent Cloud as well as the Confluent Schema Registry set up. See the corresponding Schema Registry tutorial for Confluent Cloud to set up a schema registry for your Confluent Kafka stream.

To connect to your Confluent Cloud Kafka cluster, you need an API key for Confluent Cloud and Confluent Schema Registry. AWS Secrets Manager is used to securely store your Confluent secrets.

EventBridge Pipes configuration

Use the AWS CDK template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Confluent Kafka topic as the source via an API secret stored in Secrets Manager.
  2. EventBridge Pipes reads events from your Confluent Kafka topic using the self-managed Apache Kafka stream source type, which includes all non-MSK Kafka clusters.
  3. An enrichment Lambda function in Python to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An SQS dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule writes all incoming events into a CloudWatch Logs log group.

For self-managed Kafka sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

Enrichment Lambda functions

Both of the solutions described previously include an enrichment Lambda function for schema validation and conversion from Avro to JSON.

The Java Lambda function integrates with the Glue Schema Registry using the AWS Glue Schema Registry Library. The Python Lambda function integrates with the Confluent Schema Registry using the confluent-kafka library and uses Powertools for AWS Lambda (Python) to implement Serverless best practices such as logging and tracing.

The enrichment Lambda functions perform the following tasks (a simplified sketch follows the list):

  1. In the events polled from the Kafka stream by the EventBridge pipe, the key and value of the event are base64 encoded. Therefore, for each event in the batch passed to the function, the key and the value are decoded.
  2. The event key is assumed to be serialized by the producer as a string type.
  3. The event value is deserialized using the Glue Schema registry Serde (Java) or the confluent-kafka AvroDeserializer (Python).
  4. The function then returns the successfully converted JSON events to the EventBridge pipe, which then invokes the target for each of them.
  5. Events for which Avro deserialization failed are sent to the SQS dead letter queue.
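The following is a simplified Python sketch of the Confluent variant of these steps. It assumes the batch arrives as a flat list of records with base64-encoded key and value fields (plus the source topic name) and that the Schema Registry URL and credentials are passed through environment variables; the actual function in the repository reads credentials from Secrets Manager and adds the dead-letter queue handling, which is omitted here:

import base64
import os

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Placeholder configuration; the real solution stores these values in Secrets Manager
schema_registry_client = SchemaRegistryClient({
    "url": os.environ["SCHEMA_REGISTRY_URL"],
    "basic.auth.user.info": os.environ["SCHEMA_REGISTRY_AUTH"],  # "key:secret"
})
avro_deserializer = AvroDeserializer(schema_registry_client)


def handler(batch, context):
    """Deserialize a batch of base64-encoded Avro Kafka records and return them as JSON-ready dicts."""
    converted = []
    for record in batch:
        # Step 1: the key and value arrive base64 encoded
        key = base64.b64decode(record["key"]).decode("utf-8") if record.get("key") else None
        value_bytes = base64.b64decode(record["value"])

        # Steps 2-3: the key is treated as a plain string; the value is Avro-deserialized
        # and validated against the schema in the Confluent Schema Registry
        value = avro_deserializer(
            value_bytes, SerializationContext(record["topic"], MessageField.VALUE)
        )

        # Step 4: return the converted events to the pipe, which invokes the target for each
        converted.append({"key": key, "value": value})
    return converted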

Conclusion

This blog post shows how to implement event consumption, Avro schema validation, and conversion to JSON using Amazon EventBridge Pipes, Glue Schema Registry, and Confluent Schema Registry.

The source code for the presented example is available in the AWS Samples GitHub repository for Glue Schema Registry and Confluent Schema Registry. For more patterns, visit the Serverless Patterns Collection.

For more serverless learning resources, visit Serverless Land.

AWS Glue Data Catalog now supports automatic compaction of Apache Iceberg tables

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-glue-data-catalog-now-supports-automatic-compaction-of-apache-iceberg-tables/

Today, we’re making available a new capability of AWS Glue Data Catalog to allow automatic compaction of transactional tables in the Apache Iceberg format. This helps you keep your transactional data lake tables consistently performant.

Data lakes were initially designed primarily for storing vast amounts of raw, unstructured, or semi-structured data at a low cost, and they were commonly associated with big data and analytics use cases. Over time, the number of possible use cases for data lakes has evolved as organizations have recognized the potential to use data lakes for more than just reporting, requiring the inclusion of transactional capabilities to ensure data consistency.

Data lakes also play a pivotal role in data quality, governance, and compliance, particularly as data lakes store increasing volumes of critical business data, which often requires updates or deletion. Data-driven organizations also need to keep their back end analytics systems in near real-time sync with customer applications. This scenario requires transactional capabilities on your data lake to support concurrent writes and reads without data integrity compromise. Finally, data lakes now serve as integration points, necessitating transactions for safe and reliable data movement between various sources.

To support transactional semantics on data lake tables, organizations adopted an open table format (OTF), such as Apache Iceberg. Adopting OTF formats comes with its own set of challenges: transforming existing data lake tables from Parquet or Avro formats to an OTF format, managing a large number of small files as each transaction generates a new file on Amazon Simple Storage Service (Amazon S3), or managing object and metadata versioning at scale, just to name a few. Organizations typically build and manage their own data pipelines to address these challenges, leading to additional undifferentiated work on infrastructure. You need to write code, deploy Spark clusters to run your code, scale the cluster, manage errors, and so on.

When talking with our customers, we learned that the most challenging aspect is the compaction of individual small files produced by each transactional write on tables into a few large files. Large files are faster to read and scan, making your analytics jobs and queries faster to execute. Compaction optimizes the table storage with larger-sized files. It changes the storage for the table from a large number of small files to a small number of larger files. It reduces metadata overhead, lowers network round trips to S3, and improves performance. When you use engines that charge for the compute, the performance improvement is also beneficial to the cost of usage as the queries require less compute capacity to run.

But building custom pipelines to compact and optimize Iceberg tables is time-consuming and expensive. You have to manage the planning, provision infrastructure, and schedule and monitor the compaction jobs. This is why we’re launching automatic compaction today.

Let’s see how it works
To show you how to enable and monitor automatic compaction on Iceberg tables, I start from the AWS Lake Formation page or the AWS Glue page of the AWS Management Console. I have an existing database with tables in the Iceberg format. I execute transactions on one of these tables over the course of a couple of days, and the table starts to fragment into small files on the underlying S3 bucket.

List of Iceberg table on Lake Formation console

I select the table on which I want to enable compaction, and then I select Enable compaction.

View details of a table in lake formation

An IAM role is required to pass permissions to the Lake Formation service to access my AWS Glue tables, S3 buckets, and CloudWatch log streams. I either choose to create a new IAM role or select an existing one. Your existing role must have the lakeformation:GetDataAccess and glue:UpdateTable permissions on the table. The role also needs the logs:CreateLogGroup, logs:CreateLogStream, and logs:PutLogEvents permissions on "arn:aws:logs:*:your_account_id:log-group:/aws-lakeformation-acceleration/compaction/logs:*". The role’s trust policy must allow the glue.amazonaws.com service principal.

Then, I select Turn on compaction. Et voilà! Compaction is automatic; there is nothing to manage on your side.

The service starts to measure the table’s rate of change. As Iceberg tables can have multiple partitions, the service calculates this change rate for each partition and schedules managed jobs to compact the partitions where this rate of change breaches a threshold value.

When the table accumulates a high number of changes, you will be able to view the Compaction history under the Optimization tab in the console.

Lake formation compaction history in the console

You can also monitor the whole process either by observing the number of files in your S3 bucket (using the NumberOfObjects metric) or by watching one of the two new Lake Formation metrics: numberOfBytesCompacted or numberOfFilesCompacted.

Iceberg table compaction metrics in the cloudwatch console

In addition to the AWS console, there are six new APIs that expose this new capability: CreateTableOptimizer, BatchGetTableOptimizer, UpdateTableOptimizer, DeleteTableOptimizer, GetTableOptimizer, and ListTableOptimizerRuns. These APIs are available in the AWS SDKs and AWS Command Line Interface (AWS CLI). As usual, don’t forget to update the SDK or the CLI to their latest versions to get access to these new APIs.
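As a rough illustration of how you might call two of these APIs from the AWS SDK for Python (Boto3), here is a sketch with placeholder account, database, table, and role values; check the SDK documentation for the exact request shapes supported by your SDK version:

import boto3

glue = boto3.client("glue")

# Enable automatic compaction on an Iceberg table (placeholder identifiers)
glue.create_table_optimizer(
    CatalogId="111122223333",
    DatabaseName="my_iceberg_db",
    TableName="my_iceberg_table",
    Type="compaction",
    TableOptimizerConfiguration={
        "roleArn": "arn:aws:iam::111122223333:role/CompactionRole",
        "enabled": True,
    },
)

# List recent compaction runs for the same table
runs = glue.list_table_optimizer_runs(
    CatalogId="111122223333",
    DatabaseName="my_iceberg_db",
    TableName="my_iceberg_table",
    Type="compaction",
)
print(runs["TableOptimizerRuns"])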

Things to know
As we launched this new capability today, there are a couple of additional points I’d like to share with you:

Availability
This new capability is available starting today in all AWS Regions where AWS Glue Data Catalog is available.

The pricing metric is the data processing unit (DPU), a relative measure of processing power that consists of 4 vCPUs of compute capacity and 16 GB of memory. There is a charge per DPU-hour, metered by the second, with a minimum of one minute.
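As an illustration, a compaction run that consumes 2 DPUs for 6 minutes is billed for 2 × 6/60 = 0.2 DPU-hours, and a run that finishes in less than a minute is still billed for the one-minute minimum.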

Now it’s time to decommission your existing compaction data pipeline and switch to this new, entirely managed capability.

— seb

Deploy Amazon QuickSight dashboards to monitor AWS Glue ETL job metrics and set alarms

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/deploy-amazon-quicksight-dashboards-to-monitor-aws-glue-etl-job-metrics-and-set-alarms/

No matter the industry or level of maturity within AWS, our customers require better visibility into their AWS Glue usage. Better visibility can lend itself to gains in operational efficiency, informed business decisions, and further transparency into your return on investment (ROI) when using the various features available through AWS Glue.

As your company grows, you should be able to answer simple questions about your AWS Glue usage, such as the following:

  • Where am I spending the most with AWS Glue?
  • Where can I save the most by taking advantage of new AWS Glue features?
  • What does my overall usage look like using AWS Glue?

AWS offers services such as Amazon QuickSight, a serverless business intelligence (BI) service that lets you centralize this view and even ask natural language questions of your data, using Amazon QuickSight Q. QuickSight can give business leaders and their technology counterparts a common landscape for reporting important details of their usage, providing automated narratives to bridge communication gaps.

In this post, we explore how to combine AWS Glue usage information and metrics with centralized reporting and visualization using QuickSight. This can provide you with a more comprehensive view of your usage and tools to help you dive deep into your AWS Glue job run environment. You have metrics available per job run within the AWS Glue console, but they don’t cover all available AWS Glue job metrics, and the visuals aren’t as interactive as those in the QuickSight dashboard.

Although we don’t cover optimizing your jobs for costs in this post, you can refer to Monitor and optimize cost on AWS Glue for Apache Spark to learn how to fine-tune your AWS Glue jobs for performance, efficiency, and cost optimization.

Let’s dive in!

Solution overview

The following diagram illustrates the architecture for the given solution. At a high level, a scheduled event triggers an orchestration flow consisting of multiple data, compute, and analytics resources—the output of which culminates as a set of visuals in a BI dashboard.

solution architecture

Now let’s dig into the technical details involved in this solution.

An AWS Step Functions workflow is scheduled to run once per hour through Amazon EventBridge, which triggers an AWS Lambda function that calls the AWS Glue GetJob and GetJobRun APIs. We parse this data to check for jobs that have succeeded, stopped, or failed in the past hour, as well as any streaming jobs. The metadata is extracted from each job run, including information like runtime, start time, end time, auto scaling, number of workers, and worker type, and is written to an Amazon DynamoDB table with TTL (time to live) enabled to ensure the table doesn’t grow too large.
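To give a sense of the metadata being collected, the following is a simplified boto3 sketch (not the actual function from the repository) that lists the job runs completed in the past hour; the field names follow the Glue GetJobs and GetJobRuns API responses:

import boto3
from datetime import datetime, timedelta, timezone

glue = boto3.client("glue")
one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)


def recent_job_runs(job_name):
    """Yield metadata for job runs of the given job that finished in the past hour."""
    for page in glue.get_paginator("get_job_runs").paginate(JobName=job_name):
        for run in page["JobRuns"]:
            completed = run.get("CompletedOn")
            if run["JobRunState"] in ("SUCCEEDED", "STOPPED", "FAILED") and completed and completed >= one_hour_ago:
                yield {
                    "JobName": job_name,
                    "JobRunId": run["Id"],
                    "State": run["JobRunState"],
                    "StartedOn": run["StartedOn"].isoformat(),
                    "CompletedOn": completed.isoformat(),
                    "ExecutionTime": run.get("ExecutionTime"),
                    "WorkerType": run.get("WorkerType"),
                    "NumberOfWorkers": run.get("NumberOfWorkers"),
                }


for page in glue.get_paginator("get_jobs").paginate():
    for job in page["Jobs"]:
        for item in recent_job_runs(job["Name"]):
            print(item)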

We move into a parallel state to check two tables that Amazon Athena writes the output of the federated queries to. Athena first checks to make sure the tables exist in Amazon Simple Storage Service (Amazon S3), where the data will be stored. If the tables don’t exist, Athena creates them. One federated query gathers AWS Glue metric data from Amazon CloudWatch metrics; the other gathers data from the DynamoDB table where Lambda writes the AWS Glue job metadata it’s collecting. Both federated queries use appropriate filtering to scan only the necessary data from each source.

There is a choice state for each branch. If there is no new data to be added to a table in Amazon S3, the state ends and waits for the other to complete. For example, there could be an AWS Glue job that is running while the step is evaluating. In this case, the metrics for the job would be inserted in the table on Amazon S3, but the metadata from DynamoDB wouldn’t arrive until the following hour after the job has succeeded, stopped, or failed.

When new metrics or metadata are found, Athena inserts this data to the metrics or metadata tables in Amazon S3, which are both partitioned by the hour. After the data is inserted, the final steps call the QuickSight CreateIngestion API, which triggers data ingestion into QuickSight SPICE to power interactive analysis. At this point, the workflow has finished running and will run again the following hour.
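The SPICE refresh itself is a single API call; a minimal boto3 sketch looks like the following, with a hypothetical account ID and dataset ID:

import uuid

import boto3

quicksight = boto3.client("quicksight")

# Trigger a SPICE ingestion for the dataset backing the dashboard (placeholder IDs)
quicksight.create_ingestion(
    AwsAccountId="111122223333",
    DataSetId="glue-metrics-dataset",
    IngestionId=str(uuid.uuid4()),
)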

In the following sections, we show you how to set up the solution, explore the dashboards, and configure alarms.

The code for this solution can be found at the AWS samples GitHub repository.

Prerequisites

You should have the following prerequisites:

Deploy solution resources with the AWS CDK

To provision the resources that build the dashboard and keep it up to date, we provide steps to download and deploy the solution via the AWS CDK. The solution was developed with cost-optimization as a priority, but some resources in the stack will incur costs once deployed.

This solution generates the following resources:

  • IAM role
  • EventBridge rule
  • Step Functions state machine
  • Lambda function
  • S3 bucket
  • Two AWS Glue tables and one AWS Glue database
  • DynamoDB table
  • Athena queries invoked by Step Functions
  • QuickSight data source, dataset, analysis, and dashboard

To deploy the solution, complete the following steps:

  1. Clone the source code from AWS samples GitHub repository to the client:
    git clone https://github.com/aws-samples/glue-metrics-in-quicksight

  2. Bootstrap your AWS CDK app:
    cd glue-metrics-in-quicksight
    npm i aws-cdk-lib
    cdk bootstrap

  3. Deploy the solution with the required parameters:
    1. The first parameter is for a new S3 bucket to be created, which holds the AWS Glue metrics and metadata.
    2. The second parameter is required in order for QuickSight to assign permissions to the user who will manage the assets. Refer to Managing user access inside Amazon QuickSight to find your existing QuickSight users.
      cdk deploy --parameters BucketName=New-Unique-Bucket-Name --parameters QuicksightUsername=QuickSight-Existing-User

If your deployment fails, make sure you installed the AWS CDK library and rerun cdk deploy after installing:

npm i aws-cdk-lib

The deployment may take up to 10 minutes.

After the solution is deployed, the Step Functions state machine will evaluate once per hour if it should ingest data into QuickSight. You can run some AWS Glue jobs after the stack is deployed and check the QuickSight dashboard in the next hour or two, where the job metadata and metrics will be populated for your analysis.

Explore the dashboard

The dashboard contains two sheets: Glue Jobs and Glue Metrics.

The Glue Jobs sheet includes all of the metadata about your AWS Glue job runs, including AWS Glue for Apache Spark, AWS Glue for Ray, and AWS Glue streaming ETL. Most of the visuals also have a hierarchy that you can drill down into with QuickSight, going as low as each specific job run ID. You can use controls to filter by date, job name, and job run ID.

In the following demonstration, you will see the pivot table, which is a simple view of all our job metadata, including estimated cost per job and job run. We open up a job name and see the different job runs. There is one individual job run that we would like to inspect the metrics on, so we choose the job name and choose View metrics for job run id: <my job run id>. This will take us to the Glue Metrics sheet and automatically filter for the job run ID we want to view.

glue information sheet

The Glue Metrics sheet is built to reflect the documentation we provide in AWS Glue resource monitoring. This documentation helps explain each visual in the dashboard. You can use the Glue Metrics sheet to view aggregated metrics across all jobs, a single job, or down to the job run ID.

To populate the Glue Metrics sheet, your AWS Glue jobs must be enabled to capture metrics in CloudWatch.

glue metrics sheet

Set up alerts

Setting up alerts on measures is also straightforward in QuickSight. To do so, right-click one of the tracked measures on either sheet and choose Create Alarm. This brings you to the configuration page to set up the metric you’d like to be alerted on.

quicksight alarm

The dashboard is designed to give you the freedom to alter it and make your own visualizations with the metadata and metrics that are provided to you. If you want even more insight into cost, consider deploying the CUDOS dashboard as well!

Clean up

If you no longer need the dashboard, delete the CDK app:

cdk destroy

Conclusion

In this post, we talked about the importance of having observability of your AWS Glue jobs and provided an AWS CDK app that deploys a QuickSight dashboard for you. We hope this helps you optimize your AWS Glue environment using the insights the dashboard provides. To learn about event-based alerting for your AWS Glue for Apache Spark and Ray jobs, refer to Automate alerting and reporting for AWS Glue job resource usage.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Cody Penta is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in security and CDK, and enjoys solving the really difficult problems in the technology world. Off the clock, he loves relaxing in the mountains, coding personal projects, and gaming.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

Unlock scalable analytics with AWS Glue and Google BigQuery

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-aws-glue-and-google-bigquery/

Data integration is the foundation of robust data analytics. It encompasses the discovery, preparation, and composition of data from diverse sources. In the modern data landscape, accessing, integrating, and transforming data from diverse sources is a vital process for data-driven decision-making. AWS Glue, a serverless data integration and extract, transform, and load (ETL) service, has revolutionized this process, making it more accessible and efficient. AWS Glue eliminates complexities and costs, allowing organizations to perform data integration tasks in minutes, boosting efficiency.

This blog post explores the newly announced managed connector for Google BigQuery and demonstrates how to build a modern ETL pipeline with AWS Glue Studio without writing code.

Overview of AWS Glue

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data and putting it to use in minutes instead of months. AWS Glue provides both visual and code-based interfaces to make data integration easier. Users can more easily find and access data using the AWS Glue Data Catalog. Data engineers and ETL (extract, transform, and load) developers can visually create, run, and monitor ETL workflows in a few steps in AWS Glue Studio. Data analysts and data scientists can use AWS Glue DataBrew to visually enrich, clean, and normalize data without writing code.

Introducing Google BigQuery Spark connector

To meet the demands of diverse data integration use cases, AWS Glue now offers a native Spark connector for Google BigQuery. Customers can now use AWS Glue 4.0 for Spark to read from and write to tables in Google BigQuery. Additionally, you can read an entire table or run a custom query and write your data using direct and indirect writing methods. You connect to BigQuery using service account credentials stored securely in AWS Secrets Manager.

Benefits of Google BigQuery Spark connector

  • Seamless integration: The native connector offers an intuitive and streamlined interface for data integration, reducing the learning curve.
  • Cost efficiency: Building and maintaining custom connectors can be expensive. The native connector provided by AWS Glue is a cost-effective alternative.
  • Efficiency: Data transformation tasks that previously took weeks or months can now be accomplished within minutes, optimizing efficiency.

Solution overview

In this example, you create two ETL jobs using AWS Glue with the native Google BigQuery connector.

  1. Query a BigQuery table and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format.
  2. Use the data extracted from the first job to transform and create an aggregated result to be stored in Google BigQuery.

solution architecture

Prerequisites

The dataset used in this solution is the NCEI/WDS Global Significant Earthquake Database, with a global listing of over 5,700 earthquakes from 2150 BC to the present. Copy this public data into your Google BigQuery project or use your existing dataset.

Configure BigQuery connections

To connect to Google BigQuery from AWS Glue, see Configuring BigQuery connections. You must create and store your Google Cloud Platform credentials in a Secrets Manager secret, then associate that secret with a Google BigQuery AWS Glue connection.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

To create an S3 bucket:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique Name for your bucket; for example, awsglue-demo.
  3. Choose Create bucket.

Create an IAM role for the AWS Glue ETL job

When you create the AWS Glue ETL job, you specify an AWS Identity and Access Management (IAM) role for the job to use. The role must grant access to all resources used by the job, including Amazon S3 (for any sources, targets, scripts, driver files, and temporary directories), and Secrets Manager.

For instructions, see Configure an IAM role for your ETL job.

Solution walkthrough

Create a visual ETL job in AWS Glue Studio to transfer data from Google BigQuery to Amazon S3

  1. Open the AWS Glue console.
  2. In AWS Glue, navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Enter a Name for your AWS Glue job, for example, bq-s3-dataflow.
  4. Select Google BigQuery as the data source.
    1. Enter a name for your Google BigQuery source node, for example, noaa_significant_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Enter a Parent project, for example, bigquery-public-datasources.
    4. Select Choose a single table for the BigQuery Source.
    5. Enter the table you want to migrate in the form [dataset].[table], for example, noaa_significant_earthquakes.earthquakes.
      big query data source for bq to amazon s3 dataflow
  5. Next, choose the data target as Amazon S3.
    1. Enter a Name for the target Amazon S3 node, for example, earthquakes.
    2. Select the output data Format as Parquet.
    3. Select the Compression Type as Snappy.
    4. For the S3 Target Location, enter the bucket created in the prerequisites, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    5. You should replace <YourBucketName> with the name of your bucket.
      s3 target node for bq to amazon s3 dataflow
  6. Next go to the Job details. In the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for bq to amazon s3 dataflow
  7. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue will run the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

run and monitor bq to amazon s3 dataflow

Data validation

  1. After the job has run successfully, validate the data in your S3 bucket to ensure it matches your expectations. You can see the results using Amazon S3 Select.

review results in amazon s3 from the bq to s3 dataflow run

Automate and schedule

  1. If needed, set up job scheduling to run the ETL process regularly. You can use AWS Glue scheduling to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery (see the sketch after this step).
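One way to do this programmatically is with a scheduled AWS Glue trigger. The following is a minimal boto3 sketch with a hypothetical trigger name and cron expression; you can also configure a schedule for the job directly in AWS Glue Studio:

import boto3

glue = boto3.client("glue")

# Run the BigQuery-to-S3 job at the start of every hour (placeholder schedule)
glue.create_trigger(
    Name="bq-s3-dataflow-hourly",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",
    Actions=[{"JobName": "bq-s3-dataflow"}],
    StartOnCreation=True,
)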

You’ve successfully configured an AWS Glue ETL job to transfer data from Google BigQuery to Amazon S3. Next, you create the ETL job to aggregate this data and transfer it to Google BigQuery.

Finding earthquake hotspots with AWS Glue Studio Visual ETL

  1. Open AWS Glue console.
  2. In AWS Glue navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Provide a name for your AWS Glue job, for example, s3-bq-dataflow.
  4. Choose Amazon S3 as the data source.
    1. Enter a Name for the source Amazon S3 node, for example, earthquakes.
    2. Select S3 location as the S3 source type.
    3. Enter the S3 bucket created in the prerequisites as the S3 URL, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    4. You should replace <YourBucketName> with the name of your bucket.
    5. Select the Data format as Parquet.
    6. Select Infer schema.
      amazon s3 source node for s3 to bq dataflow
  5. Next, choose Select Fields transformation.
    1. Select earthquakes as Node parents.
    2. Select fields: id, eq_primary, and country.
      select field node for amazon s3 to bq dataflow
  6. Next, choose Aggregate transformation.
    1. Enter a Name, for example Aggregate.
    2. Choose Select Fields as Node parents.
    3. Choose eq_primary and country as the group by columns.
    4. Add id as the aggregate column and count as the aggregation function.
      aggregate node for amazon s3 to bq dataflow
  7. Next, choose RenameField transformation.
    1. Enter a name for the source Amazon S3 node, for example, Rename eq_primary.
    2. Choose Aggregate as Node parents.
    3. Choose eq_primary as the Current field name and enter earthquake_magnitude as the New field name.
      rename eq_primary field for amazon s3 to bq dataflow
  8. Next, choose RenameField transformation
    1. Enter a name for the source Amazon S3 node, for example, Rename count(id).
    2. Choose Rename eq_primary as Node parents.
    3. Choose count(id) as the Current field name and enter number_of_earthquakes as the New field name.
      rename cound(id) field for amazon s3 to bq dataflow
  9. Next, choose the data target as Google BigQuery.
    1. Provide a name for your Google BigQuery source node, for example, most_powerful_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Select Parent project, for example, bigquery-public-datasources.
    4. Enter the name of the Table you want to create in the form [dataset].[table], for example, noaa_significant_earthquakes.most_powerful_earthquakes.
    5. Choose Direct as the Write method.
      bq destination for amazon s3 to bq dataflow
  10. Next go to the Job details tab and in the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for amazon s3 to bq dataflow
  11. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue runs the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

monitor and run for amazon s3 to bq dataflow

Data validation

  1. After the job has run successfully, validate the data in your Google BigQuery dataset. This ETL job returns a list of countries where the most powerful earthquakes have occurred. It provides these by counting the number of earthquakes for a given magnitude by country.

aggregated results for amazon s3 to bq dataflow

Automate and schedule

  1. You can set up job scheduling to run the ETL process regularly. AWS Glue allows you to automate your ETL jobs, ensuring your Google BigQuery dataset is always up to date with the latest data from Amazon S3.

That’s it! You’ve successfully set up an AWS Glue ETL job to transfer data from Amazon S3 to Google BigQuery. You can use this integration to automate the process of data extraction, transformation, and loading between these two platforms, making your data readily available for analysis and other applications.

Clean up

To avoid incurring charges, clean up the resources used in this blog post from your AWS account by completing the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. From the list of jobs, select the job bq-s3-dataflow and delete it.
  3. From the list of jobs, select the job s3-bq-dataflow and delete it.
  4. On the AWS Glue console, choose Connections in the navigation pane under Data Catalog.
  5. Choose the BigQuery connection you created and delete it.
  6. On the Secrets Manager console, choose the secret you created and delete it.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for the AWS Glue ETL job and delete it.
  8. On the Amazon S3 console, search for the S3 bucket you created, choose Empty to delete the objects, then delete the bucket.
  9. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The integration of AWS Glue with Google BigQuery simplifies the analytics pipeline, reduces time-to-insight, and facilitates data-driven decision-making. It empowers organizations to streamline data integration and analytics. The serverless nature of AWS Glue means no infrastructure management, and you pay only for the resources consumed while your jobs are running. As organizations increasingly rely on data for decision-making, this native Spark connector provides an efficient, cost-effective, and agile solution to swiftly meet data analytics needs.

If you’re interested in seeing how to read from and write to tables in Google BigQuery in AWS Glue, take a look at the step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on AWS Glue, visit AWS Glue.

Appendix

If you are looking to implement this example, using code instead of the AWS Glue console, use the following code snippets.

Reading data from Google BigQuery and writing data into Amazon S3

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from the Google BigQuery table
noaa_significant_earthquakes_node1697123333266 = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "sourceType": "table",
            "table": "noaa_significant_earthquakes.earthquakes",
        },
        transformation_ctx="noaa_significant_earthquakes_node1697123333266",
    )
)
# STEP-2 Write the data read from the Google BigQuery table into Amazon S3
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697157772747 = glueContext.write_dynamic_frame.from_options(
    frame=noaa_significant_earthquakes_node1697123333266,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="earthquakes_node1697157772747",
)

job.commit()

Reading and aggregating data from Amazon S3 and writing into Google BigQuery

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
from pyspark.sql import functions as SqlFuncs

def sparkAggregate(
    glueContext, parentFrame, groups, aggs, transformation_ctx
) -> DynamicFrame:
    aggsFuncs = []
    for column, func in aggs:
        aggsFuncs.append(getattr(SqlFuncs, func)(column))
    result = (
        parentFrame.toDF().groupBy(*groups).agg(*aggsFuncs)
        if len(groups) > 0
        else parentFrame.toDF().agg(*aggsFuncs)
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Amazon S3 bucket
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697218776818 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/"
        ],
        "recurse": True,
    },
    transformation_ctx="earthquakes_node1697218776818",
)

# STEP-2 Select fields
SelectFields_node1697218800361 = SelectFields.apply(
    frame=earthquakes_node1697218776818,
    paths=["id", "eq_primary", "country"],
    transformation_ctx="SelectFields_node1697218800361",
)

# STEP-3 Aggregate data
Aggregate_node1697218823404 = sparkAggregate(
    glueContext,
    parentFrame=SelectFields_node1697218800361,
    groups=["eq_primary", "country"],
    aggs=[["id", "count"]],
    transformation_ctx="Aggregate_node1697218823404",
)

# STEP-4 Rename eq_primary to earthquake_magnitude
Renameeq_primary_node1697219483114 = RenameField.apply(
    frame=Aggregate_node1697218823404,
    old_name="eq_primary",
    new_name="earthquake_magnitude",
    transformation_ctx="Renameeq_primary_node1697219483114",
)

# STEP-5 Rename count(id) to number_of_earthquakes
Renamecountid_node1697220511786 = RenameField.apply(
    frame=Renameeq_primary_node1697219483114,
    old_name="`count(id)`",
    new_name="number_of_earthquakes",
    transformation_ctx="Renamecountid_node1697220511786",
)

# STEP-6 Write the aggregated data into Google BigQuery
most_powerful_earthquakes_node1697220563923 = (
    glueContext.write_dynamic_frame.from_options(
        frame=Renamecountid_node1697220511786,
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "writeMethod": "direct",
            "table": "noaa_significant_earthquakes.most_powerful_earthquakes",
        },
        transformation_ctx="most_powerful_earthquakes_node1697220563923",
    )
)

job.commit()


About the authors

Kartikay Khator is a Solutions Architect in Global Life Sciences at Amazon Web Services (AWS). He is passionate about building innovative and scalable solutions to meet the needs of customers, focusing on AWS Analytics services. Beyond the tech world, he is an avid runner and enjoys hiking.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Anshul Sharma is a Software Development Engineer on the AWS Glue team. He is driving the connectivity charter, which provides Glue customers a native way of connecting any data source (data warehouses, data lakes, NoSQL, and so on) to Glue ETL jobs. Beyond the tech world, he is a cricket and soccer lover.

Create, train, and deploy Amazon Redshift ML model integrating features from Amazon SageMaker Feature Store

Post Syndicated from Anirban Sinha original https://aws.amazon.com/blogs/big-data/create-train-and-deploy-amazon-redshift-ml-model-integrating-features-from-amazon-sagemaker-feature-store/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Data analysts and database developers want to use this data to train machine learning (ML) models, which can then be used to generate insights on new data for use cases such as forecasting revenue, predicting customer churn, and detecting anomalies. Amazon Redshift ML makes it easy for SQL users to create, train, and deploy ML models using SQL commands familiar to many roles such as executives, business analysts, and data analysts. We covered in a previous post how you can use data in Amazon Redshift to train models in Amazon SageMaker, a fully managed ML service, and then make predictions within your Redshift data warehouse.

Redshift ML currently supports ML algorithms such as XGBoost, multilayer perceptron (MLP), KMEANS, and Linear Learner. Additionally, you can import existing SageMaker models into Amazon Redshift for in-database inference or remotely invoke a SageMaker endpoint.

Amazon SageMaker Feature Store is a fully managed, purpose-built repository to store, share, and manage features for ML models. However, one challenge in training a production-ready ML model using SageMaker Feature Store is access to a diverse set of features that aren’t always owned and maintained by the team that is building the model. For example, an ML model to identify fraudulent financial transactions needs access to both identifying (device type, browser) and transaction (amount, credit or debit, and so on) related features. As a data scientist building an ML model, you may have access to the identifying information but not the transaction information, and having access to a feature store solves this.

In this post, we discuss the combined feature store pattern, which allows teams to maintain their own local feature stores using a local Redshift table while still being able to access shared features from the centralized feature store. In a local feature store, you can store sensitive data that can’t be shared across the organization for regulatory and compliance reasons.

We also show you how to use familiar SQL statements to create and train ML models by combining shared features from the centralized store with local features and use these models to make in-database predictions on new data for use cases such as fraud risk scoring.

Overview of solution

For this post, we create an ML model to predict if a transaction is fraudulent or not, given the transaction record. To build this, we need to engineer features that describe an individual credit card’s spending pattern, such as the number of transactions or the average transaction amount, and also information about the merchant, the cardholder, the device used to make the payment, and any other data that may be relevant to detecting fraud.

To get started, we need an Amazon Redshift Serverless data warehouse with the Redshift ML feature enabled and an Amazon SageMaker Studio environment with access to SageMaker Feature Store. For an introduction to Redshift ML and instructions on setting it up, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

We also need an offline feature store to store features in feature groups. The offline store uses an Amazon Simple Storage Service (Amazon S3) bucket for storage and can also fetch data using Amazon Athena queries. For an introduction to SageMaker Feature Store and instructions on setting it up, see Getting started with Amazon SageMaker Feature Store.

The following diagram illustrates the solution architecture.

The workflow contains the following steps:

  1. Create the offline feature group in SageMaker Feature Store and ingest data into the feature group.
  2. Create a Redshift table and load local feature data into the table.
  3. Create an external schema for Amazon Redshift Spectrum to access the offline store data stored in Amazon S3 using the AWS Glue Data Catalog.
  4. Train and validate a fraud risk scoring ML model using local feature data and external offline feature store data.
  5. Use the offline feature store and local store for inference.

Dataset

To demonstrate this use case, we use a synthetic dataset with two tables: identity and transactions. They can both be joined by the TransactionID column. The transaction table contains information about a particular transaction, such as amount, credit or debit card, and so on, and the identity table contains information about the user, such as device type and browser. The transaction must exist in the transaction table, but might not always be available in the identity table.

The following is an example of the transactions dataset.

The following is an example of the identity dataset.

Let’s assume that across the organization, data science teams centrally manage the identity data and process it to extract features in a centralized offline feature store. The data warehouse team ingests and analyzes transaction data in a Redshift table, owned by them.

We work through this use case to understand how the data warehouse team can securely retrieve the latest features from the identity feature group and join it with transaction data in Amazon Redshift to create a feature set for training and inferencing a fraud detection model.

Create the offline feature group and ingest data

To start, we set up SageMaker Feature Store, create a feature group for the identity dataset, inspect and process the dataset, and ingest some sample data. We then prepare the transaction features from the transaction data and store it in Amazon S3 for further loading into the Redshift table.

Alternatively, you can author features using Amazon SageMaker Data Wrangler, create feature groups in SageMaker Feature Store, and ingest features in batches using an Amazon SageMaker Processing job with a notebook exported from SageMaker Data Wrangler. This mode allows for batch ingestion into the offline store.

Let’s explore some of the key steps in this section.

  1. Download the sample notebook.
  2. On the SageMaker console, under Notebook in the navigation pane, choose Notebook instances.
  3. Locate your notebook instance and choose Open Jupyter.
  4. Choose Upload and upload the notebook you just downloaded.
  5. Open the notebook sagemaker_featurestore_fraud_redshiftml_python_sdk.ipynb.
  6. Follow the instructions and run all the cells up to the Cleanup Resources section.

The following are key steps from the notebook:

  1. We create a Pandas DataFrame with the initial CSV data. We apply feature transformations for this dataset.
    identity_data = pd.read_csv(io.BytesIO(identity_data_object["Body"].read()))
    transaction_data = pd.read_csv(io.BytesIO(transaction_data_object["Body"].read()))
    
    identity_data = identity_data.round(5)
    transaction_data = transaction_data.round(5)
    
    identity_data = identity_data.fillna(0)
    transaction_data = transaction_data.fillna(0)
    
    # Feature transformations for this dataset are applied 
    # One hot encode card4, card6
    encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")
    encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")
    
    transformed_transaction_data = pd.concat(
        [transaction_data, encoded_card_type, encoded_card_bank], axis=1
    )

  2. We store the processed and transformed transaction dataset in an S3 bucket. This transaction data will be loaded later in the Redshift table for building the local feature store.
    transformed_transaction_data.to_csv("transformed_transaction_data.csv", header=False, index=False)
    s3_client.upload_file("transformed_transaction_data.csv", default_s3_bucket_name, prefix + "/training_input/transformed_transaction_data.csv")

  3. Next, we need a record identifier name and an event time feature name. In our fraud detection example, the column of interest is TransactionID. EventTime can be appended to your data when no timestamp is available. In the following code, you can see how these variables are set, and then EventTime is appended to both features’ data.
    # record identifier and event time feature names
    record_identifier_feature_name = "TransactionID"
    event_time_feature_name = "EventTime"
    
    # append EventTime feature
    identity_data[event_time_feature_name] = pd.Series(
        [current_time_sec] * len(identity_data), dtype="float64"
    )

  4. We then create and ingest the data into the feature group using the SageMaker SDK FeatureGroup.ingest API. This is a small dataset and therefore can be loaded into a Pandas DataFrame. When we work with large amounts of data and millions of rows, there are other scalable mechanisms to ingest data into SageMaker Feature Store, such as batch ingestion with Apache Spark.
    identity_feature_group_name = "identity-feature-group"
    
    # Load feature definitions into the feature group. The SageMaker Feature Store Python SDK
    # auto-detects the data schema based on the input DataFrame. This must run before create().
    identity_feature_group.load_feature_definitions(data_frame=identity_data)
    
    identity_feature_group.create(
        s3_uri=<S3_Path_Feature_Store>,
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=<role_arn>,
        enable_online_store=False,
    )
    
    identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)
    

  5. We can verify that data has been ingested into the feature group by running Athena queries in the notebook or running queries on the Athena console.

At this point, the identity feature group is created in an offline feature store with historical data persisted in Amazon S3. SageMaker Feature Store automatically creates an AWS Glue Data Catalog for the offline store, which enables us to run SQL queries against the offline data using Athena or Redshift Spectrum.
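For instance, a minimal verification query can be run from the notebook with the SageMaker Python SDK, which wraps Athena for the offline store. This is a sketch only: the query results prefix under the default bucket is an assumption, and it relies on the identity_feature_group and default_s3_bucket_name variables already defined in the notebook.

athena_query = identity_feature_group.athena_query()
identity_table = athena_query.table_name  # Glue Data Catalog table that Feature Store created

athena_query.run(
    query_string=f'SELECT COUNT(*) AS record_count FROM "{identity_table}"',
    output_location=f"s3://{default_s3_bucket_name}/athena-query-results/",  # hypothetical results prefix
)
athena_query.wait()
print(athena_query.as_dataframe())  # should return the number of ingested identity records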

Create a Redshift table and load local feature data

To build a Redshift ML model, we build a training dataset joining the identity data and transaction data using SQL queries. The identity data is in a centralized feature store where the historical set of records are persisted in Amazon S3. The transaction data is a local feature for training data that needs to be made available in the Redshift table.

Let’s explore how to create the schema and load the processed transaction data from Amazon S3 into a Redshift table.

  1. Create the customer_transaction table and load daily transaction data into the table, which you’ll use to train the ML model:
    DROP TABLE IF EXISTS customer_transaction;
    CREATE TABLE customer_transaction (
      TransactionID INT,    
      isFraud INT,  
      TransactionDT INT,    
      TransactionAmt decimal(10,2), 
      card1 INT,    
      card2 decimal(10,2),card3 decimal(10,2),  
      card4 VARCHAR(20),card5 decimal(10,2),    
      card6 VARCHAR(20),    
      B1 INT,B2 INT,B3 INT,B4 INT,B5 INT,B6 INT,
      B7 INT,B8 INT,B9 INT,B10 INT,B11 INT,B12 INT,
      F1 INT,F2 INT,F3 INT,F4 INT,F5 INT,F6 INT,
      F7 INT,F8 INT,F9 INT,F10 INT,F11 INT,F12 INT,
      F13 INT,F14 INT,F15 INT,F16 INT,F17 INT,  
      N1 VARCHAR(20),N2 VARCHAR(20),N3 VARCHAR(20), 
      N4 VARCHAR(20),N5 VARCHAR(20),N6 VARCHAR(20), 
      N7 VARCHAR(20),N8 VARCHAR(20),N9 VARCHAR(20), 
      card_type_0  boolean,
      card_type_credit boolean,
      card_type_debit  boolean,
      card_bank_0  boolean,
      card_bank_american_express boolean,
      card_bank_discover  boolean,
      card_bank_mastercard  boolean,
      card_bank_visa boolean  
    );

  2. Load the sample data by using the following command. Replace the Region and S3 path as appropriate. You can find the S3 path in the S3 Bucket Setup For The OfflineStore section of the notebook, or by checking the dataset_uri_prefix variable.
    COPY customer_transaction
    FROM '<s3path>/transformed_transaction_data.csv' 
    IAM_ROLE default delimiter ',' 
    region 'your-region';

Now that we have created a local feature store for the transaction data, we focus on integrating a centralized feature store with Amazon Redshift to access the identity data.

Create an external schema for Redshift Spectrum to access the offline store data

We have created a centralized feature store for identity features, and we can access this offline feature store using services such as Redshift Spectrum. When the identity data is available through the Redshift Spectrum table, we can create a training dataset with feature values from the identity feature group and customer_transaction, joining on the TransactionId column.

This section provides an overview of how to enable Redshift Spectrum to query data directly from files on Amazon S3 through an external database in an AWS Glue Data Catalog.

  1. First, check that the identity-feature-group table is present in the Data Catalog under the sagemaker_featurestore database.
  2. Using Redshift Query Editor V2, create an external schema using the following command:
    CREATE EXTERNAL SCHEMA sagemaker_featurestore
    FROM DATA CATALOG
    DATABASE 'sagemaker_featurestore'
    IAM_ROLE default
    create external database if not exists;

All the tables, including identity-feature-group external tables, are visible under the sagemaker_featurestore external schema. In Redshift Query Editor v2, you can check the contents of the external schema.

  1. Run the following query to sample a few records—note that your table name may be different:
    Select * from sagemaker_featurestore.identity_feature_group_1680208535 limit 10;

  2. Create a view to join the latest data from identity-feature-group and customer_transaction on the TransactionId column. Be sure to change the external table name to match your external table name:
    create or replace view public.credit_fraud_detection_v
    AS select  "isfraud",
            "transactiondt",
            "transactionamt",
            "card1","card2","card3","card5",
             case when "card_type_credit" = 'False' then 0 else 1 end as card_type_credit,
             case when "card_type_debit" = 'False' then 0 else 1 end as card_type_debit,
             case when "card_bank_american_express" = 'False' then 0 else 1 end as card_bank_american_express,
             case when "card_bank_discover" = 'False' then 0 else 1 end as card_bank_discover,
             case when "card_bank_mastercard" = 'False' then 0 else 1 end as card_bank_mastercard,
             case when "card_bank_visa" = 'False' then 0 else 1 end as card_bank_visa,
            "id_01","id_02","id_03","id_04","id_05"
    from public.customer_transaction ct left join sagemaker_featurestore.identity_feature_group_1680208535 id
    on id.transactionid = ct.transactionid with no schema binding;

Train and validate the fraud risk scoring ML model

Redshift ML gives you the flexibility to specify your own algorithms and model types and also to provide your own advanced parameters, which can include preprocessors, problem type, and hyperparameters. In this post, we create a custom model by specifying AUTO OFF and the model type of XGBOOST. By turning AUTO OFF and using XGBoost, we are providing the necessary inputs for SageMaker to train the model. A benefit of this can be faster training times. XGBoost is an open source implementation of the gradient boosted trees algorithm. For more details on XGBoost, refer to Build XGBoost models with Amazon Redshift ML.

We train the model using 80% of the dataset by filtering on transactiondt < 12517618. The other 20% will be used for inference. A centralized feature store is useful for providing the latest supplemental data for training requests. Note that you will need to provide an S3 bucket name in the create model statement. It will take approximately 10 minutes to create the model.

CREATE MODEL frauddetection_xgboost
FROM (select  "isfraud",
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05"
from credit_fraud_detection_v where transactiondt < 12517618
)
TARGET isfraud
FUNCTION ml_fn_frauddetection_xgboost
IAM_ROLE default
AUTO OFF
MODEL_TYPE XGBOOST
OBJECTIVE 'binary:logistic'
PREPROCESSORS 'none'
HYPERPARAMETERS DEFAULT EXCEPT(NUM_ROUND '100')
SETTINGS (S3_BUCKET <s3_bucket>);

When you run the create model command, it will complete quickly in Amazon Redshift while the model training is happening in the background using SageMaker. You can check the status of the model by running a show model command:

show model frauddetection_xgboost;

The output of the show model command shows that the model state is TRAINING. It also shows other information such as the model type and the training job name that SageMaker assigned.
After a few minutes, we run the show model command again:

show model frauddetection_xgboost;

Now the output shows the model state is READY. We can also see the train:error score here; a value close to 0 indicates the model fit the training data well. Now that the model is trained, we can use it for running inference queries.

Use the offline feature store and local store for inference

We can use the SQL function to apply the ML model to data in queries, reports, and dashboards. Let’s use the function ml_fn_frauddetection_xgboost created by our model against our test dataset by filtering where transactiondt >=12517618, to predict whether a transaction is fraudulent or not. SageMaker Feature Store can be useful in supplementing data for inference requests.

Run the following query to predict whether transactions are fraudulent or not:

select  "isfraud" as "Actual",
        ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") as "Predicted"
from credit_fraud_detection_v where transactiondt >= 12517618;

For binary and multi-class classification problems, we compute the accuracy as the model metric. Accuracy can be calculated based on the following:

accuracy = (sum(actual == predicted) / total) * 100

Let’s apply the preceding code to our use case to find the accuracy of the model. We use the test data (transactiondt >= 12517618) to test the accuracy, and use the newly created function ml_fn_frauddetection_xgboost to predict and take the columns other than the target and label as the input:

-- check accuracy 
WITH infer_data AS (
SELECT "isfraud" AS label,
ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") AS predicted,
CASE 
   WHEN label IS NULL
       THEN 0
   ELSE label
   END AS actual,
CASE 
   WHEN actual = predicted
       THEN 1::INT
   ELSE 0::INT
   END AS correct
FROM credit_fraud_detection_v where transactiondt >= 12517618),
aggr_data AS (
SELECT SUM(correct) AS num_correct,
COUNT(*) AS total
FROM infer_data) 

SELECT (num_correct::FLOAT / total::FLOAT) AS accuracy FROM aggr_data;

Clean up

As a final step, clean up the resources:

  1. Delete the Redshift Serverless workgroup and namespace (or the Redshift cluster, if you used a provisioned cluster).
  2. Run the Cleanup Resources section of your notebook.

Conclusion

Redshift ML enables you to bring machine learning to your data, powering fast and informed decision-making. SageMaker Feature Store provides a purpose-built feature management solution to help organizations scale ML development across business units and data science teams.

In this post, we showed how you can train an XGBoost model using Redshift ML with data spread across SageMaker Feature Store and a Redshift table. Additionally, we showed how you can make inferences on a trained model to detect fraud using Amazon Redshift SQL commands.


About the authors

Anirban Sinha is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions working closely with customers. He works with large ISVs customers, in helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Unstructured data management and governance using AWS AI/ML and analytics services

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/unstructured-data-management-and-governance-using-aws-ai-ml-and-analytics-services/

Unstructured data is information that doesn’t conform to a predefined schema or isn’t organized according to a preset data model. Unstructured information may have a little or a lot of structure but in ways that are unexpected or inconsistent. Text, images, audio, and videos are common examples of unstructured data. Most companies produce and consume unstructured data such as documents, emails, web pages, engagement center phone calls, and social media. By some estimates, unstructured data can make up to 80–90% of all new enterprise data and is growing many times faster than structured data. After decades of digitizing everything in your enterprise, you may have an enormous amount of data, but with dormant value. However, with the help of AI and machine learning (ML), new software tools are now available to unearth the value of unstructured data.

In this post, we discuss how AWS can help you successfully address the challenges of extracting insights from unstructured data. We discuss various design patterns and architectures for extracting and cataloging valuable insights from unstructured data using AWS. Additionally, we show how to use AWS AI/ML services for analyzing unstructured data.

Why it’s challenging to process and manage unstructured data

Unstructured data makes up a large proportion of enterprise data that can’t be stored in a traditional relational database management system (RDBMS). Understanding the data, categorizing it, storing it, and extracting insights from it can be challenging. In addition, identifying incremental changes requires specialized patterns, and detecting sensitive data and meeting compliance requirements call for sophisticated functions. It can be difficult to integrate unstructured data with structured data from existing information systems. Some view structured and unstructured data as apples and oranges, instead of as complementary. But most important of all, the assumed dormant value in the unstructured data is a question mark, which can only be answered after these sophisticated techniques have been applied. Therefore, there is a need to be able to analyze and extract value from the data economically and flexibly.

Solution overview

Data and metadata discovery is one of the primary requirements in data analytics, where data consumers explore what data is available and in what format, and then consume or query it for analysis. If you can apply a schema on top of the dataset, then it’s straightforward to query because you can load the data into a database or impose a virtual table schema for querying. But in the case of unstructured data, metadata discovery is challenging because the raw data isn’t easily readable.

You can integrate different technologies or tools to build a solution. In this post, we explain how to integrate different AWS services to provide an end-to-end solution that includes data extraction, management, and governance.

The solution integrates data in three tiers. The first is the raw input data that gets ingested by source systems, the second is the output data that gets extracted from input data using AI, and the third is the metadata layer that maintains a relationship between them for data discovery.

The following is a high-level architecture of the solution we can build to process the unstructured data, assuming the input data is being ingested to the raw input object store.

Unstructured Data Management - Block Level Architecture Diagram

The steps of the workflow are as follows:

  1. Integrated AI services extract data from the unstructured data.
  2. These services write the output to a data lake.
  3. A metadata layer helps build the relationship between the raw data and AI extracted output. When the data and metadata are available for end-users, we can break the user access pattern into additional steps.
  4. In the metadata catalog discovery step, we can use query engines to access the metadata for discovery and apply filters as per our analytics needs. Then we move to the next stage of accessing the actual data extracted from the raw unstructured data.
  5. The end-user accesses the output of the AI services and uses the query engines to query the structured data available in the data lake. We can optionally integrate additional tools that help control access and provide governance.
  6. There might be scenarios where, after accessing the AI extracted output, the end-user wants to access the original raw object (such as media files) for further analysis. Additionally, we need to make sure we have access control policies so the end-user has access only to the respective raw data they want to access.

Now that we understand the high-level architecture, let’s discuss what AWS services we can integrate in each step of the architecture to provide an end-to-end solution.

The following diagram is the enhanced version of our solution architecture, where we have integrated AWS services.

Unstructured Data Management - AWS Native Architecture

Let’s understand how these AWS services are integrated in detail. We have divided the steps into two broad user flows: data processing and metadata enrichment (Steps 1–3) and end-users accessing the data and metadata with fine-grained access control (Steps 4–6).

  1. Various AI services (which we discuss in the next section) extract data from the unstructured datasets.
  2. The output is written to an Amazon Simple Storage Service (Amazon S3) bucket (labeled Extracted JSON in the preceding diagram). Optionally, we can restructure the input raw objects for better partitioning, which can help while implementing fine-grained access control on the raw input data (labeled as the Partitioned bucket in the diagram).
  3. After the initial data extraction phase, we can apply additional transformations to enrich the datasets using AWS Glue. We also build an additional metadata layer, which maintains a relationship between the raw S3 object path, the AI extracted output path, the optional enriched version S3 path, and any other metadata that will help the end-user discover the data.
  4. In the metadata catalog discovery step, we use the AWS Glue Data Catalog as the technical catalog, Amazon Athena and Amazon Redshift Spectrum as query engines, AWS Lake Formation for fine-grained access control, and Amazon DataZone for additional governance.
  5. The AI extracted output is expected to be available as a delimited file or in JSON format. We can create an AWS Glue Data Catalog table for querying using Athena or Redshift Spectrum. Like the previous step, we can use Lake Formation policies for fine-grained access control.
  6. Lastly, the end-user accesses the raw unstructured data available in Amazon S3 for further analysis. We have proposed integrating Amazon S3 Access Points for access control at this layer. We explain this in detail later in this post.

Now let’s expand the following parts of the architecture to understand the implementation better:

  • Using AWS AI services to process unstructured data
  • Using S3 Access Points to integrate access control on raw S3 unstructured data

Process unstructured data with AWS AI services

As we discussed earlier, unstructured data can come in a variety of formats, such as text, audio, video, and images, and each type of data requires a different approach for extracting metadata. AWS AI services are designed to extract metadata from different types of unstructured data. The following are the most commonly used services for unstructured data processing:

  • Amazon Comprehend – This natural language processing (NLP) service uses ML to extract metadata from text data. It can analyze text in multiple languages, detect entities, extract key phrases, determine sentiment, and more. With Amazon Comprehend, you can easily gain insights from large volumes of text data such as extracting product entity, customer name, and sentiment from social media posts.
  • Amazon Transcribe – This speech-to-text service uses ML to convert speech to text and extract metadata from audio data. It can recognize multiple speakers, transcribe conversations, identify keywords, and more. With Amazon Transcribe, you can convert unstructured data such as customer support recordings into text and further derive insights from it.
  • Amazon Rekognition – This image and video analysis service uses ML to extract metadata from visual data. It can recognize objects, people, faces, and text, detect inappropriate content, and more. With Amazon Rekognition, you can easily analyze images and videos to gain insights such as identifying entity type (human or other) and identifying if the person is a known celebrity in an image.
  • Amazon Textract – You can use this ML service to extract metadata from scanned documents and images. It can extract text, tables, and forms from images, PDFs, and scanned documents. With Amazon Textract, you can digitize documents and extract data such as customer name, product name, product price, and date from an invoice.
  • Amazon SageMaker – This service enables you to build and deploy custom ML models for a wide range of use cases, including extracting metadata from unstructured data. With SageMaker, you can build custom models that are tailored to your specific needs, which can be particularly useful for extracting metadata from unstructured data that requires a high degree of accuracy or domain-specific knowledge.
  • Amazon Bedrock – This fully managed service offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon with a single API. It also offers a broad set of capabilities to build generative AI applications, simplifying development while maintaining privacy and security.

With these specialized AI services, you can efficiently extract metadata from unstructured data and use it for further analysis and insights. It’s important to note that each service has its own strengths and limitations, and choosing the right service for your specific use case is critical for achieving accurate and reliable results.

AWS AI services are available via various APIs, which enables you to integrate AI capabilities into your applications and workflows. AWS Step Functions is a serverless workflow service that allows you to coordinate and orchestrate multiple AWS services, including AI services, into a single workflow. This can be particularly useful when you need to process large amounts of unstructured data and perform multiple AI-related tasks, such as text analysis, image recognition, and NLP.

With Step Functions and AWS Lambda functions, you can create sophisticated workflows that include AI services and other AWS services. For instance, you can use Amazon S3 to store input data, invoke a Lambda function to trigger an Amazon Transcribe job to transcribe an audio file, and use the output to trigger an Amazon Comprehend analysis job to generate sentiment metadata for the transcribed text. This enables you to create complex, multi-step workflows that are straightforward to manage, scalable, and cost-effective.
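The following is a minimal sketch of that chaining, assuming an audio file already stored in Amazon S3. The bucket, key, and job names are hypothetical, the MP3 format is an assumption, and in practice Step Functions would handle the wait between the asynchronous Transcribe job and the Comprehend call.

import json
import boto3

transcribe = boto3.client("transcribe")
comprehend = boto3.client("comprehend")
s3 = boto3.client("s3")

def start_transcription(bucket, key, job_name, output_bucket):
    """Kick off an asynchronous Amazon Transcribe job for an audio object in Amazon S3."""
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={"MediaFileUri": f"s3://{bucket}/{key}"},
        MediaFormat="mp3",               # assumption: the input audio is MP3
        LanguageCode="en-US",
        OutputBucketName=output_bucket,  # transcript JSON lands at s3://<output_bucket>/<job_name>.json
    )

def analyze_transcript(output_bucket, job_name):
    """After Step Functions confirms the Transcribe job is COMPLETED, run Comprehend sentiment analysis."""
    transcript_obj = s3.get_object(Bucket=output_bucket, Key=f"{job_name}.json")
    transcript = json.loads(transcript_obj["Body"].read())
    text = transcript["results"]["transcripts"][0]["transcript"]
    # Comprehend's synchronous API accepts up to 5,000 bytes of UTF-8 text
    return comprehend.detect_sentiment(Text=text[:4500], LanguageCode="en")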

The following is an example architecture that shows how Step Functions can help invoke AWS AI services using Lambda functions.

AWS AI Services - Lambda Event Workflow -Unstructured Data

The workflow steps are as follows:

  1. Unstructured data, such as text files, audio files, and video files, are ingested into the S3 raw bucket.
  2. A Lambda function is triggered to read the data from the S3 bucket and call Step Functions to orchestrate the workflow required to extract the metadata.
  3. The Step Functions workflow checks the type of file, calls the corresponding AWS AI service APIs, checks the job status, and performs any postprocessing required on the output.
  4. AWS AI services can be accessed via APIs and invoked as batch jobs. To extract metadata from different types of unstructured data, you can use multiple AI services in sequence, with each service processing the corresponding file type.
  5. After the Step Functions workflow completes the metadata extraction process and performs any required postprocessing, the resulting output is stored in an S3 bucket for cataloging.

Next, let’s understand how can we implement security or access control on both the extracted output as well as the raw input objects.

Implement access control on raw and processed data in Amazon S3

When managing unstructured data, we need to consider access controls for three types of data: the AI-extracted semi-structured output, the metadata, and the raw unstructured original files. The AI-extracted output is in JSON format, and access to it can be restricted via Lake Formation and Amazon DataZone. We recommend keeping the metadata (information that captures which unstructured datasets have already been processed by the pipeline and are available for analysis) open to your organization, which enables metadata discovery across the organization.

To control access of raw unstructured data, you can integrate S3 Access Points and explore additional support in the future as AWS services evolve. S3 Access Points simplify data access for any AWS service or customer application that stores data in Amazon S3. Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy that is attached to the underlying bucket. With S3 Access Points, you can create unique access control policies for each access point to easily control access to specific datasets within an S3 bucket. This works well in multi-tenant or shared bucket scenarios where users or teams are assigned to unique prefixes within one S3 bucket.

An access point can support a single user or application, or groups of users or applications within and across accounts, allowing separate management of each access point. Every access point is associated with a single bucket and contains a network origin control and a Block Public Access control. For example, you can create an access point with a network origin control that only permits storage access from your virtual private cloud (VPC), a logically isolated section of the AWS Cloud. You can also create an access point with the access point policy configured to only allow access to objects with a defined prefix or to objects with specific tags. You can also configure custom Block Public Access settings for each access point.
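As a sketch of that idea, the following creates an access point and attaches a policy that limits a hypothetical analyst role to a single prefix. The account ID, Region, role name, bucket, and prefix are placeholders, not values from this solution.

import json
import boto3

s3control = boto3.client("s3control")

account_id = "111122223333"               # placeholder AWS account ID
access_point_name = "raw-media-analysts"  # hypothetical access point name

# Create an access point attached to the bucket that holds the raw unstructured data
s3control.create_access_point(
    AccountId=account_id,
    Name=access_point_name,
    Bucket="raw-unstructured-data-bucket",  # placeholder bucket name
)

# Allow one IAM role to read only objects under the media/ prefix through this access point
access_point_arn = f"arn:aws:s3:us-east-1:{account_id}:accesspoint/{access_point_name}"
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/media-analyst-role"},
            "Action": "s3:GetObject",
            "Resource": f"{access_point_arn}/object/media/*",
        }
    ],
}
s3control.put_access_point_policy(
    AccountId=account_id,
    Name=access_point_name,
    Policy=json.dumps(policy),
)

Note that a request made through an access point must be permitted by both the access point policy and the underlying bucket policy; a common approach is to delegate access control to access points in the bucket policy.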

The following architecture provides an overview of how an end-user can get access to specific S3 objects by assuming a specific AWS Identity and Access Management (IAM) role. If you have a large number of S3 objects whose access you need to control, consider grouping the S3 objects, assigning them tags, and then defining access control by tags.

S3 Access Points - Unstructured Data Management - Access Control

If you are implementing a solution that integrates S3 data available in multiple AWS accounts, you can take advantage of cross-account support for S3 Access Points.

Conclusion

This post explained how you can use AWS AI services to extract readable data from unstructured datasets, build a metadata layer on top of them to allow data discovery, and build an access control mechanism on top of the raw S3 objects and extracted data using Lake Formation, Amazon DataZone, and S3 Access Points.

In addition to AWS AI services, you can also integrate large language models with vector databases to enable semantic or similarity search on top of unstructured datasets. To learn more about how to enable semantic search on unstructured data by integrating Amazon OpenSearch Service as a vector database, refer to Try semantic search with the Amazon OpenSearch Service vector engine.

As of writing this post, S3 Access Points is one of the best solutions to implement access control on raw S3 objects using tagging, but as AWS service features evolve in the future, you can explore alternative options as well.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Bhavana Chirumamilla is a Senior Resident Architect at AWS with a strong passion for data and machine learning operations. She brings a wealth of experience and enthusiasm to help enterprises build effective data and ML strategies. In her spare time, Bhavana enjoys spending time with her family and engaging in various activities such as traveling, hiking, gardening, and watching documentaries.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and trade-offs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family—usually on tennis courts.

Daniel Bruno is a Principal Resident Architect at AWS. He has been building analytics and machine learning solutions for over 20 years and splits his time helping customers build data science programs and designing impactful ML products.

Migrate data from Azure Blob Storage to Amazon S3 using AWS Glue

Post Syndicated from Qiushuang Feng original https://aws.amazon.com/blogs/big-data/migrate-data-from-azure-blob-storage-to-amazon-s3-using-aws-glue/

Today, we are pleased to announce new AWS Glue connectors for Azure Blob Storage and Azure Data Lake Storage that allow you to move data bi-directionally between Azure Blob Storage, Azure Data Lake Storage, and Amazon Simple Storage Service (Amazon S3).

We’ve seen a demand to design applications that enable data to be portable across cloud environments and give you the ability to derive insights from one or more data sources. One of the data sources you can now quickly integrate with is Azure Blob Storage, a managed service for storing both unstructured data and structured data, and Azure Data Lake Storage, a data lake for analytics workloads. With these connectors, you can bring the data from Azure Blob Storage and Azure Data Lake Storage separately to Amazon S3.

In this post, we use Azure Blob Storage as an example and demonstrate how the new connector works, introduce the connector’s functions, and provide you with key steps to set it up. We provide you with prerequisites, share how to subscribe to this connector in AWS Marketplace, and describe how to create and run AWS Glue for Apache Spark jobs with it. Regarding the Azure Data Lake Storage Gen2 Connector, we highlight any major differences in this post.

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue natively integrates with various data stores such as MySQL, PostgreSQL, MongoDB, and Apache Kafka, along with AWS data stores such as Amazon S3, Amazon Redshift, Amazon Relational Database Service (Amazon RDS), and Amazon DynamoDB. AWS Glue Marketplace connectors allow you to discover and integrate additional data sources, such as software as a service (SaaS) applications and your custom data sources. With just a few clicks, you can search for and select connectors from AWS Marketplace and begin your data preparation workflow in minutes.

How the connectors work

In this section, we discuss how the new connectors work.

Azure Blob Storage connector

This connector relies on the Spark DataSource API and calls Hadoop’s FileSystem interface, which provides implementations for reading from and writing to various distributed and traditional storage systems. This connector also includes the hadoop-azure module, which lets you run Apache Hadoop or Apache Spark jobs directly with data in Azure Blob Storage. AWS Glue loads the library from the Amazon Elastic Container Registry (Amazon ECR) repository during initialization (as a connector), reads the connection credentials using AWS Secrets Manager, and reads data source configurations from input parameters. When AWS Glue has internet access, the Spark job in AWS Glue can read from and write to Azure Blob Storage.

We support the following two methods for authentication: the authentication key for Shared Key and shared access signature (SAS) tokens:

#Method 1: Shared Key
spark.conf.set("fs.azure.account.key.<account_name>.blob.core.windows.net", "<your account key>")

df = spark.read.format("csv").option("header","true").load("wasbs://<container_name>@<account_name>.blob.core.windows.net/loadingtest-input/100mb")

df.write.format("csv").option("compression","snappy").mode("overwrite").save("wasbs://<container_name>@<account_name>.blob.core.windows.net/output-CSV/20210831/")

#Method 2: Shared Access Signature (SAS) Tokens
spark.conf.set("fs.azure.sas.<container_name>.<account_name>.blob.core.windows.net", "<your SAS token>")

df = spark.read.format("csv").option("header","true").load("wasbs://<container_name>@<account_name>.blob.core.windows.net/loadingtest-input/100mb")

df.write.format("csv").option("compression","snappy").mode("overwrite").save("wasbs://<container_name>@<account_name>.blob.core.windows.net/output-CSV/20210831/")

Azure Data Lake Storage Gen2 connector

The usage of the Azure Data Lake Storage Gen2 connector is much the same as the Azure Blob Storage connector. It uses the same library as the Azure Blob Storage connector, and relies on the Spark DataSource API, Hadoop’s FileSystem interface, and the Azure Blob Storage connector for Hadoop.

As of this writing, we only support the Shared Key authentication method:

#Method: Shared Key 
spark.conf.set("fs.azure.account.key.youraccountname.dfs.core.windows.net", "*Your account key")

# Read file from ADLS example
df= spark.read.format("csv").option("header","true").load("abfss://<container_name>@<account_name>.dfs.core.windows.net/input-csv/covid/") 

# Write file to ADLS example
df.write.format("parquet").option("compression","snappy").partitionBy("state").mode("overwrite").save("abfss://<container_name>@<account_name>.dfs.core.windows.net/output-glue3/csv-partitioned/") 

Solution overview

The following architecture diagram shows how AWS Glue connects to Azure Blob Storage for data ingestion.

In the following sections, we show you how to create a new secret for Azure Blob Storage in Secrets Manager, subscribe to the AWS Glue connector, and move data from Azure Blob Storage to Amazon S3.

Prerequisites

You need the following prerequisites:

  • A storage account in Microsoft Azure and your data path in Azure Blob Storage. Prepare the storage account credentials in advance. For instructions, refer to Create a storage account shared key.
  • A Secrets Manager secret to store a Shared Key secret, using one of the supporting authentication methods.
  • An AWS Identity and Access Management (IAM) role for the AWS Glue job with the following policies:
    • AWSGlueServiceRole, which allows the AWS Glue service role access to related services.
    • AmazonEC2ContainerRegistryReadOnly, which provides read-only access to Amazon EC2 Container Registry repositories. This policy is for using AWS Marketplace’s connector libraries.
    • A Secrets Manager policy, which provides read access to the secret in Secrets Manager.
    • An S3 bucket policy for the S3 bucket that you need to load ETL (extract, transform, and load) data from Azure Blob Storage.

Create a new secret for Azure Blob Storage in Secrets Manager

Complete the following steps to create a secret in Secrets Manager to store the Azure Blob Storage connection strings using the Shared Key authentication method:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. Replace the values for accountName, accountKey, and container with your own values.
  5. Leave the rest of the options at their default.
  6. Choose Next.
  7. Provide a name for the secret, such as azureblobstorage_credentials.
  8. Follow the rest of the steps to store the secret.

Subscribe to the AWS Glue connector for Azure Blob Storage

To subscribe to the connector, complete the following steps:

  1. Navigate to the Azure Blob Storage Connector for AWS Glue on AWS Marketplace.
  2. On the product page for the connector, use the tabs to view information about the connector, then choose Continue to Subscribe.
  3. Review the pricing terms and the seller’s End User License Agreement, then choose Accept Terms.
  4. Continue to the next step by choosing Continue to Configuration.
  5. On the Configure this software page, choose the fulfillment options and the version of the connector to use.

We have provided two options for the Azure Blob Storage Connector: AWS Glue 3.0 and AWS Glue 4.0. In this example, we focus on AWS Glue 4.0. Choose Continue to Launch.

  1. On the Launch this software page, choose Usage instructions to review the usage instructions provided by AWS.
  2. When you’re ready to continue, choose Activate the Glue connector from AWS Glue Studio.

The console will display the Create marketplace connection page in AWS Glue Studio.

Move data from Azure Blob Storage to Amazon S3

To move your data to Amazon S3, you must configure the custom connection and then set up an AWS Glue job.

Create a custom connection in AWS Glue

An AWS Glue connection stores connection information for a particular data store, including login credentials, URI strings, virtual private cloud (VPC) information, and more. Complete the following steps to create your connection:

  1. On the AWS Glue console, choose Connectors in the navigation pane.
  2. Choose Create connection.
  3. For Connector, choose Azure Blob Storage Connector for AWS Glue.
  4. For Name, enter a name for the connection (for example, AzureBlobStorageConnection).
  5. Enter an optional description.
  6. For AWS secret, enter the secret you created (azureblobstorage_credentials).
  7. Choose Create connection and activate connector.

The connector and connection information is now visible on the Connectors page.

Create an AWS Glue job and configure connection options

Complete the following steps:

  1. On the AWS Glue console, choose Connectors in the navigation pane.
  2. Choose the connection you created (AzureBlobStorageConnection).
  3. Choose Create job.
  4. For Name, enter Azure Blob Storage Connector for AWS Glue. This name should be unique among all the nodes for this job.
  5. For Connection, choose the connection you created (AzureBlobStorageConnection).
  6. For Key, enter path, and for Value, enter your Azure Blob Storage URI. For example, when we created our new secret, we already set a container value for the Azure Blob Storage. Here, we enter the file path /input_data/.
  7. Enter another key-value pair. For Key, enter fileFormat. For Value, enter csv, because our sample data is in this format.
  8. Optionally, if the CSV file contains a header line, enter another key-value pair. For Key, enter header. For Value, enter true.
  9. To preview your data, choose the Data preview tab, then choose Start data preview session and choose the IAM role defined in the prerequisites.
  10. Choose Confirm and wait for the results to display.
  11. Select S3 as Target Location.
  12. Choose Browse S3 to see the S3 buckets that you have access to and choose one as the target destination for the data output.
  13. For the other options, use the default values.
  14. On the Job details tab, for IAM Role, choose the IAM role defined in the prerequisites.
  15. For Glue version, choose your AWS Glue version.
  16. Continue to create your ETL job. For instructions, refer to Creating ETL jobs with AWS Glue Studio.
  17. Choose Run to run your job.

When the job is complete, you can navigate to the Run details page on the AWS Glue console and check the logs in Amazon CloudWatch.

The data is ingested into Amazon S3, as shown in the following screenshot. We are now able to import data from Azure Blob Storage to Amazon S3.
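If you prefer script mode over the visual editor, the same source can be read in the job script through the Marketplace connector. The following is a sketch only: the option keys mirror the key-value pairs configured in the preceding console steps, and the connection name, container-relative path, and target bucket are assumptions.

# Sketch: reading from Azure Blob Storage through the Marketplace connector in script mode.
# Assumes glueContext is already created and the AzureBlobStorageConnection connection exists.
azure_blob_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "connectionName": "AzureBlobStorageConnection",
        "path": "/input_data/",  # container-relative path, matching the console example
        "fileFormat": "csv",
        "header": "true",
    },
    transformation_ctx="azure_blob_source",
)

# Write the result to the target S3 bucket (placeholder path) in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=azure_blob_dyf,
    connection_type="s3",
    format="parquet",
    connection_options={"path": "s3://<your-target-bucket>/azure-blob-output/"},
    transformation_ctx="s3_target",
)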

Scaling considerations

In this example, we use the default AWS Glue capacity, 10 DPU (Data Processing Units). A DPU is a standardized unit of processing capacity that consists of 4 vCPUs of compute capacity and 16 GB of memory. To scale your AWS Glue job, you can increase the number of DPU, and also take advantage of Auto Scaling. With Auto Scaling enabled, AWS Glue automatically adds and removes workers from the cluster depending on the workload. After you choose the maximum number of workers, AWS Glue will adapt the right size of resources for the workload.
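The following is a sketch of turning Auto Scaling on programmatically, assuming an existing job named azure-blob-to-s3 (a placeholder); when Auto Scaling is enabled, the number of workers you set acts as the upper bound.

import boto3

glue = boto3.client("glue")

# Sketch: enable Auto Scaling on an existing job (the job name is a placeholder).
# update_job replaces the job definition, so required fields such as Role and Command
# must be carried over from the current definition along with the changes below.
job_name = "azure-blob-to-s3"
current = glue.get_job(JobName=job_name)["Job"]

glue.update_job(
    JobName=job_name,
    JobUpdate={
        "Role": current["Role"],
        "Command": current["Command"],
        "GlueVersion": current.get("GlueVersion", "4.0"),
        "WorkerType": "G.1X",
        "NumberOfWorkers": 20,  # acts as the maximum number of workers when Auto Scaling is on
        "DefaultArguments": {
            **current.get("DefaultArguments", {}),
            "--enable-auto-scaling": "true",
        },
    },
)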

Clean up

To clean up your resources, complete the following steps:

  1. Remove the AWS Glue job and secret in Secrets Manager with the following command:
    aws glue delete-job --job-name <your_job_name>
    
    aws glue delete-connection --connection-name <your_connection_name>
    
    aws secretsmanager delete-secret --secret-id <your_secretsmanager_id>

  2. If you are no longer going to use this connector, you can cancel the subscription to the Azure Blob Storage connector:
    1. On the AWS Marketplace console, go to the Manage subscriptions page.
    2. Select the subscription for the product that you want to cancel.
    3. On the Actions menu, choose Cancel subscription.
    4. Read the information provided and select the acknowledgement check box.
    5. Choose Yes, cancel subscription.
  3. Delete the data in the S3 bucket that you used in the previous steps.

Conclusion

In this post, we showed how to use AWS Glue and the new connector for ingesting data from Azure Blob Storage to Amazon S3. This connector provides access to Azure Blob Storage, facilitating cloud ETL processes for operational reporting, backup and disaster recovery, data governance, and more.

We welcome any feedback or questions in the comments section.

Appendix

When you need SAS token authentication for Azure Data Lake Storage Gen2, you can use the Azure SAS Token Provider for Hadoop. To do that, upload the JAR file to your S3 bucket and configure your AWS Glue job to set the S3 location in the job parameter --extra-jars (in AWS Glue Studio, Dependent JARs path). Then save the SAS token in Secrets Manager and set the value to spark.hadoop.fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.net in SparkConf using script mode at runtime. Learn more in the connector’s README.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.hadoop.fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.ne", sas_token) \
.getOrCreate()


About the authors

Qiushuang Feng is a Solutions Architect at AWS, responsible for Enterprise customers’ technical architecture design, consulting, and design optimization on AWS Cloud services. Before joining AWS, Qiushuang worked in IT companies such as IBM and Oracle, and accumulated rich practical experience in development and analytics.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data environments, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing knowledge in AWS Big Data blog posts.

Shengjie Luo is a Big Data Architect on the Amazon Cloud Technology professional service team. They are responsible for solutions consulting, architecture, and delivery of AWS-based data warehouses and data lakes. They are skilled in serverless computing, data migration, cloud data integration, data warehouse planning, and data service architecture design and implementation.

Greg Huang is a Senior Solutions Architect at AWS with expertise in technical architecture design and consulting for the China G1000 team. He is dedicated to deploying and utilizing enterprise-level applications on AWS Cloud services. He possesses nearly 20 years of rich experience in large-scale enterprise application development and implementation, having worked in the cloud computing field for many years. He has extensive experience in helping various types of enterprises migrate to the cloud. Prior to joining AWS, he worked for well-known IT enterprises such as Baidu and Oracle.

Maciej Torbus is a Principal Customer Solutions Manager within Strategic Accounts at Amazon Web Services. With extensive experience in large-scale migrations, he focuses on helping customers move their applications and systems to highly reliable and scalable architectures in AWS. Outside of work, he enjoys sailing, traveling, and restoring vintage mechanical watches.

Load data incrementally from transactional data lakes to data warehouses

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/load-data-incrementally-from-transactional-data-lakes-to-data-warehouses/

Data lakes and data warehouses are two of the most important data storage and management technologies in a modern data architecture. Data lakes store all of an organization’s data, regardless of its format or structure. An open table format such as Apache Hudi, Delta Lake, or Apache Iceberg is widely used to build data lakes on Amazon Simple Storage Service (Amazon S3) in a transactionally consistent manner for use cases including record-level upserts and deletes, change data capture (CDC), time travel queries, and more. Data warehouses, on the other hand, store data that has been cleaned, organized, and structured for analysis. Depending on your use case, it’s common to have a copy of the data between your data lake and data warehouse to support different access patterns.

When the data becomes very large and unwieldy, it can be difficult to keep the copy of the data between data lakes and data warehouses in sync and up to date in an efficient manner.

In this post, we discuss different architecture patterns to keep data in sync and up to date between data lakes built on open table formats and data warehouses such as Amazon Redshift. We also discuss the benefits of incremental loading and the techniques for implementing the architecture using AWS Glue, which is a serverless, scalable data integration service that helps you discover, prepare, move, and integrate data from multiple sources. Various data stores are supported in AWS Glue; for example, AWS Glue 4.0 supports an enhanced Amazon Redshift connector to read from and write to Amazon Redshift, and also supports a built-in Snowflake connector to read from and write to Snowflake. Moreover, Apache Hudi, Delta Lake, and Apache Iceberg are natively supported in AWS Glue.

Architecture patterns

Generally, there are three major architecture patterns to keep your copy of data between data lakes and data warehouses in sync and up to date:

  • Dual writes
  • Incremental queries
  • Change data capture

Let’s discuss each of the architecture patterns and the techniques to achieve them.

Dual writes

When initially ingesting data from its raw source into the data lake and data warehouse, a single batch process is configured to write to both. We call this pattern dual writes. Although this architecture pattern (see the following diagram) is straightforward and easy to implement, it can become error-prone because there are two separate write transactions, each of which can fail independently, causing inconsistencies between the data lake and the data warehouse when a write fails in one but not the other.

Incremental queries

An incremental query architectural pattern is designed to ingest data first into the data lake with an open table format, and then load the newly written data from the data lake into the data warehouse. Open table formats such as Apache Hudi and Apache Iceberg support incremental queries based on their respective transaction logs. You can capture records inserted or updated with the incremental queries, and then merge the captured records into the destination data warehouses.

Apache Hudi supports incremental query, which allows you to retrieve all records written during a specific time range.
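The following is a minimal sketch of a Hudi incremental read with PySpark; the S3 path and commit timestamps are placeholders, and the option names follow the standard Hudi Spark DataSource options:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read only the records committed after the last processed Hudi commit
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "<last processed commit time>")
    .option("hoodie.datasource.read.end.instanttime", "<optional upper bound>")
    .load("s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_incremental/ghcn/")
)
incremental_df.show()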

Delta Lake doesn’t have a specific concept for incremental queries. This capability is covered by its change data feed, which is explained in the next section.

Apache Iceberg supports incremental read, which allows you to read appended data incrementally. As of this writing, Iceberg gets incremental data only from the append operation; other operations such as replace, overwrite, and delete aren’t supported by incremental read.
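The following is a minimal sketch of an Iceberg incremental read with PySpark; the catalog, database, table, and snapshot IDs are placeholders, and it assumes the Spark session is already configured with an Iceberg catalog named glue_catalog:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read only the data appended between two snapshots of an Iceberg table
incremental_df = (
    spark.read.format("iceberg")
    .option("start-snapshot-id", "<last processed snapshot id>")
    .option("end-snapshot-id", "<latest snapshot id>")
    .load("glue_catalog.<your_database>.<your_iceberg_table>")
)
incremental_df.show()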

For merging the records into Amazon Redshift, you can use the MERGE SQL command, which was released in April 2023. AWS Glue supports the Redshift MERGE SQL command within its data integration jobs. To learn more, refer to Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor.

Incremental queries are useful for capturing changed records; however, they can’t handle deletes and only return the latest version of each record. If you need to handle delete operations in the source data lake, you need to use a CDC-based approach.

The following diagram illustrates an incremental query architectural pattern.

Change data capture

Change data capture (CDC) is a well-known technique to capture all mutating operations in a source database system and relay those operations to another system. CDC keeps all the intermediate changes, including the deletes. With this architecture pattern, you capture not only inserts and updates, but also deletes committed to the data lake, and then merge those captured changes into the data warehouses.

Apache Hudi 0.13.0 or later supports change data capture as an experimental feature, which is only available for Copy-on-Write (CoW) tables. Merge-on-Read (MoR) tables don’t support CDC as of this writing.

Delta Lake 2.0.0 or later supports a change data feed, which allows Delta tables to track record-level changes between table versions.
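The following is a minimal sketch of reading the Delta Lake change data feed with PySpark; it assumes the table was written with delta.enableChangeDataFeed=true, and the S3 path and starting version are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read record-level changes (inserts, updates, deletes) since a given table version
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load("s3://<Your S3 bucket name>/<Your S3 bucket prefix>/delta_incremental/ghcn/")
)
changes_df.select("_change_type", "_commit_version", "_commit_timestamp").show()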

Apache Iceberg 1.2.1 or later supports change data capture through its create_changelog_view procedure. When you run this procedure, a new view that contains the changes from a given table is created.
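The following is a minimal sketch of calling the create_changelog_view procedure through Spark SQL; the catalog, database, table, and view names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a changelog view that exposes row-level changes for the Iceberg table
spark.sql("""
    CALL glue_catalog.system.create_changelog_view(
        table => '<your_database>.<your_iceberg_table>',
        changelog_view => 'ghcn_changes'
    )
""")

# Query the captured changes
spark.sql("SELECT * FROM ghcn_changes").show()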

The following diagram illustrates a CDC architecture.

Example scenario

To demonstrate the end-to-end experience, this post uses the Global Historical Climatology Network Daily (GHCN-D) dataset. The data is publicly accessible through an S3 bucket. For more information, see the Registry of Open Data on AWS. You can also learn more in Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight.

The Amazon S3 location s3://noaa-ghcn-pds/csv/by_year/ has all of the observations from 1763 to the present organized in CSV files, one file for each year. The following block shows an example of what the records look like:

ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AE000041196,20220102,TAVG,226,H,,S,
...
AE000041196,20221231,TMAX,243,,,S,
AE000041196,20221231,PRCP,0,D,,S,
AE000041196,20221231,TAVG,202,H,,S,

The records have fields including ID, DATE, ELEMENT, and more. Each combination of ID, DATE, and ELEMENT represents a unique record in this dataset. For example, the record with ID as AE000041196, ELEMENT as TAVG, and DATE as 20220101 is unique. We use this dataset in the following examples and simulate record-level updates and deletes as sample operations.

Prerequisites

To continue with the examples in this post, you need to create (or already have) the following AWS resources:

For the first tutorial (loading from Apache Hudi to Amazon Redshift), you also need the following:

For the second tutorial (loading from Delta Lake to Snowflake), you need the following:

  • A Snowflake account.
  • An AWS Glue connection named snowflake for Snowflake access. For more information, refer to Configuring Snowflake connections.
  • An AWS Secrets Manager secret named snowflake_credentials with the following key pairs:
    • Key sfUser with value <Your Snowflake username>
    • Key sfPassword with value <Your Snowflake password>

These tutorials are interchangeable, so you can easily apply the same pattern to any combination of source and destination, for example, Hudi to Snowflake or Delta Lake to Amazon Redshift.

Load data incrementally from Apache Hudi table to Amazon Redshift using a Hudi incremental query

This tutorial uses Hudi incremental queries to load data from a Hudi table and then merge the changes to Amazon Redshift.

Ingest initial data to a Hudi table

Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose ETL jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset.

  1. Name this new job hudi-data-ingestion.
  2. Under Visual, choose Data source – S3 bucket.
  3. Under Node properties, for S3 source type, select S3 location.
  4. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured. The next step is to configure the data target to ingest data in Apache Hudi on your S3 bucket.

  1. Choose Data target – S3 bucket.
  2. Under Data target properties – S3, for Format, choose Apache Hudi.
  3. For Hudi Table Name, enter ghcn_hudi.
  4. For Hudi Storage Type, choose Copy on write.
  5. For Hudi Write Operation, choose Upsert.
  6. For Hudi Record Key Fields, choose ID.
  7. For Hudi Precombine Key Field, choose DATE.
  8. For Compression Type, choose GZIP.
  9. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_incremental/ghcn/. (Provide your S3 bucket name and prefix.)
  10. For Data Catalog update options, select Do not update the Data Catalog.

Now your data integration job is completely authored in the visual editor. Let’s add the one remaining setting for the IAM role, then run the job. (A script-equivalent sketch of this Hudi target follows these steps.)

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Choose Save, then choose Run.

You can track the progress on the Runs tab. It finishes in several minutes.
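For reference, the visual job you just authored corresponds roughly to the following PySpark write. This is a sketch rather than the generated script; the write option names follow the standard Hudi Spark options, and the target path uses the same placeholders as the steps above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the source CSV for 2022
df = spark.read.option("header", "true").csv("s3://noaa-ghcn-pds/csv/by_year/2022.csv")

hudi_options = {
    "hoodie.table.name": "ghcn_hudi",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "ID",
    "hoodie.datasource.write.precombine.field": "DATE",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
}

# Upsert the records into the Hudi table on Amazon S3
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_incremental/ghcn/")
)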

Load data from the Hudi table to a Redshift table

In this step, we assume that the files are updated with new records every day, and we want to store only the latest record per primary key (ID and ELEMENT) so that the latest snapshot data is queryable. One typical approach is to do an INSERT for all the historical data and calculate the latest records in queries; however, this can introduce additional overhead in all the queries. When you want to analyze only the latest records, it’s better to do an UPSERT (update and insert) based on the primary key and the DATE field rather than just an INSERT, in order to avoid duplicates and maintain a single updated row of data.

Complete the following steps to load data from the Hudi table to a Redshift table:

  1. Download the file hudi2redshift-incremental-load.ipynb.
  2. In AWS Glue Studio, choose Jupyter Notebook, then choose Create.
  3. For Job name, enter hudi-ghcn-incremental-load-notebook.
  4. For IAM Role, choose your IAM role.
  5. Choose Start notebook.

Wait for the notebook to be ready.

  1. Run the first cell to set up an AWS Glue interactive session.
  2. Replace the parameters with yours and run the cell under Configure your resource.
  3. Run the cell under Initialize SparkSession and GlueContext.
  4. Run the cell under Determine target time range for incremental query.
  5. Run the cells under Run query to load data updated during a given timeframe.
  6. Run the cells under Merge changes into destination table.

You can see the exact query that runs immediately after the incremental records are ingested into a temporary table in Amazon Redshift. (A sketch of this load-and-merge pattern follows these steps.)

  1. Run the cell under Update the last query end time.
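The following is a minimal sketch of that load-and-merge pattern, not the notebook’s exact code. The Glue connection name, staging table, temporary directory, and begin instant time are assumptions, and the MERGE statement is abbreviated (the full statement generated by the notebook appears later in this post):

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Incremental read from the Hudi table (begin instant time is a placeholder)
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "<last processed commit time>")
    .load("s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_incremental/ghcn/")
)

# Stage the changes in Redshift, then merge and drop the staging table
staging_dyf = DynamicFrame.fromDF(incremental_df, glue_context, "staging")
glue_context.write_dynamic_frame.from_options(
    frame=staging_dyf,
    connection_type="redshift",
    connection_options={
        "useConnectionProperties": "true",
        "connectionName": "redshift-connection",      # hypothetical Glue connection
        "dbtable": "public.ghcn_tmp",                  # staging table
        "redshiftTmpDir": "s3://<Your S3 bucket name>/temp/",
        "postactions": "MERGE INTO public.ghcn USING public.ghcn_tmp ON ...; DROP TABLE public.ghcn_tmp;",
    },
)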

Validate initial records in the Redshift table

Complete the following steps to validate the initial records in the Redshift table:

  1. On the Amazon Redshift console, open Query Editor v2.
  2. Run the following query:
    SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query returns the following result set.

The original source file 2022.csv has historical records for record ID='AE000041196' from 20220101 to 20221231; however, the query result shows only four records, one record per ELEMENT at the latest snapshot of the day (20221230 or 20221231). When we wrote the data with the UPSERT write option, we configured the ID field as the Hudi record key field, the DATE field as the Hudi precombine field, and the ELEMENT field as the partition key field. When two records have the same key value, Hudi picks the one with the largest value for the precombine field. When the job ingested the data, it compared all the values in the DATE field for each pair of ID and ELEMENT, and then picked the record with the largest value in the DATE field. We use the current state of this table as the initial state.

Ingest updates to a Hudi table

Complete the following steps to simulate ingesting more records to the Hudi table:

  1. On AWS Glue Studio, choose the job hudi-data-ingestion.
  2. On the Data target – S3 bucket node, change the S3 location from s3://noaa-ghcn-pds/csv/by_year/2022.csv to s3://noaa-ghcn-pds/csv/by_year/2023.csv.
  3. Run the job.

Because this job uses the DATE field as a Hudi precombine field, the records included in the new source file have been upserted into the Hudi table.

Load data incrementally from the Hudi table to the Redshift table

Complete the following steps to load the ingested records incrementally to the Redshift table:

  1. On AWS Glue Studio, choose the job hudi-ghcn-incremental-load-notebook.
  2. Run all the cells again.

In the cells under Run query, you will notice that the records shown this time have DATE in 2023. Only newly ingested records are shown here.

In the cells under Merge changes into destination table, the newly ingested records are merged into the Redshift table. The generated MERGE query statement in the notebook is as follows:

MERGE INTO public.ghcn USING public.ghcn_tmp ON 
    public.ghcn.ID = public.ghcn_tmp.ID AND 
    public.ghcn.ELEMENT = public.ghcn_tmp.ELEMENT
WHEN MATCHED THEN UPDATE SET 
    _hoodie_commit_time = public.ghcn_tmp._hoodie_commit_time,
    _hoodie_commit_seqno = public.ghcn_tmp._hoodie_commit_seqno,
    _hoodie_record_key = public.ghcn_tmp._hoodie_record_key,
    _hoodie_partition_path = public.ghcn_tmp._hoodie_partition_path,
    _hoodie_file_name = public.ghcn_tmp._hoodie_file_name, 
    ID = public.ghcn_tmp.ID, 
    DATE = public.ghcn_tmp.DATE, 
    ELEMENT = public.ghcn_tmp.ELEMENT, 
    DATA_VALUE = public.ghcn_tmp.DATA_VALUE, 
    M_FLAG = public.ghcn_tmp.M_FLAG, 
    Q_FLAG = public.ghcn_tmp.Q_FLAG, 
    S_FLAG = public.ghcn_tmp.S_FLAG, 
    OBS_TIME = public.ghcn_tmp.OBS_TIME 
WHEN NOT MATCHED THEN INSERT VALUES (
    public.ghcn_tmp._hoodie_commit_time, 
    public.ghcn_tmp._hoodie_commit_seqno, 
    public.ghcn_tmp._hoodie_record_key, 
    public.ghcn_tmp._hoodie_partition_path, 
    public.ghcn_tmp._hoodie_file_name, 
    public.ghcn_tmp.ID, 
    public.ghcn_tmp.DATE, 
    public.ghcn_tmp.ELEMENT, 
    public.ghcn_tmp.DATA_VALUE, 
    public.ghcn_tmp.M_FLAG, 
    public.ghcn_tmp.Q_FLAG, 
    public.ghcn_tmp.S_FLAG, 
    public.ghcn_tmp.OBS_TIME
);

The next step is to verify the result on the Redshift side.

Validate updated records in the Redshift table

Complete the following steps to validate the updated records in the Redshift table:

  1. On the Amazon Redshift console, open Query Editor v2.
  2. Run the following query:
    SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query returns the following result set.

Now you see that the four records have been updated with the new records in 2023. As further records arrive in the future, this approach continues to work well, upserting new records based on the primary keys.

Load data incrementally from a Delta Lake table to Snowflake using a Delta change data feed

This tutorial uses a Delta change data feed to load data from a Delta table, and then merge the changes to Snowflake.

Ingest initial data to a Delta table

Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose ETL jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset.

  1. Name this new job delta-data-ingestion.
  2. Under Visual, choose Data source – S3 bucket.
  3. Under Node properties, for S3 source type, select S3 location.
  4. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured. The next step is to configure the data target to ingest data in Delta Lake format on your S3 bucket.

  1. Choose Data target – S3 bucket.
  2. Under Data target properties – S3, for Format, choose Delta Lake.
  3. For Compression Type, choose Snappy.
  4. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/delta_incremental/ghcn/. (Provide your S3 bucket name and prefix.)
  5. For Data Catalog update options, select Do not update the Data Catalog.

Now your data integration job is completely authored in the visual editor. Let’s add the remaining details for the IAM role and job parameters, and then run the job.

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Under Job parameters, for Key, enter --conf and for Value, enter spark.databricks.delta.properties.defaults.enableChangeDataFeed=true. (A sketch of what this setting does follows these steps.)
  3. Choose Save, then choose Run.
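The --conf job parameter enables the Delta change data feed by default for tables created by the job. The following is a minimal sketch of the equivalent configuration in PySpark; the S3 path is a placeholder:

from pyspark.sql import SparkSession

# Enable the change data feed by default for new Delta tables created in this session
spark = (
    SparkSession.builder
    .config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
    .getOrCreate()
)

# Alternatively, enable it on an existing Delta table
spark.sql(
    "ALTER TABLE delta.`s3://<Your S3 bucket name>/<Your S3 bucket prefix>/delta_incremental/ghcn/` "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)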

Load data from the Delta table to a Snowflake table

Complete the following steps to load data from the Delta table to a Snowflake table:

  1. Download the file delta2snowflake-incremental-load.ipynb.
  2. On AWS Glue Studio, choose Jupyter Notebook, then choose Create.
  3. For Job name, enter delta-ghcn-incremental-load-notebook.
  4. For IAM Role, choose your IAM role.
  5. Choose Start notebook.

Wait for the notebook to be ready.

  1. Run the first cell to start an AWS Glue interactive session.
  2. Replace the parameters with yours and run the cell under Configure your resource.
  3. Run the cell under Initialize SparkSession and GlueContext.
  4. Run the cell under Determine target time range for CDC.
  5. Run the cells under Run query to load data updated during a given timeframe.
  6. Run the cells under Merge changes into destination table.

You can see the exact query that runs immediately after the changed records are ingested into a temporary table in Snowflake.

  1. Run the cell under Update the last query end time.

Validate initial records in the Snowflake warehouse

Run the following query in Snowflake:

SELECT * FROM "dev"."public"."ghcn" WHERE ID = 'AE000041196'

The query should return the following result set:

There are three records returned in this query.

Update and delete a record on the Delta table

Complete the following steps to update and delete a record on the Delta table as sample operations:

  1. Return to the AWS Glue notebook job.
  2. Run the cells under Update the record and Delete the record.

Load data incrementally from the Delta table to the Snowflake table

Complete the following steps to load the changed records incrementally to the Snowflake table:

  1. On AWS Glue Studio, choose the job delta-ghcn-incremental-load-notebook.
  2. Run all the cells again.

When you run the cells under Run query, you will notice that there are only three records, which correspond to the update and delete operation performed in the previous step.

In the cells under Merge changes into destination table, the changes are merged into the Snowflake table. The generated MERGE query statement in the notebook is as follows:

MERGE INTO public.ghcn USING public.ghcn_tmp ON 
    public.ghcn.ID = public.ghcn_tmp.ID AND 
    public.ghcn.DATE = public.ghcn_tmp.DATE AND 
    public.ghcn.ELEMENT = public.ghcn_tmp.ELEMENT 
WHEN MATCHED AND public.ghcn_tmp._change_type = 'update_postimage' THEN UPDATE SET 
    ID = public.ghcn_tmp.ID, 
    DATE = public.ghcn_tmp.DATE, 
    ELEMENT = public.ghcn_tmp.ELEMENT, 
    DATA_VALUE = public.ghcn_tmp.DATA_VALUE, 
    M_FLAG = public.ghcn_tmp.M_FLAG, 
    Q_FLAG = public.ghcn_tmp.Q_FLAG, 
    S_FLAG = public.ghcn_tmp.S_FLAG, 
    OBS_TIME = public.ghcn_tmp.OBS_TIME, 
    _change_type = public.ghcn_tmp._change_type, 
    _commit_version = public.ghcn_tmp._commit_version, 
    _commit_timestamp = public.ghcn_tmp._commit_timestamp 
WHEN MATCHED AND public.ghcn_tmp._change_type = 'delete' THEN DELETE 
WHEN NOT MATCHED THEN INSERT VALUES (
    public.ghcn_tmp.ID, 
    public.ghcn_tmp.DATE, 
    public.ghcn_tmp.ELEMENT, 
    public.ghcn_tmp.DATA_VALUE, 
    public.ghcn_tmp.M_FLAG, 
    public.ghcn_tmp.Q_FLAG, 
    public.ghcn_tmp.S_FLAG, 
    public.ghcn_tmp.OBS_TIME, 
    public.ghcn_tmp._change_type, 
    public.ghcn_tmp._commit_version, 
    public.ghcn_tmp._commit_timestamp
);

The next step is to verify the result on the Snowflake side.

Validate updated records in the Snowflake table

Complete the following steps to validate the updated and deleted records in the Snowflake table:

  1. On Snowflake, run the following query:
    SELECT * FROM ghcn WHERE ID = 'AE000041196' AND DATE = '20221231'

The query returns the following result set:

You will notice that the query only returns two records. The value of DATA_VALUE of the record ELEMENT=PRCP has been updated from 0 to 12345. The record ELEMENT=TMAX has been deleted. This means that your update and delete operations on the source Delta table have been successfully replicated to the target Snowflake table.

Clean up

Complete the following steps to clean up your resources:

  1. Delete the following AWS Glue jobs:
    • hudi-data-ingestion
    • hudi-ghcn-incremental-load-notebook
    • delta-data-ingestion
    • delta-ghcn-incremental-load-notebook
  2. Clean up your S3 bucket.
  3. If needed, delete the Redshift cluster or the Redshift Serverless workgroup.

Conclusion

This post discussed architecture patterns to keep the copies of your data in data lakes built on open table formats and in data warehouses in sync and up to date. We also discussed the benefits of incremental loading and the techniques for achieving the use case using AWS Glue. We covered two use cases: incremental load from a Hudi table to Amazon Redshift, and from a Delta table to Snowflake.


About the author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

How healthcare organizations can analyze and create insights using price transparency data

Post Syndicated from Gokhul Srinivasan original https://aws.amazon.com/blogs/big-data/how-healthcare-organizations-can-analyze-and-create-insights-using-price-transparency-data/

In recent years, there has been a growing emphasis on price transparency in the healthcare industry. Under the Transparency in Coverage (TCR) rule, hospitals and payors are required to publish their pricing data in a machine-readable format. With this move, patients can compare prices between different hospitals and make informed healthcare decisions. For more information, refer to Delivering Consumer-friendly Healthcare Transparency in Coverage On AWS.

The data in the machine-readable files can provide valuable insights to understand the true cost of healthcare services and compare prices and quality across hospitals. The availability of machine-readable files opens up new possibilities for data analytics, allowing organizations to analyze large amounts of pricing data. Using machine learning (ML) and data visualization tools, these datasets can be transformed into actionable insights that can inform decision-making.

In this post, we explain how healthcare organizations can use AWS services to ingest, analyze, and generate insights from the price transparency data created by hospitals. We use sample data from three different hospitals, analyze the data, and create comparative trends and insights from the data.

Solution overview

As part of the Centers for Medicare and Medicaid Services (CMS) mandate, all hospitals now have their machine-readable file containing the pricing data. As hospitals generate this data, they can use their organization data or ingest data from other hospitals to derive analytics and competitive comparison. This comparison can help hospitals do the following:

  • Derive a price baseline for all medical services and perform gap analysis
  • Analyze pricing trends and identify services where competitors don’t participate
  • Evaluate and identify the services where cost difference is above a specific threshold

The machine-readable files from hospitals are smaller than those generated by payors. This is due to the complexity of the JSON structure, contracts, and the risk evaluation process on the payor side. Because of this lower complexity, the solution uses AWS serverless services to ingest the data, transform it, and make it available for analytics. The analysis of the machine-readable files from payors requires advanced computational capabilities due to the complexity and the interrelationships in the JSON file.

Prerequisites

As a prerequisite, evaluate the hospitals for which the pricing analysis will be performed and identify the machine-readable files for analysis. Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Create separate folders for each hospital inside the S3 bucket.

Architecture overview

The architecture uses AWS serverless technology for the implementation. The serverless architecture features auto scaling, high availability, and a pay-as-you-go billing model to increase agility and optimize costs. The architecture approach is split into a data intake layer, a data analysis layer, and a data visualization layer.

The architecture contains three independent stages:

  • File ingestion – Hospitals negotiate their contract and pricing with the payors once a year, with periodic revisions on a quarterly or monthly basis. The data ingestion process copies the machine-readable files from the hospitals, validates the data, and keeps the validated files available for analysis.
  • Data analysis – In this stage, the files are transformed using AWS Glue and stored in the AWS Glue Data Catalog. AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, ML, and application development. Then you can use Amazon Athena V3 to query the tables in the Data Catalog.
  • Data visualization – Amazon QuickSight is a cloud-powered business analytics service that makes it straightforward to build visualizations, perform ad hoc analysis, and quickly get business insights from the pricing data. This stage uses QuickSight to visually analyze the data in the machine-readable file using Athena queries.

File ingestion

The file ingestion process works as defined in the following figure. The architecture uses AWS Lambda, a serverless, event-driven compute service that lets you run code without provisioning or managing servers.

TCR Intake Architecture

The following flow defines the process to ingest and analyze the data:

  1. Copy the machine-readable files from the hospitals into the respective raw data S3 bucket.
  2. The file upload to the S3 bucket triggers an S3 event, which invokes a format Lambda function.
  3. The Lambda function triggers a notification when it identifies issues in the file.
  4. The Lambda function ingests the file, transforms the data, and stores the clean file in a new clean data S3 bucket.

Organizations can create additional Lambda functions to handle differences in the file formats.
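The following is a minimal sketch of such a format-validation Lambda function. The clean bucket name, the expected JSON key, and the error handling are placeholders for illustration; in practice you would publish a notification (for example, through Amazon SNS) when validation fails:

import json
import urllib.parse
import boto3

s3 = boto3.client("s3")
CLEAN_BUCKET = "tcr-clean-data-bucket"  # hypothetical clean-data bucket name


def lambda_handler(event, context):
    # Triggered by an S3 event when a machine-readable file lands in the raw bucket
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj["Body"].read()

        # Placeholder validation: require parseable JSON with an expected pricing section
        try:
            payload = json.loads(body)
            if "standard_charge_information" not in payload:
                raise ValueError("missing expected pricing section")
        except ValueError as err:
            # Replace this with a notification to the data owners
            print(f"Validation failed for {key}: {err}")
            continue

        # Store the validated file in the clean bucket, preserving the hospital prefix
        s3.put_object(Bucket=CLEAN_BUCKET, Key=key, Body=body)

    return {"status": "ok"}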

Data analysis

The file intake and data analysis processes are independent of each other. Whereas the file intake happens on a scheduled or periodical basis, the data analysis happens regularly based on the business operation needs. The architecture for the data analysis is shown in the following figure.

TCR Data Analysis

This stage uses an AWS Glue crawler, the AWS Glue Data Catalog, and Athena v3 to analyze the data from the machine-readable files.

  1. An AWS Glue crawler scans the clean data in the S3 bucket and creates or updates the tables in the AWS Glue Data Catalog. The crawler can run on demand or on a schedule, and can crawl multiple machine-readable files in a single run.
  2. The Data Catalog now contains references to the machine-readable data. The Data Catalog contains the table definition, which contains metadata about the data in the machine-readable file. The tables are written to a database, which acts as a container.
  3. Use the Data Catalog and transform the hospital price transparency data.
  4. When the data is available in the Data Catalog, you can develop the analytics query using Athena. Athena is a serverless, interactive analytics service that provides a simplified, flexible way to analyze petabytes of data using SQL queries. A sample query submission is sketched after this list.
  5. Any failure during the process will be captured in the Amazon CloudWatch logs, which can be used for troubleshooting and analysis. The Data Catalog needs to be refreshed only when there is a change in the machine-readable file structure or a new machine-readable file is uploaded to the clean S3 bucket. When the crawler runs periodically, it automatically identifies the changes and updates the Data Catalog.
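The following is a minimal sketch of submitting such an Athena query with the AWS SDK for Python (Boto3). The database, table, columns, billing code, and output location are hypothetical placeholders:

import boto3

athena = boto3.client("athena")

# Compare the gross charge for one billing code across hospitals (illustrative schema)
query = """
    SELECT hospital_name, billing_code, description, gross_charge
    FROM tcr_pricing.hospital_charges
    WHERE billing_code = '470'
    ORDER BY gross_charge
"""

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "tcr_pricing"},
    ResultConfiguration={"OutputLocation": "s3://<your-athena-results-bucket>/tcr/"},
)
print(response["QueryExecutionId"])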

Data visualization

When the data analysis is complete and queries are developed using Athena, we can visually analyze the results and gain insights using QuickSight. As shown in the following figure, once the data ingestion and data analysis are complete, the queries are built using Athena.

TCR Visualization

In this stage, we use QuickSight to create datasets using the Athena queries, build visualizations, and deploy dashboards for visual analysis and insights.

Create a QuickSight dataset

Complete the following steps to create a QuickSight dataset:

  1. On the QuickSight console, choose Manage data.
  2. On the Datasets page, choose New data set.
  3. In the Create a Data Set page, choose the connection profile icon for the existing Athena data source that you want to use.
  4. Choose Create data set.
  5. On the Choose your table page, choose Use custom SQL and enter the Athena query.

After the dataset is created, you can add visualizations and analyze the data from the machine-readable file. With the QuickSight dashboard, organizations can easily perform price comparisons across different hospitals, identify high-cost services, and find other price outliers. In addition, you can use ML in QuickSight to gain ML-driven insights, detect pricing anomalies, and create forecasts based on historical files.

The following figure shows an illustrative QuickSight dashboard with insights comparing the machine-readable files from three different hospitals. With these visuals, you compare the pricing data across hospitals, create price benchmarks, determine cost-effective hospitals, and identify opportunities for competitive advantage.
Quicksight dashboard

Performance, operational, and cost considerations

The solution recommends QuickSight Enterprise for visualization and insights. For QuickSight dashboards, the Athena query results can be stored in SPICE, the QuickSight in-memory engine, for better performance.

The approach uses Athena V3, which offers performance improvements, reliability enhancements, and newer features. The Athena query result reuse feature enables caching of query results: when identical queries are run with this option, repeat queries run up to five times faster, giving you increased productivity for interactive data analysis. Because the data isn’t scanned again, you get improved performance at a lower cost.

Cost

Hospitals create the machine-readable files on a monthly basis. This approach uses a serverless architecture that keeps the cost low and removes the maintenance overhead. The analysis can begin with the machine-readable files for a few hospitals, and new hospitals can be added as the analysis scales. The following example helps you understand the cost for different hospitals based on data size:

  • A typical hospital with 100 GB storage/month, querying 20 GB data with 2 authors and 5 readers, costs around $2,500/year

AWS offers you a pay-as-you-go approach for pricing for the vast majority of our cloud services. With AWS you pay only for the individual services you need, for as long as you use them, and without requiring long-term contracts or complex licensing.

TCR Monthly cost

Conclusion

This post illustrated how to collect and analyze hospital-created price transparency data and generate insights using AWS services. This type of analysis and the visualizations provide the framework to analyze the machine-readable files. Hospitals, payors, brokers, underwriters, and other healthcare stakeholders can use this architecture to analyze and draw insights from pricing data published by hospitals of their choice. Our AWS teams can assist you to identify the correct strategy by offering thought leadership and prescriptive technical support for price transparency analysis.

Contact your AWS account team for more help on design and to explore private pricing. If you don’t have a contact with AWS yet, please reach out to be connected with an AWS representative.


About the Authors

Gokhul Srinivasan is a Senior Partner Solutions Architect leading AWS Healthcare and Life Sciences (HCLS) Global Startup Partners. Gokhul has over 19 years of healthcare experience helping organizations with digital transformation, platform modernization, and delivering business outcomes.

Laks Sundararajan is a seasoned Enterprise Architect helping companies reset, transform and modernize their IT, digital, cloud, data and insight strategies. A proven leader with significant expertise around Generative AI, Digital, Cloud and Data/Analytics Transformation, Laks is a Sr. Solutions Architect with Healthcare and Life Sciences (HCLS).

Anil Chinnam is a Solutions Architect in the Digital Native Business Segment at Amazon Web Services (AWS). He enjoys working with customers to understand their challenges and solve them by creating innovative solutions using AWS services. Outside of work, Anil enjoys being a father, swimming, and traveling.

Automated data governance with AWS Glue Data Quality, sensitive data detection, and AWS Lake Formation

Post Syndicated from Shoukat Ghouse original https://aws.amazon.com/blogs/big-data/automated-data-governance-with-aws-glue-data-quality-sensitive-data-detection-and-aws-lake-formation/

Data governance is the process of ensuring the integrity, availability, usability, and security of an organization’s data. Due to the volume, velocity, and variety of data being ingested in data lakes, it can get challenging to develop and maintain policies and procedures to ensure data governance at scale for your data lake. Data confidentiality and data quality are the two essential themes for data governance. Data confidentiality refers to the protection and control of sensitive and private information to prevent unauthorized access, especially when dealing with personally identifiable information (PII). Data quality focuses on maintaining accurate, reliable, and consistent data across the organization. Poor data quality can lead to erroneous decisions, inefficient operations, and compromised business performance.

Companies need to ensure data confidentiality is maintained throughout the data pipeline and that high-quality data is available to consumers in a timely manner. A lot of this effort is manual, where data owners and data stewards define and apply the policies statically up front for each dataset in the lake. This gets tedious and delays the data adoption across the enterprise.

In this post, we showcase how to use AWS Glue with AWS Glue Data Quality, sensitive data detection transforms, and AWS Lake Formation tag-based access control to automate data governance.

Solution overview

Let’s consider a fictional company, OkTank. OkTank has multiple ingestion pipelines that populate multiple tables in the data lake. OkTank wants to ensure the data lake is governed with data quality rules and access policies in place at all times.

Multiple personas consume data from the data lake, such as business leaders, data scientists, data analysts, and data engineers. For each set of users, a different level of governance is needed. For example, business leaders need top-quality and highly accurate data, data scientists cannot see PII data and need data within an acceptable quality range for their model training, and data engineers can see all data except PII.

Currently, these requirements are hard-coded and managed manually for each set of users. OkTank wants to scale this and is looking for ways to control governance in an automated way. Primarily, they are looking for the following features:

  • When new data and tables get added to the data lake, the governance policies (data quality checks and access controls) get automatically applied for them. Unless the data is certified to be consumed, it shouldn’t be accessible to the end-users. For example, they want to ensure basic data quality checks are applied on all new tables and provide access to the data based on the data quality score.
  • Due to changes in source data, the existing data profile of data lake tables may drift. It’s required to ensure the governance is met as defined. For example, the system should automatically mark columns as sensitive if sensitive data is detected in a column that was earlier marked as public and was available publicly for users. The system should hide the column from unauthorized users accordingly.

For the purpose of this post, the following governance policies are defined:

  • No PII data should exist in tables or columns tagged as public.
  • If a column has any PII data, the column should be marked as sensitive. The table should then also be marked sensitive.
  • The following data quality rules should be applied on all tables:
    • All tables should have a minimum set of columns: data_key, data_load_date, and data_location.
    • data_key is a key column and should meet key requirements of being unique and complete.
    • data_location should match with locations defined in a separate reference (base) table.
    • The data_load_date column should be complete.
  • User access to tables is controlled as per the following table.
User | Can Access Sensitive Tables | Can Access Sensitive Columns | Min Data Quality Threshold Needed to Consume Data
Category 1 | Yes | Yes | 100%
Category 2 | Yes | No | 50%
Category 3 | No | No | 0%

In this post, we use AWS Glue Data Quality and sensitive data detection features. We also use Lake Formation tag-based access control to manage access at scale.

The following diagram illustrates the solution architecture.

The governance requirements highlighted in the previous table are translated to the following Lake Formation LF-Tags.

IAM User | LF-Tag: tbl_class | LF-Tag: col_class | LF-Tag: dq_tag
Category 1 | sensitive, public | sensitive, public | DQ100
Category 2 | sensitive, public | public | DQ100, DQ90, DQ50_80, DQ80_90
Category 3 | public | public | DQ90, DQ100, DQ_LT_50, DQ50_80, DQ80_90

This post uses AWS Step Functions to orchestrate the governance jobs, but you can use any other orchestration tool of choice. To simulate data ingestion, we manually place the files in an Amazon Simple Storage Service (Amazon S3) bucket. In this post, we trigger the Step Functions state machine manually for ease of understanding. In practice, you can integrate or invoke the jobs as part of a data ingestion pipeline, via event triggers like AWS Glue crawler or Amazon S3 events, or schedule them as needed.

In this post, we use an AWS Glue database named oktank_autogov_temp and a target table named customer on which we apply the governance rules. We use AWS CloudFormation to provision the resources. AWS CloudFormation lets you model, provision, and manage AWS and third-party resources by treating infrastructure as code.

Prerequisites

Complete the following prerequisite steps:

  1. Identify an AWS Region in which you want to create the resources and ensure you use the same Region throughout the setup and verifications.
  2. Have a Lake Formation administrator role to run the CloudFormation template and grant permissions.

Sign in to the Lake Formation console and add yourself as a Lake Formation data lake administrator if you aren’t already an admin. If you are setting up Lake Formation for the first time in your Region, you can do this in the pop-up window that appears when you connect to the Lake Formation console and select the desired Region.

Otherwise, you can add data lake administrators by choosing Administrative roles and tasks in the navigation pane on the Lake Formation console and choosing Add administrators. Then select Data lake administrator, identify your users and roles, and choose Confirm.

Deploy the CloudFormation stack

Run the provided CloudFormation stack to create the solution resources.

You need to provide a unique bucket name and specify passwords for the three users reflecting three different user personas (Category 1, Category 2, and Category 3) that we use for this post.

The stack provisions an S3 bucket to store the dummy data, AWS Glue scripts, results of sensitive data detection, and Amazon Athena query results in their respective folders.

The stack copies the AWS Glue scripts into the scripts folder and creates two AWS Glue jobs Data-Quality-PII-Checker_Job and LF-Tag-Handler_Job pointing to the corresponding scripts.

The AWS Glue job Data-Quality-PII-Checker_Job applies the data quality rules and publishes the results. It also checks for sensitive data in the columns. In this post, we check for the PERSON_NAME and EMAIL data types. If any columns with sensitive data are detected, it persists the sensitive data detection results to the S3 bucket.

AWS Glue Data Quality uses Data Quality Definition Language (DQDL) to author the data quality rules.

The data quality requirements as defined earlier in this post are written as the following DQDL in the script:

Rules = [
    ReferentialIntegrity "data_location" "reference.data_location" = 1.0,
    IsPrimaryKey "data_key",
    ColumnExists "data_load_date",
    IsComplete "data_load_date"
]
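The following is a minimal sketch of how a DQDL ruleset like this can be evaluated inside a Glue job with the EvaluateDataQuality transform. The evaluation context name mirrors the DataQuality_BasicChecks result set referenced later in this post; the ReferentialIntegrity rule is omitted in the sketch because it additionally requires the base table to be supplied as a reference data source:

from awsglue.context import GlueContext
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the target table from the Data Catalog
customer_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="oktank_autogov_temp", table_name="customer"
)

# Subset of the ruleset; ReferentialIntegrity needs the base table as a reference source
ruleset = """
Rules = [
    IsPrimaryKey "data_key",
    ColumnExists "data_load_date",
    IsComplete "data_load_date"
]
"""

dq_results = EvaluateDataQuality.apply(
    frame=customer_dyf,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "DataQuality_BasicChecks",
        "enableDataQualityCloudWatchMetrics": True,
        "enableDataQualityResultsPublishing": True,
    },
)
dq_results.toDF().show(truncate=False)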

The following screenshot shows a sample result from the job after it runs. You can see this after you trigger the Step Functions workflow in subsequent steps. To check the results, on the AWS Glue console, choose ETL jobs and choose the job called Data-Quality-PII-Checker_Job. Then navigate to the Data quality tab to view the results.

The AWS Glue job LF-Tag-Handler_Job fetches the data quality metrics published by Data-Quality-PII-Checker_Job. It checks the status of the DataQuality_PIIColumns result. It gets the list of sensitive column names from the sensitive data detection file created by Data-Quality-PII-Checker_Job and tags those columns as sensitive. The rest of the columns are tagged as public. It also tags the table as sensitive if sensitive columns are detected. The table is marked as public if no sensitive columns are detected.

The job also checks the data quality score for the DataQuality_BasicChecks result set. It maps the data quality score into tags as shown in the following table and applies the corresponding tag on the table.

Data Quality Score | Data Quality Tag
100% | DQ100
90-100% | DQ90
80-90% | DQ80_90
50-80% | DQ50_80
Less than 50% | DQ_LT_50
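The following is an illustrative sketch of how a job can map the score to a tag and apply it with Lake Formation. The helper function and the behavior at the threshold boundaries are assumptions that simply mirror the preceding table, and the Boto3 call targets the customer table used in this post:

import boto3

def score_to_dq_tag(score: float) -> str:
    # score is the data quality score as a fraction between 0.0 and 1.0
    if score >= 1.0:
        return "DQ100"
    if score >= 0.9:
        return "DQ90"
    if score >= 0.8:
        return "DQ80_90"
    if score >= 0.5:
        return "DQ50_80"
    return "DQ_LT_50"

# Apply the resulting tag value on the table with Lake Formation
lakeformation = boto3.client("lakeformation")
lakeformation.add_lf_tags_to_resource(
    Resource={"Table": {"DatabaseName": "oktank_autogov_temp", "Name": "customer"}},
    LFTags=[{"TagKey": "dq_tag", "TagValues": [score_to_dq_tag(0.95)]}],
)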

The CloudFormation stack copies some mock data to the data folder and registers this location under AWS Lake Formation Data lake locations so Lake Formation can govern access on the location using the service-linked role for Lake Formation.

The customer subfolder contains the initial customer dataset for the table customer. The base subfolder contains the base dataset, which we use to check referential integrity as part of the data quality checks. The column data_location in the customer table should match with locations defined in this base table.

The stack also copies some additional mock data to the bucket under the data-v1 folder. We use this data to simulate data quality issues.

It also creates the following resources:

  • An AWS Glue database called oktank_autogov_temp and two tables under the database:
    • customer – This is our target table on which we will be governing the access based on data quality rules and PII checks.
    • base – This is the base table that has the reference data. One of the data quality rules checks that the customer data always adheres to locations present in the base table.
  • AWS Identity and Access Management (IAM) users and roles:
    • DataLakeUser_Category1 – The data lake user corresponding to the Category 1 user. This user should be able to access sensitive data but needs 100% accurate data.
    • DataLakeUser_Category2 – The data lake user corresponding to the Category 2 user. This user should not be able to access sensitive columns in the table. It needs more than 50% accurate data.
    • DataLakeUser_Category3 – The data lake user corresponding to the Category 3 user. This user should not be able to access tables containing sensitive data. Data quality can be 0%.
    • GlueServiceDQRole – The role for the data quality and sensitive data detection job.
    • GlueServiceLFTaggerRole – The role for the LF-Tags handler job for applying the tags to the table.
    • StepFunctionRole – The Step Functions role for triggering the AWS Glue jobs.
  • Lake Formation LF-Tags keys and values:
    • tbl_classsensitive, public
    • dq_classDQ100, DQ90, DQ80_90, DQ50_80, DQ_LT_50
    • col_classsensitive, public
  • A Step Functions state machine named AutoGovMachine that you use to trigger the runs for the AWS Glue jobs to check data quality and update the LF-Tags.
  • Athena workgroups named auto_gov_blog_workgroup_temporary_user1, auto_gov_blog_workgroup_temporary_user2, and auto_gov_blog_workgroup_temporary_user3. These workgroups point to different Athena query result locations for each user. Each user is granted access to the corresponding query result location only. This ensures a specific user doesn’t access the query results of other users. You should switch to a specific workgroup to run queries in Athena as part of the test for the specific user.

The CloudFormation stack generates the following outputs. Take note of the values of the IAM users to use in subsequent steps.

Grant permissions

After you launch the CloudFormation stack, complete the following steps:

  1. On the Lake Formation console, under Permissions choose Data lake permissions in the navigation pane.
  2. Search for the database oktank_autogov_temp and table customer.
  3. If IAMAllowedPrincipals access is present, select it and choose Revoke.

  1. Choose Revoke again to revoke the permissions.

Category 1 users can access all data except if the data quality score of the table is below 100%. Therefore, we grant the user the necessary permissions.

  1. Under Permissions in the navigation pane, choose Data lake permissions.
  2. Search for database oktank_autogov_temp and table customer.
  3. Choose Grant.
  4. Select IAM users and roles and choose the value for UserCategory1 from your CloudFormation stack output.
  5. Under LF-Tags or catalog resources, choose Add LF-Tag key-value pair.
  6. Add the following key-value pairs:
    1. For the col_class key, add the values public and sensitive.
    2. For the tbl_class key, add the values public and sensitive.
    3. For the dq_tag key, add the value DQ100.

  1. For Table permissions, select Select.
  2. Choose Grant.

Category 2 users can’t access sensitive columns. They can access tables with a data quality score above 50%.

  1. Repeat the preceding steps to grant the appropriate permissions in Lake Formation to UserCategory2:
    1. For the col_class key, add the value public.
    2. For the tbl_class key, add the values public and sensitive.
    3. For the dq_tag key, add the values DQ50_80, DQ80_90, DQ90, and DQ100.

  1. For Table permissions, select Select.
  2. Choose Grant.

Category 3 users can’t access tables that contain any sensitive columns. Such tables are marked as sensitive by the system. They can access tables with any data quality score.

  1. Repeat the preceding steps to grant the appropriate permissions in Lake Formation to UserCategory3:
    1. For the col_class key, add the value public.
    2. For the tbl_class key, add the value public.
    3. For the dq_tag key, add the values DQ_LT_50, DQ50_80, DQ80_90, DQ90, and DQ100.

  1. For Table permissions, select Select.
  2. Choose Grant.

You can verify the LF-Tag permissions assigned in Lake Formation by navigating to the Data lake permissions page and searching for the Resource type LF-Tag expression.
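If you prefer to script these grants instead of using the console, the same LF-Tag expression grants can be issued with the AWS SDK for Python (Boto3). The following is a minimal sketch for the Category 2 user; the IAM user ARN is a placeholder:

import boto3

lakeformation = boto3.client("lakeformation")

# Grant SELECT on tables matching the Category 2 LF-Tag expression
lakeformation.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": "arn:aws:iam::<account-id>:user/DataLakeUser_Category2"
    },
    Resource={
        "LFTagPolicy": {
            "ResourceType": "TABLE",
            "Expression": [
                {"TagKey": "col_class", "TagValues": ["public"]},
                {"TagKey": "tbl_class", "TagValues": ["public", "sensitive"]},
                {"TagKey": "dq_tag", "TagValues": ["DQ50_80", "DQ80_90", "DQ90", "DQ100"]},
            ],
        }
    },
    Permissions=["SELECT"],
)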

Test the solution

Now we can test the workflow. We test three different use cases in this post. You will notice how the permissions to the tables change based on the values of LF-Tags applied to the customer table and the columns of the table. We use Athena to query the tables.

Use case 1

In this first use case, a new table was created on the lake and new data was ingested to the table. The data file cust_feedback_v0.csv was copied to the data/customer location in the S3 bucket. This simulates new data ingestion on a new table called customer.

Lake Formation doesn’t allow any users to access this table currently. To test this scenario, complete the following steps:

  1. Sign in to the Athena console with the UserCategory1 user.
  2. Switch the workgroup to auto_gov_blog_workgroup_temporary_user1 in the Athena query editor.
  3. Choose Acknowledge to accept the workgroup settings.

  1. Run the following query in the query editor:
select * from "oktank_autogov_temp"."customer" limit 10

  1. On the Step Functions console, run the AutoGovMachine state machine.
  2. In the Input – optional section, use the following JSON and replace the BucketName value with the bucket name you used for the CloudFormation stack earlier (for this post, we use auto-gov-blog):
{
  "Comment": "Auto Governance with AWS Glue and AWS LakeFormation",
  "BucketName": "<Replace with your bucket name>"
}

The state machine triggers the AWS Glue jobs to check data quality on the table and apply the corresponding LF-Tags.

  1. You can check the LF-Tags applied on the table and the columns. To do so, when the state machine is complete, sign in to Lake Formation with the admin role used earlier to grant permissions.
  2. Navigate to the table customer under the oktank_autogov_temp database and choose Edit LF-Tags to validate the tags applied on the table.

You can also validate that columns customer_email and customer_name are tagged as sensitive for the col_class LF-Tag.

  1. To check this, choose Edit Schema for the customer table.
  2. Select the two columns and choose Edit LF-Tags.

You can check the tags on these columns.

The rest of the columns are tagged as public.

  1. Sign in to the Athena console with UserCategory1 and run the same query again:
select * from "oktank_autogov_temp"."customer" limit 10

This time, the user is able to see the data. This is because the LF-Tag permissions we applied earlier are in effect.

  1. Sign in as UserCategory2 user to verify permissions.
  2. Switch to workgroup auto_gov_blog_workgroup_temporary_user2 in Athena.

This user can access the table but can only see public columns. Therefore, the user shouldn’t be able to see the customer_email and customer_name columns because these columns contain sensitive data as identified by the system.

  1. Run the same query again:
select * from "oktank_autogov_temp"."customer" limit 10

  1. Sign in to Athena and verify the permissions for DataLakeUser_Category3.
  2. Switch to workgroup auto_gov_blog_workgroup_temporary_user3 in Athena.

This user can’t access the table because the table is marked as sensitive due to the presence of sensitive data columns in the table.

  1. Run the same query again:
select * from "oktank_autogov_temp"."customer" limit 10

Use case 2

Let’s ingest some new data on the table.

  1. Sign in to the Amazon S3 console with the admin role used earlier to grant permissions.
  2. Copy the file cust_feedback_v1.csv from the data-v1 folder in the S3 bucket to the data/customer folder in the S3 bucket using the default options.

This new data file has data quality issues because the column data_location breaks referential integrity with the base table. This data also introduces some sensitive data in column comment1. This column was earlier marked as public because it didn’t have any sensitive data.

The following screenshot shows what the customer folder should look like now.

  1. Run the AutoGovMachine state machine again and use the same JSON as the StartExecution input you used earlier:
{
  "Comment": "Auto Governance with AWS Glue and AWS LakeFormation",
  "BucketName": "<Replace with your bucket name>"
}

The job classifies the column comment1 as sensitive on the customer table. It also updates the dq_tag value on the table because the data quality score has changed due to the failed referential integrity check.

You can verify the new tag values via the Lake Formation console as described earlier. The dq_tag value was DQ100. The value is changed to DQ50_80, reflecting the data quality score for the table.

Also, earlier the value for the col_class tag for the comment1 column was public. The value is now changed to sensitive because sensitive data is detected in this column.

Category 2 users shouldn’t be able to access sensitive columns in the table.

  1. Sign in with UserCategory2 to Athena and rerun the earlier query:
select * from "oktank_autogov_temp"."customer" limit 10

The column comment1 is now not available for UserCategory2 as expected. The access permissions are handled automatically.

Also, because the data quality score drops below 100%, this new dataset is no longer available to the Category 1 user. This user should have access to data only when the score is 100%, as per our defined rules.

  1. Sign in with UserCategory1 to Athena and rerun the earlier query:
select * from "oktank_autogov_temp"."customer" limit 10

You will see the user is not able to access the table now. The access permissions are handled automatically.

Use case 3

Let’s fix the invalid data and remove the data quality issue.

  1. Delete the cust_feedback_v1.csv file from the data/customer Amazon S3 location.
  2. Copy the file cust_feedback_v1_fixed.csv from the data-v1 folder in the S3 bucket to the data/customer S3 location. This data file fixes the data quality issues.
  3. Rerun the AutoGovMachine state machine.

When the state machine is complete, the data quality score goes up to 100% again and the tag on the table gets updated accordingly. You can verify the new tag as shown earlier via the Lake Formation console.

The Category1 user can access the table again.

Clean up

To avoid incurring further charges, delete the CloudFormation stack to delete the resources provisioned as part of this post.

Conclusion

This post covered AWS Glue Data Quality and sensitive data detection features and Lake Formation LF-Tag based access control. We explored how you can combine these features and use them to build a scalable, automated data governance capability on your data lake. We also explored how user permissions changed when data was initially ingested into the table and when data drift was observed during subsequent ingestions.

For further reading, refer to the following resources:


About the Author

Shoukat Ghouse is a Senior Big Data Specialist Solutions Architect at AWS. He helps customers around the world build robust, efficient and scalable data platforms on AWS leveraging AWS analytics services like AWS Glue, AWS Lake Formation, Amazon Athena and Amazon EMR.